How to manage positions, exposure, and P&L across multiple Betfair markets simultaneously in Java — portfolio tracking, risk limits, and market lifecycle management.
Running a single-market strategy is relatively straightforward. Running 20 markets in parallel introduces a different class of problem: you need to know your total exposure at any point, track P&L across markets in different states (pre-off, in-play, suspended, settled), and enforce risk limits that cut across the whole portfolio — not just one market at a time.
This post covers the architecture for multi-market portfolio management in Java: position tracking, exposure limits, P&L aggregation, and the market lifecycle state machine that keeps it all coherent.
Start with a clean domain model. A portfolio is a collection of market positions:
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Side (BACK, LAY) is assumed to be a simple enum defined elsewhere.
public record MarketPosition(
        String marketId,
        String selectionId,
        Side side,
        double averagePrice,
        double stake,
        double matched,
        double liability,
        MarketStatus status
) {
    // Amount at risk: the liability for a lay, the stake for a back.
    public double exposure() {
        return side == Side.LAY ? liability : stake;
    }

    public double profitIfWin() {
        return side == Side.BACK
                ? stake * (averagePrice - 1)
                : -liability;
    }

    // A lay keeps the backer's stake when the selection loses.
    public double profitIfLose() {
        return side == Side.BACK ? -stake : stake;
    }
}

public record Portfolio(Map<String, List<MarketPosition>> positionsByMarket) {
    // Sum of per-position exposure across every market (no netting of hedged positions).
    public double totalExposure() {
        return positionsByMarket.values().stream()
                .flatMap(Collection::stream)
                .mapToDouble(MarketPosition::exposure)
                .sum();
    }

    public double exposureFor(String marketId) {
        return positionsByMarket.getOrDefault(marketId, List.of()).stream()
                .mapToDouble(MarketPosition::exposure)
                .sum();
    }

    // Positions in markets that have not yet closed or settled.
    public List<MarketPosition> activePositions() {
        return positionsByMarket.values().stream()
                .flatMap(Collection::stream)
                .filter(p -> p.status() != MarketStatus.SETTLED
                        && p.status() != MarketStatus.CLOSED)
                .toList();
    }
}
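The `exposure()` sum above treats each position independently, so a back bet that has been hedged with a lay on the same selection is counted twice. One possible refinement is to net positions per outcome and take the worst case. The sketch below assumes a market with exactly one winner; `Bet`, `BetSide`, and `WorstCaseExposure` are illustrative names, not part of the model above.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-ins for the Side and MarketPosition types.
enum BetSide { BACK, LAY }

record Bet(String selectionId, BetSide side, double price, double stake) {
    // P&L of this bet if the given selection wins the market.
    double pnlIfWinnerIs(String winner) {
        boolean won = selectionId.equals(winner);
        if (side == BetSide.BACK) {
            return won ? stake * (price - 1) : -stake;
        }
        return won ? -stake * (price - 1) : stake;
    }
}

final class WorstCaseExposure {
    // Sentinel outcome meaning "a runner we hold no bets on wins".
    private static final String ANY_OTHER = "__ANY_OTHER__";

    private WorstCaseExposure() {}

    // Largest loss across all outcomes, returned as a non-negative number.
    // Assumes exactly one winner per market.
    static double of(List<Bet> bets) {
        Set<String> outcomes = new HashSet<>();
        bets.forEach(b -> outcomes.add(b.selectionId()));
        outcomes.add(ANY_OTHER);

        double worst = 0.0;
        for (String winner : outcomes) {
            double pnl = bets.stream().mapToDouble(b -> b.pnlIfWinnerIs(winner)).sum();
            worst = Math.min(worst, pnl);
        }
        return -worst;
    }
}
```

With this calculation, a back of 10 at 3.0 fully hedged by a lay of 10 at 3.0 has zero exposure, whereas the simple sum would report 30.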
A thread-safe tracker maintains live state and issues a snapshot on demand:
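import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.springframework.stereotype.Component;

@Component
public class PortfolioTracker {
    // marketId -> (selectionId-side -> position)
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, MarketPosition>> positions =
            new ConcurrentHashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    // Replaces any existing position stored under the same selection and side.
    public void upsertPosition(MarketPosition position) {
        lock.writeLock().lock();
        try {
            positions
                    .computeIfAbsent(position.marketId(), k -> new ConcurrentHashMap<>())
                    .put(positionKey(position), position);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public Portfolio snapshot() {
        lock.readLock().lock();
        try {
            // Explicit target type: with var, the values would be inferred as ArrayList
            // and the map would not be accepted by the Portfolio constructor.
            Map<String, List<MarketPosition>> copy = positions.entrySet().stream()
                    .collect(Collectors.toMap(
                            Map.Entry::getKey,
                            e -> List.copyOf(e.getValue().values())
                    ));
            return new Portfolio(copy);
        } finally {
            lock.readLock().unlock();
        }
    }

    public double totalExposure() {
        return snapshot().totalExposure();
    }

    public void evictMarket(String marketId) {
        lock.writeLock().lock();
        try {
            positions.remove(marketId);
        } finally {
            lock.writeLock().unlock();
        }
    }

    private String positionKey(MarketPosition p) {
        return p.selectionId() + "-" + p.side().name();
    }
}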
ReadWriteLock gives concurrent read access — multiple strategy threads can check exposure simultaneously — while serialising writes from order fills.
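Because `upsertPosition` replaces whatever is stored under the same key, a second fill on the same selection and side overwrites the first rather than adding to it. One way to accumulate fills is to combine them with a size-weighted average price. The sketch below uses `ConcurrentHashMap.merge` and a cut-down, hypothetical `Position` record; `FillAggregator` is likewise an illustrative name, and the approach would need adapting to the full `MarketPosition` type.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical cut-down position: just enough to show fill aggregation.
record Position(String key, double averagePrice, double matched) {
    // Combine two fills into one position with a size-weighted average price.
    static Position combine(Position a, Position b) {
        double total = a.matched() + b.matched();
        if (total == 0.0) {
            return a;
        }
        double avg = (a.averagePrice() * a.matched() + b.averagePrice() * b.matched()) / total;
        return new Position(a.key(), avg, total);
    }
}

final class FillAggregator {
    private final ConcurrentHashMap<String, Position> positions = new ConcurrentHashMap<>();

    // merge() stores the fill if the key is new, otherwise applies combine atomically for that key.
    void onFill(Position fill) {
        positions.merge(fill.key(), fill, Position::combine);
    }

    Position get(String key) {
        return positions.get(key);
    }
}
```

Two fills of 10 at 2.0 and 30 at 4.0 combine into a single position of 40 at an average price of 3.5, which yields the same profit on a win as the two separate bets.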
Each market moves through a defined sequence of states. Tracking this lets the portfolio manager react correctly to suspensions, closures, and settlement:
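import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

public enum MarketStatus {
    INACTIVE, ACTIVE, SUSPENDED, IN_PLAY, CLOSED, SETTLED
}

@Component
public class MarketLifecycleManager {
    private final ConcurrentHashMap<String, MarketStatus> marketStates = new ConcurrentHashMap<>();
    private final ApplicationEventPublisher publisher;

    public MarketLifecycleManager(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    public void onMarketUpdate(String marketId, String rawStatus, boolean inPlay) {
        var newStatus = resolveStatus(rawStatus, inPlay);
        // previous is null the first time a market is seen, which also counts as a change.
        var previous = marketStates.put(marketId, newStatus);
        if (!newStatus.equals(previous)) {
            publisher.publishEvent(new MarketStatusChangedEvent(marketId, previous, newStatus));
        }
    }

    // Betfair market definitions report a tradable market as OPEN; "ACTIVE" is kept as a
    // fallback for feeds that normalise the value. SETTLED is never derived here.
    private MarketStatus resolveStatus(String rawStatus, boolean inPlay) {
        if (rawStatus == null) {
            return MarketStatus.INACTIVE;
        }
        return switch (rawStatus) {
            case "OPEN", "ACTIVE" -> inPlay ? MarketStatus.IN_PLAY : MarketStatus.ACTIVE;
            case "SUSPENDED" -> MarketStatus.SUSPENDED;
            case "CLOSED" -> MarketStatus.CLOSED;
            default -> MarketStatus.INACTIVE;
        };
    }

    // Markets that have never been seen are reported as INACTIVE.
    public MarketStatus statusOf(String marketId) {
        return marketStates.getOrDefault(marketId, MarketStatus.INACTIVE);
    }

    public Set<String> tradableMarkets() {
        return marketStates.entrySet().stream()
                .filter(e -> e.getValue() == MarketStatus.ACTIVE
                        || e.getValue() == MarketStatus.IN_PLAY)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }
}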
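The lifecycle manager above records whatever status arrives. If you want the "defined sequence of states" enforced rather than just observed, a transition table is one option. The sketch below is a minimal version; the allowed transitions are an assumption chosen for illustration, not documented Betfair behaviour, and `Status` and `TransitionRules` are hypothetical names.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical mirror of the MarketStatus enum so the sketch is self-contained.
enum Status { INACTIVE, ACTIVE, SUSPENDED, IN_PLAY, CLOSED, SETTLED }

final class TransitionRules {
    // Assumed transition table: adjust to match what your feed actually produces.
    private static final Map<Status, Set<Status>> ALLOWED = new EnumMap<>(Status.class);

    static {
        ALLOWED.put(Status.INACTIVE, EnumSet.of(Status.ACTIVE, Status.SUSPENDED, Status.CLOSED));
        ALLOWED.put(Status.ACTIVE, EnumSet.of(Status.SUSPENDED, Status.IN_PLAY, Status.CLOSED));
        ALLOWED.put(Status.SUSPENDED, EnumSet.of(Status.ACTIVE, Status.IN_PLAY, Status.CLOSED));
        ALLOWED.put(Status.IN_PLAY, EnumSet.of(Status.SUSPENDED, Status.CLOSED));
        ALLOWED.put(Status.CLOSED, EnumSet.of(Status.SETTLED));
        ALLOWED.put(Status.SETTLED, EnumSet.noneOf(Status.class));
    }

    private TransitionRules() {}

    // A null "from" means the market has not been seen before, so any first status is accepted.
    static boolean isAllowed(Status from, Status to) {
        if (from == null) {
            return true;
        }
        return ALLOWED.get(from).contains(to);
    }
}
```

An unexpected transition is usually worth logging and investigating rather than silently dropping, since it may point to a missed stream update.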
Risk limits must gate order submission, not just report after the fact:
import java.util.concurrent.atomic.AtomicReference;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class RiskController {
    private final PortfolioTracker portfolio;

    @Value("${trading.max-total-exposure:500.0}")
    private double maxTotalExposure;
    @Value("${trading.max-market-exposure:100.0}")
    private double maxMarketExposure;
    @Value("${trading.max-daily-loss:200.0}")
    private double maxDailyLoss;

    private final AtomicReference<Double> realisedPnlToday = new AtomicReference<>(0.0);

    public RiskController(PortfolioTracker portfolio) {
        this.portfolio = portfolio;
    }

    public RiskDecision evaluate(ProposedOrder order) {
        // Take one snapshot so both exposure figures describe the same moment.
        var snapshot = portfolio.snapshot();
        var currentExposure = snapshot.totalExposure();
        var marketExposure = snapshot.exposureFor(order.marketId());
        var orderExposure = order.exposure();

        if (currentExposure + orderExposure > maxTotalExposure) {
            return RiskDecision.reject("Total exposure limit would be breached: %.2f + %.2f > %.2f"
                    .formatted(currentExposure, orderExposure, maxTotalExposure));
        }
        if (marketExposure + orderExposure > maxMarketExposure) {
            return RiskDecision.reject("Market exposure limit for %s would be breached"
                    .formatted(order.marketId()));
        }
        // Reject once the loss reaches the limit, not only after it has been exceeded.
        var realised = realisedPnlToday.get();
        if (realised <= -maxDailyLoss) {
            return RiskDecision.reject("Daily loss limit reached: %.2f".formatted(realised));
        }
        return RiskDecision.approve();
    }

    public void recordSettlement(double pnl) {
        realisedPnlToday.updateAndGet(v -> v + pnl);
    }
}

public record RiskDecision(boolean approved, String reason) {
    public static RiskDecision approve() { return new RiskDecision(true, null); }
    public static RiskDecision reject(String reason) { return new RiskDecision(false, reason); }
}
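One gap worth noting: `evaluate` checks exposure but does not reserve it, so two strategy threads can both see the same headroom, both be approved, and together breach the limit before either fill is recorded. A possible remedy is to check and reserve in a single atomic step. The sketch below shows the idea with a hypothetical `ExposureBudget` class; it is not part of the controller above.

```java
// Hypothetical helper: tracks reserved exposure and admits an order only if it fits under the cap.
final class ExposureBudget {
    private final double limit;
    private double reserved; // guarded by "this"

    ExposureBudget(double limit) {
        this.limit = limit;
    }

    // Check and reserve in one synchronized step so concurrent callers
    // cannot both claim the same headroom.
    synchronized boolean tryReserve(double amount) {
        if (amount < 0 || reserved + amount > limit) {
            return false;
        }
        reserved += amount;
        return true;
    }

    // Return a reservation, for example when placement fails or the market settles.
    synchronized void release(double amount) {
        reserved = Math.max(0.0, reserved - amount);
    }

    synchronized double reserved() {
        return reserved;
    }
}
```

In this arrangement the router would call `tryReserve` before placing an order and `release` in the failure path, so the budget reflects orders in flight as well as recorded fills.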
The router coordinates strategy signals with the risk controller and market lifecycle:
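import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class MultiMarketOrderRouter {
    private static final Logger log = LoggerFactory.getLogger(MultiMarketOrderRouter.class);

    private final RiskController risk;
    private final MarketLifecycleManager lifecycle;
    private final BettingApi api;
    private final PortfolioTracker portfolio;

    public MultiMarketOrderRouter(RiskController risk, MarketLifecycleManager lifecycle,
                                  BettingApi api, PortfolioTracker portfolio) {
        this.risk = risk;
        this.lifecycle = lifecycle;
        this.api = api;
        this.portfolio = portfolio;
    }

    public void route(ProposedOrder order) {
        var status = lifecycle.statusOf(order.marketId());
        // Only ACTIVE and IN_PLAY markets accept orders; unknown markets default to INACTIVE.
        if (status != MarketStatus.ACTIVE && status != MarketStatus.IN_PLAY) {
            log.info("Rejecting order for {}: market status {}", order.marketId(), status);
            return;
        }
        var decision = risk.evaluate(order);
        if (!decision.approved()) {
            log.warn("Order rejected by risk controller: {}", decision.reason());
            return;
        }
        try {
            var result = api.placeOrders(order.toPlaceInstruction());
            recordFills(result, order, status);
        } catch (ApiException e) {
            log.error("Order placement failed for {}: {}", order.marketId(), e.getMessage());
        }
    }

    // Note: upsertPosition replaces any stored position for the same selection and side.
    private void recordFills(PlaceExecutionReport result, ProposedOrder original, MarketStatus status) {
        result.instructionReports().stream()
                .filter(r -> r.status() == InstructionReportStatus.SUCCESS)
                .forEach(r -> portfolio.upsertPosition(new MarketPosition(
                        original.marketId(),
                        String.valueOf(r.instruction().selectionId()),
                        original.side(),
                        r.averagePriceMatched(),
                        r.sizeMatched(),
                        r.sizeMatched(),
                        calculateLiability(r.averagePriceMatched(), r.sizeMatched(), original.side()),
                        status
                )));
    }

    // Assumed definition: a lay risks size * (price - 1); a back risks only its stake.
    private static double calculateLiability(double price, double size, Side side) {
        return side == Side.LAY ? size * (price - 1) : size;
    }
}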
Subscribing to many markets over a single stream connection is more efficient than opening one connection per market. The Streaming API accepts a list of market IDs in one subscription message, and a new subscription message replaces the previous one, so resubscribing with the updated list swaps the whole set in a single step:
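import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class MultiMarketStreamManager {
    private static final Logger log = LoggerFactory.getLogger(MultiMarketStreamManager.class);

    private final BetfairStreamConnection connection;
    private final Set<String> subscribedMarkets = ConcurrentHashMap.newKeySet();

    public MultiMarketStreamManager(BetfairStreamConnection connection) {
        this.connection = connection;
    }

    public void subscribe(Collection<String> marketIds) {
        var newMarkets = marketIds.stream()
                .filter(id -> !subscribedMarkets.contains(id))
                .toList();
        if (newMarkets.isEmpty()) return;
        subscribedMarkets.addAll(newMarkets);
        // Send the full set each time, since the new message replaces the old subscription.
        connection.send(buildSubscription(new ArrayList<>(subscribedMarkets)));
        log.info("Subscribed to {} markets ({} new)", subscribedMarkets.size(), newMarkets.size());
    }

    // If the set becomes empty no message is sent, so the last subscription
    // stays active on the server until the connection closes.
    public void unsubscribe(String marketId) {
        if (subscribedMarkets.remove(marketId) && !subscribedMarkets.isEmpty()) {
            connection.send(buildSubscription(new ArrayList<>(subscribedMarkets)));
        }
    }

    // Stream messages are line-delimited, so the JSON is built without embedded newlines
    // (the trailing backslashes join the lines). connection.send is assumed to append the terminator.
    private String buildSubscription(List<String> marketIds) {
        var ids = marketIds.stream()
                .map(id -> "\"" + id + "\"")
                .collect(Collectors.joining(",", "[", "]"));
        return """
                {"op":"marketSubscription","id":2,\
                "marketFilter":{"marketIds":%s},\
                "marketDataFilter":{"fields":["EX_BEST_OFFERS","EX_TRADED","EX_MARKET_DEF"]}}\
                """.formatted(ids);
    }
}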
When a market settles, reconcile the portfolio against the Betfair API — don’t rely on local position tracking alone:
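import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class SettlementReconciler {
    private static final Logger log = LoggerFactory.getLogger(SettlementReconciler.class);

    private final BettingApi api;
    private final PortfolioTracker portfolio;
    private final RiskController risk;

    public SettlementReconciler(BettingApi api, PortfolioTracker portfolio, RiskController risk) {
        this.api = api;
        this.portfolio = portfolio;
        this.risk = risk;
    }

    @EventListener
    public void onMarketClosed(MarketStatusChangedEvent event) {
        if (event.newStatus() != MarketStatus.CLOSED) return;
        var settled = api.listSettledBets(
                SettledBetFilter.builder().marketId(event.marketId()).build()
        );
        settled.forEach(bet -> {
            risk.recordSettlement(bet.profit());
            // SLF4J only understands {} placeholders, so format the amount explicitly.
            log.info("Market {} settled, selection {} P&L: {}",
                    event.marketId(), bet.selectionId(), String.format("%.2f", bet.profit()));
        });
        // Evict only after the settled figures have been recorded.
        portfolio.evictMarket(event.marketId());
    }
}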
The API’s settled bet list is authoritative. Partial fills, voided bets, and Rule 4 deductions affect the final figures in ways your local model won’t capture unless you reconcile explicitly.
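Reconciling explicitly can be as simple as comparing the P&L your local model expected per selection with the settled figure and flagging anything that differs. The sketch below illustrates that comparison; `PnlReconciliation` and its map-based inputs are hypothetical, and how you derive the local figures depends on your own position model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class PnlReconciliation {
    private PnlReconciliation() {}

    // Returns, in sorted order, the selection ids whose local and settled P&L differ by more
    // than the tolerance, or which appear on only one side.
    static List<String> discrepancies(Map<String, Double> localPnl,
                                      Map<String, Double> settledPnl,
                                      double tolerance) {
        Set<String> ids = new TreeSet<>(localPnl.keySet());
        ids.addAll(settledPnl.keySet());

        List<String> flagged = new ArrayList<>();
        for (String id : ids) {
            Double local = localPnl.get(id);
            Double settled = settledPnl.get(id);
            if (local == null || settled == null || Math.abs(local - settled) > tolerance) {
                flagged.add(id);
            }
        }
        return flagged;
    }
}
```

Flagged selections are the ones to investigate for voids, partial fills, or deductions before trusting the local model on the next market.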
Race conditions between status and position updates. A suspension event and an order fill can arrive within milliseconds of each other on different threads. The lifecycle state and portfolio tracker must update correctly under concurrent writes — acquiring the write lock in PortfolioTracker and using ConcurrentHashMap in MarketLifecycleManager gives you the right separation. Never take both locks in sequence without a defined ordering, or you introduce a deadlock risk.
Memory growth from stale markets. Settled markets accumulate positions in memory indefinitely unless you evict them. The evictMarket call in the settlement reconciler handles this — call it after reconciliation, not before.
API rate limits across markets. The Betfair REST API applies limits at the key level, not the market level. Fetching listMarketBook for 20 markets individually costs 20 data requests. Batch them instead: one listMarketBook call can carry several market IDs, subject to the per-request data weight limit, which depends on the price projection you ask for. Around 10 market IDs per call keeps you within that limit far more comfortably.
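Batching comes down to splitting the list of market IDs into fixed-size chunks and issuing one call per chunk. The sketch below shows a generic partition helper; `Batches` is an illustrative name, and the batch size of 10 is the figure used in this post rather than a value read from the API.

```java
import java.util.ArrayList;
import java.util.List;

final class Batches {
    private Batches() {}

    // Split items into consecutive batches of at most batchSize elements, preserving order.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            int end = Math.min(i + batchSize, items.size());
            batches.add(List.copyOf(items.subList(i, end)));
        }
        return batches;
    }
}
```

For 20 markets, `Batches.partition(marketIds, 10)` yields two batches, so two listMarketBook calls replace twenty.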
Daily P&L reset. The realisedPnlToday counter in RiskController needs resetting at midnight. Schedule a reset job rather than letting it accumulate indefinitely:
@Scheduled(cron = "0 0 0 * * *", zone = "Europe/London")
public void resetDailyPnl() {
    realisedPnlToday.set(0.0);
    log.info("Daily P&L counter reset");
}