How to build a production-ready event store on DynamoDB with Spring Boot — table design, conditional writes, optimistic locking, and projection rebuilds.
Event sourcing is one of those patterns that sounds elegant in theory but requires careful infrastructure decisions in practice. When you choose to persist domain events rather than current state, the event store becomes the foundation of your system — every aggregate rebuild, every projection, every audit trail flows through it.
DynamoDB is an underrated choice for this role. Its conditional write semantics make optimistic locking natural, its sort key ordering gives you a free sequence guarantee per aggregate, and its DynamoDB Streams integration opens a path to eventually consistent projections without polling. This post walks through a production-ready event store implementation in Spring Boot backed by DynamoDB.
The access pattern for an event store is simple and predictable: append events for an aggregate, read events for an aggregate in order, and occasionally scan recent events for projection rebuild. DynamoDB’s composite key model maps directly to this — aggregateId as the partition key, sequenceNumber as the sort key. You get ordered reads per aggregate for free.
RDS alternatives work, but you need to design for contention on a hot aggregate. DynamoDB’s partition-per-aggregate model eliminates that problem. With PAY_PER_REQUEST billing you can prototype cheaply before committing to provisioned capacity.
Table eventsTable = Table.Builder.create(this, "DomainEvents")
.tableName("domain-events")
.partitionKey(Attribute.builder()
.name("aggregateId")
.type(AttributeType.STRING)
.build())
.sortKey(Attribute.builder()
.name("sequenceNumber")
.type(AttributeType.NUMBER)
.build())
.billingMode(BillingMode.PAY_PER_REQUEST)
.stream(StreamViewType.NEW_IMAGE)
.build();
Each item holds: aggregateId, sequenceNumber, aggregateType, eventType, payload (JSON), occurredAt, and correlationId. Nothing else — the event store is append-only and never updated.
public sealed interface DomainEvent permits
FundsDeposited, FundsWithdrawn, BetPlaced, BetSettled {
String aggregateId();
String eventType();
Instant occurredAt();
}
public record FundsDeposited(
String aggregateId,
BigDecimal amount,
String currency,
Instant occurredAt
) implements DomainEvent {
public String eventType() { return "FundsDeposited"; }
}
Sealed interfaces make exhaustive event handling straightforward — the compiler tells you when a new event type is added and you haven’t handled it everywhere.
The DynamoEventStore wraps the AWS SDK v2 DynamoDbClient and enforces the optimistic locking invariant with a conditional write.
@Component
public class DynamoEventStore {
private final DynamoDbClient dynamo;
private final ObjectMapper mapper;
private final String tableName;
public DynamoEventStore(DynamoDbClient dynamo, ObjectMapper mapper,
@Value("${app.events.table}") String tableName) {
this.dynamo = dynamo;
this.mapper = mapper;
this.tableName = tableName;
}
public void append(String aggregateId, String aggregateType,
List<DomainEvent> events, long expectedVersion) {
if (events.isEmpty()) return;
var writes = new ArrayList<TransactWriteItem>(events.size());
long seq = expectedVersion + 1;
for (DomainEvent event : events) {
var item = Map.of(
"aggregateId", av(aggregateId),
"sequenceNumber", av(seq),
"aggregateType", av(aggregateType),
"eventType", av(event.eventType()),
"payload", av(serialise(event)),
"occurredAt", av(event.occurredAt().toString())
);
writes.add(TransactWriteItem.builder()
.put(Put.builder()
.tableName(tableName)
.item(item)
.conditionExpression("attribute_not_exists(sequenceNumber)")
.build())
.build());
seq++;
}
try {
dynamo.transactWriteItems(TransactWriteItemsRequest.builder()
.transactItems(writes)
.build());
} catch (TransactionCanceledException e) {
throw new OptimisticLockException(
"Concurrent write conflict for aggregate " + aggregateId, e);
}
}
public List<StoredEvent> load(String aggregateId, long fromSequence) {
var response = dynamo.query(QueryRequest.builder()
.tableName(tableName)
.keyConditionExpression(
"aggregateId = :id AND sequenceNumber >= :from")
.expressionAttributeValues(Map.of(
":id", av(aggregateId),
":from", av(fromSequence)))
.build());
return response.items().stream()
.map(this::toStoredEvent)
.toList();
}
private static AttributeValue av(String s) { return AttributeValue.fromS(s); }
private static AttributeValue av(long n) { return AttributeValue.fromN(String.valueOf(n)); }
private String serialise(DomainEvent e) {
try { return mapper.writeValueAsString(e); }
catch (JsonProcessingException ex) { throw new RuntimeException(ex); }
}
}
attribute_not_exists(sequenceNumber) is the optimistic lock — if another writer has already appended at that sequence number, the condition fails and DynamoDB throws TransactionCanceledException. Catch it and surface a domain-level conflict.
The aggregate rehydrates itself from its event history:
public class BettingAccount {
private String id;
private BigDecimal balance = BigDecimal.ZERO;
private long version = 0;
private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
private BettingAccount() {}
public static BettingAccount reconstitute(List<StoredEvent> history) {
var account = new BettingAccount();
history.forEach(account::apply);
return account;
}
public void deposit(BigDecimal amount) {
if (amount.compareTo(BigDecimal.ZERO) <= 0)
throw new IllegalArgumentException("Deposit amount must be positive");
var event = new FundsDeposited(id, amount, "GBP", Instant.now());
apply(new StoredEvent(event, ++version));
uncommittedEvents.add(event);
}
private void apply(StoredEvent stored) {
version = stored.sequenceNumber();
switch (stored.event()) {
case FundsDeposited e -> balance = balance.add(e.amount());
case FundsWithdrawn e -> balance = balance.subtract(e.amount());
case BetPlaced e -> balance = balance.subtract(e.stake());
case BetSettled e -> balance = balance.add(e.payout());
}
}
public long version() { return version; }
public List<DomainEvent> uncommittedEvents() { return List.copyOf(uncommittedEvents); }
public void clearUncommittedEvents() { uncommittedEvents.clear(); }
}
reconstitute replays every stored event to rebuild current state — no SELECT * FROM accounts needed.
@Repository
public class BettingAccountRepository {
private final DynamoEventStore store;
public Optional<BettingAccount> findById(String id) {
var history = store.load(id, 1L);
if (history.isEmpty()) return Optional.empty();
return Optional.of(BettingAccount.reconstitute(history));
}
public void save(BettingAccount account) {
store.append(
account.id(),
"BettingAccount",
account.uncommittedEvents(),
account.version() - account.uncommittedEvents().size()
);
account.clearUncommittedEvents();
}
}
DynamoDB Streams emit every write to a Lambda function or Kinesis consumer. Use this to keep read-side projections eventually consistent without coupling them to your write path.
@Component
public class EventProjectionHandler implements RequestHandler<DynamodbEvent, Void> {
private final BalanceSummaryRepository balanceSummary;
@Override
public Void handleRequest(DynamodbEvent event, Context context) {
event.getRecords().stream()
.filter(r -> "INSERT".equals(r.getEventName()))
.map(r -> r.getDynamodb().getNewImage())
.forEach(this::project);
return null;
}
private void project(Map<String, AttributeValue> item) {
var eventType = item.get("eventType").getS();
var aggregateId = item.get("aggregateId").getS();
var payload = item.get("payload").getS();
switch (eventType) {
case "FundsDeposited" -> balanceSummary.credit(aggregateId, parseAmount(payload));
case "FundsWithdrawn", "BetPlaced" -> balanceSummary.debit(aggregateId, parseAmount(payload));
case "BetSettled" -> balanceSummary.credit(aggregateId, parsePayout(payload));
}
}
}
DynamoDB Streams delivers at-least-once. Make every projection write idempotent — upsert on the projection’s natural key, never blind insert.
Replaying 50,000 events to reconstitute an account is expensive. Snapshotting avoids the full replay by capturing current state periodically:
@Component
public class SnapshotService {
private final DynamoDbClient dynamo;
private final String snapshotTable;
private static final int SNAPSHOT_THRESHOLD = 100;
public boolean shouldSnapshot(BettingAccount account) {
return account.version() % SNAPSHOT_THRESHOLD == 0;
}
public void save(BettingAccount account) {
var item = Map.of(
"aggregateId", av(account.id()),
"version", av(account.version()),
"state", av(serialiseState(account)),
"createdAt", av(Instant.now().toString())
);
dynamo.putItem(PutItemRequest.builder()
.tableName(snapshotTable)
.item(item)
.build());
}
}
When loading, check for a snapshot first. If one exists, load events from snapshot.version + 1 rather than from event 1.
app:
events:
table: domain-events-${spring.profiles.active}
cloud:
aws:
dynamodb:
endpoint: http://localhost:8000 # DynamoDB Local in dev
@Configuration
public class DynamoConfig {
@Bean
@Profile("!local")
public DynamoDbClient dynamoDbClient() {
return DynamoDbClient.builder()
.region(Region.EU_WEST_1)
.credentialsProvider(DefaultCredentialsProvider.create())
.build();
}
@Bean
@Profile("local")
public DynamoDbClient localDynamoDbClient(
@Value("${cloud.aws.dynamodb.endpoint}") String endpoint) {
return DynamoDbClient.builder()
.endpointOverride(URI.create(endpoint))
.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create("local", "local")))
.region(Region.EU_WEST_1)
.build();
}
}
@SpringBootTest
@Testcontainers
class DynamoEventStoreTest {
@Container
static final GenericContainer<?> dynamo =
new GenericContainer<>("amazon/dynamodb-local:latest")
.withExposedPorts(8000);
@DynamicPropertySource
static void props(DynamicPropertyRegistry registry) {
registry.add("cloud.aws.dynamodb.endpoint",
() -> "http://localhost:" + dynamo.getMappedPort(8000));
registry.add("spring.profiles.active", () -> "local");
}
@Test
void appendAndLoad_roundTrip() {
var store = context.getBean(DynamoEventStore.class);
var events = List.of(
new FundsDeposited("acc-1", new BigDecimal("100.00"), "GBP", Instant.now()));
store.append("acc-1", "BettingAccount", events, 0L);
var loaded = store.load("acc-1", 1L);
assertThat(loaded).hasSize(1);
assertThat(loaded.get(0).eventType()).isEqualTo("FundsDeposited");
}
@Test
void append_throwsOnVersionConflict() {
var store = context.getBean(DynamoEventStore.class);
var events = List.of(
new FundsDeposited("acc-2", new BigDecimal("50.00"), "GBP", Instant.now()));
store.append("acc-2", "BettingAccount", events, 0L);
assertThatThrownBy(() -> store.append("acc-2", "BettingAccount", events, 0L))
.isInstanceOf(OptimisticLockException.class);
}
}
DynamoDB Local via Testcontainers gives every test run a clean, isolated table — no mocking, no shared state.
Read cost grows with aggregate size. An account with 10,000 events costs 10,000 read units every reconstitution. Introduce snapshots before this becomes a concern — 100–500 events per aggregate is a reasonable threshold.
DynamoDB Streams has a 24-hour retention window. If your projection consumer falls behind by more than 24 hours, you’ll miss events. Monitor IteratorAgeMilliseconds in CloudWatch and alert early.
Partition throughput limits. Each DynamoDB partition serves 3,000 read units/s and 1,000 write units/s. Hot aggregates with rapid event bursts can throttle. Use PAY_PER_REQUEST in early stages; switch to provisioned capacity with auto-scaling once traffic patterns are stable.
Idempotency on projections. At-least-once delivery means every projection write must be idempotent — upsert on the projection’s natural key, never blind insert.
Event sourcing with DynamoDB is a solid foundation for systems that need a full audit trail, time-travel debugging, or multiple read models from a single event stream. The conditional write semantics handle concurrency without a distributed lock, and Streams give you a clean event bus to feed projections without polling. If you’re building this kind of system, discuss your project.