
The Saga Pattern for Distributed Transactions with Kafka

How to implement the Saga pattern for distributed transactions in Java with Kafka — choreography vs orchestration, compensating transactions, state tracking, and the failure modes that trip up every first implementation.

In a microservices architecture, you eventually face a transaction that spans multiple services — and you can’t use a database transaction to coordinate them. Creating a new benefit claim might require writing to the claims service, triggering a payment instruction in the payments service, notifying the eligibility service, and scheduling a review in the casework service. Each service has its own database. There is no distributed transaction coordinator that spans all of them. If one step fails halfway through, you need a strategy to undo the steps that already succeeded.

That strategy is the Saga pattern. I’ve implemented it on the DWP Digital JSA platform, where a claim lifecycle spans multiple bounded contexts and partial failures need deterministic, auditable recovery. This is what a production Saga implementation looks like in Java with Kafka.

What a Saga Is

A Saga is a sequence of local transactions, each of which publishes an event or message that triggers the next step. If a step fails, the Saga executes compensating transactions: operations that semantically undo the steps that already completed, run in reverse order.
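As a minimal, in-memory sketch of that definition (not the production design): the `SagaSketch` class and its `Step` record are hypothetical names, and each `Runnable` stands in for what would really be a local transaction inside one service. The point is only the control flow: run steps forward, and on failure run the compensations of the completed steps, most recent first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration only: real saga steps are local database
// transactions in separate services, triggered by messages.
final class SagaSketch {

    // One saga step: a forward action plus the compensation that undoes it.
    record Step(String name, Runnable action, Runnable compensation) {}

    // Runs the steps in order. On the first failure, runs the compensations of
    // the steps that already completed (most recent first) and returns false.
    static boolean run(List<Step> steps) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.action().run();
                completed.push(step);
            } catch (RuntimeException e) {
                while (!completed.isEmpty()) {
                    completed.pop().compensation().run();
                }
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        boolean ok = run(List.of(
            new Step("createClaim",
                () -> log.add("claim created"), () -> log.add("claim deleted")),
            new Step("schedulePayment",
                () -> log.add("payment scheduled"), () -> log.add("payment cancelled")),
            new Step("sendNotification",
                () -> { throw new IllegalStateException("notifier down"); }, () -> {})));
        System.out.println(ok + " " + log);
    }
}
```

The failed step itself is not compensated, because its local transaction never committed.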

There are two implementation styles:

Choreography — each service listens for events and decides what to do next. No central coordinator. Services are loosely coupled but the overall flow is implicit and spread across multiple services.

Orchestration — a central Saga orchestrator sends commands to services and tracks the overall state. More coupling to the orchestrator but the flow is explicit and in one place.

For complex, long-running sagas with many steps and non-trivial error handling, orchestration is almost always the right choice. Choreography works well for simple, short sagas. I’ll focus on orchestration here.
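For contrast, here is a rough sketch of what the choreographed version of the same flow might look like. The `Bus` class is a hypothetical, synchronous stand-in for a broker (it is not Kafka), and the handlers are placeholders for separate services. Notice that no single place in the code states the order of steps; it only emerges from who subscribes to what.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class ChoreographySketch {

    // Hypothetical in-memory bus: event type -> subscribed handlers.
    static final class Bus {
        private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

        void subscribe(String eventType, Consumer<String> handler) {
            subscribers.computeIfAbsent(eventType, k -> new ArrayList<>()).add(handler);
        }

        void publish(String eventType, String claimId) {
            for (Consumer<String> handler : subscribers.getOrDefault(eventType, List.of())) {
                handler.accept(claimId);
            }
        }
    }

    // Wires up three "services" and returns the order in which work happened.
    static List<String> run(String claimId) {
        Bus bus = new Bus();
        List<String> trace = new ArrayList<>();

        // Each service knows only the event it reacts to and the event it emits.
        bus.subscribe("ClaimCreated", id -> {
            trace.add("eligibility checked " + id);
            bus.publish("EligibilityConfirmed", id);
        });
        bus.subscribe("EligibilityConfirmed", id -> {
            trace.add("payment scheduled " + id);
            bus.publish("PaymentScheduled", id);
        });
        bus.subscribe("PaymentScheduled", id -> trace.add("notification sent " + id));

        trace.add("claim created " + claimId);
        bus.publish("ClaimCreated", claimId);
        return trace;
    }
}
```

Adding compensation to this style means every service also has to subscribe to the failure events of the services downstream of it, which is why the flow becomes hard to follow as the saga grows.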

The Saga State Machine

Every saga instance has a state. Model it explicitly:

public enum ClaimRegistrationState {
    STARTED,
    CLAIM_CREATED,
    ELIGIBILITY_CHECKED,
    PAYMENT_SCHEDULED,
    NOTIFICATION_SENT,
    COMPLETED,

    // Compensation states
    COMPENSATING_PAYMENT,
    COMPENSATING_ELIGIBILITY,
    COMPENSATING_CLAIM,
    FAILED
}

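// Imports for ClaimRegistrationSaga (Lombok and Spring Data MongoDB)
import java.time.Instant;
import java.util.UUID;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Document(collection = "saga_instances")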
@Data
public class ClaimRegistrationSaga {

    @Id
    private String sagaId;
    private String claimId;
    private ClaimRegistrationState state;
    private String failureReason;
    private Instant startedAt;
    private Instant updatedAt;
    private int stepAttempts;

    public static ClaimRegistrationSaga start(String claimId) {
        ClaimRegistrationSaga saga = new ClaimRegistrationSaga();
        saga.sagaId    = UUID.randomUUID().toString();
        saga.claimId   = claimId;
        saga.state     = ClaimRegistrationState.STARTED;
        saga.startedAt = Instant.now();
        saga.updatedAt = Instant.now();
        return saga;
    }

    public void transitionTo(ClaimRegistrationState newState) {
        this.state     = newState;
        this.updatedAt = Instant.now();
        this.stepAttempts = 0;
    }
}

Persist the saga state to MongoDB (or any database) — this is your source of truth for recovery. If the orchestrator crashes mid-execution, it can reload saga state on restart and resume from where it left off.
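One way to think about resuming is as a mapping from the persisted state to the command that was in flight when that state was saved. The sketch below assumes exactly that; `SagaRecoverySketch` and `commandToResend` are hypothetical names, the enum is a trimmed copy of the forward states above, and the commands are represented by their class names only. It also assumes downstream handlers are idempotent, because the original command may already have been processed before the crash.

```java
import java.util.Optional;

final class SagaRecoverySketch {

    // Trimmed copy of the forward and terminal states of ClaimRegistrationState.
    enum State { STARTED, CLAIM_CREATED, ELIGIBILITY_CHECKED, PAYMENT_SCHEDULED, COMPLETED, FAILED }

    // Returns the name of the command that was outstanding when the saga was
    // last saved in the given state, or empty for terminal states.
    static Optional<String> commandToResend(State state) {
        return switch (state) {
            case STARTED             -> Optional.of("CreateClaimCommand");
            case CLAIM_CREATED       -> Optional.of("CheckEligibilityCommand");
            case ELIGIBILITY_CHECKED -> Optional.of("SchedulePaymentCommand");
            case PAYMENT_SCHEDULED   -> Optional.of("SendNotificationCommand");
            case COMPLETED, FAILED   -> Optional.empty();
        };
    }
}
```

On restart, the orchestrator would load every saga that is not in a terminal state and re-send the command this mapping returns.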

The Orchestrator

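import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service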
@RequiredArgsConstructor
@Slf4j
public class ClaimRegistrationOrchestrator {

    private final SagaRepository sagaRepository;
    private final KafkaTemplate<String, Object> kafka;

    public void startSaga(NewClaimRequest request) {
        ClaimRegistrationSaga saga = ClaimRegistrationSaga.start(request.claimId());
        sagaRepository.save(saga);

        // Step 1: create the claim
        kafka.send("claims.commands",
            request.claimId(),
            new CreateClaimCommand(saga.getSagaId(), request));

        log.info("Saga {} started for claim {}", saga.getSagaId(), request.claimId());
    }

    // Note: this listener subscribes only to claims.events. The eligibility, payment and
    // notification events handled below must also arrive on that topic, or the topics
    // attribute must list each service's event topic.
    @KafkaListener(topics = "claims.events", groupId = "saga-orchestrator")
    public void onClaimEvent(SagaEvent event) {
        ClaimRegistrationSaga saga = sagaRepository.findBySagaId(event.sagaId())
            .orElseThrow(() -> new SagaNotFoundException(event.sagaId()));

        switch (event.eventType()) {
            case "ClaimCreated"          -> handleClaimCreated(saga, event);
            case "ClaimCreationFailed"   -> startCompensation(saga, "Claim creation failed");
            case "EligibilityConfirmed"  -> handleEligibilityConfirmed(saga, event);
            case "EligibilityRejected"   -> startCompensation(saga, "Eligibility check failed");
            case "PaymentScheduled"      -> handlePaymentScheduled(saga, event);
            case "PaymentFailed"         -> startCompensation(saga, "Payment scheduling failed");
            case "NotificationSent"      -> completeSaga(saga);
            // Reply to DeleteClaimCommand: compensation is finished, so record the terminal state
            case "ClaimDeleted"          -> {
                saga.transitionTo(ClaimRegistrationState.FAILED);
                sagaRepository.save(saga);
            }
            default -> log.warn("Unknown event type {} for saga {}", event.eventType(), saga.getSagaId());
        }
    }

    private void handleClaimCreated(ClaimRegistrationSaga saga, SagaEvent event) {
        saga.transitionTo(ClaimRegistrationState.CLAIM_CREATED);
        sagaRepository.save(saga);

        // Step 2: check eligibility
        kafka.send("eligibility.commands",
            saga.getClaimId(),
            new CheckEligibilityCommand(saga.getSagaId(), saga.getClaimId()));
    }

    private void handleEligibilityConfirmed(ClaimRegistrationSaga saga, SagaEvent event) {
        saga.transitionTo(ClaimRegistrationState.ELIGIBILITY_CHECKED);
        sagaRepository.save(saga);

        // Step 3: schedule payment
        kafka.send("payments.commands",
            saga.getClaimId(),
            new SchedulePaymentCommand(saga.getSagaId(), saga.getClaimId()));
    }

    private void handlePaymentScheduled(ClaimRegistrationSaga saga, SagaEvent event) {
        saga.transitionTo(ClaimRegistrationState.PAYMENT_SCHEDULED);
        sagaRepository.save(saga);

        // Step 4: send notification
        kafka.send("notifications.commands",
            saga.getClaimId(),
            new SendNotificationCommand(saga.getSagaId(), saga.getClaimId()));
    }

    private void completeSaga(ClaimRegistrationSaga saga) {
        saga.transitionTo(ClaimRegistrationState.COMPLETED);
        sagaRepository.save(saga);
        log.info("Saga {} completed successfully for claim {}", saga.getSagaId(), saga.getClaimId());
    }

    private void startCompensation(ClaimRegistrationSaga saga, String reason) {
        log.warn("Saga {} entering compensation: {}", saga.getSagaId(), reason);
        saga.setFailureReason(reason);

        // Capture the state before transitioning, so the log below reports where the saga failed
        ClaimRegistrationState failedAt = saga.getState();

        // Compensate in reverse order from the current state
        switch (failedAt) {
            case PAYMENT_SCHEDULED -> {
                // Only the payment is cancelled here; the claim still has to be deleted
                // once the payments service confirms the cancellation.
                saga.transitionTo(ClaimRegistrationState.COMPENSATING_PAYMENT);
                kafka.send("payments.commands", saga.getClaimId(),
                    new CancelPaymentCommand(saga.getSagaId(), saga.getClaimId()));
            }
            case ELIGIBILITY_CHECKED, CLAIM_CREATED -> {
                saga.transitionTo(ClaimRegistrationState.COMPENSATING_CLAIM);
                kafka.send("claims.commands", saga.getClaimId(),
                    new DeleteClaimCommand(saga.getSagaId(), saga.getClaimId()));
            }
            default -> {
                // STARTED has nothing to undo. A saga already in a COMPENSATING_* state also
                // lands here (for example via the timeout check) and is marked FAILED.
                saga.transitionTo(ClaimRegistrationState.FAILED);
                log.error("Saga {} failed at state {}; no further compensation issued",
                    saga.getSagaId(), failedAt);
            }
        }
        sagaRepository.save(saga);
    }
}
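Kafka consumers typically see at-least-once delivery, so the orchestrator can receive the same event twice or receive a late event after the saga has moved on. A simple defence is to check each transition against a table of allowed moves before applying it. The sketch below is one way to do that; `TransitionGuardSketch` and `canTransition` are hypothetical names, the enum is a copy of the states the orchestrator above actually uses, and the `COMPENSATING_PAYMENT` to `COMPENSATING_CLAIM` edge is an assumption about how payment cancellation would chain into claim deletion.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

final class TransitionGuardSketch {

    enum State {
        STARTED, CLAIM_CREATED, ELIGIBILITY_CHECKED, PAYMENT_SCHEDULED, COMPLETED,
        COMPENSATING_PAYMENT, COMPENSATING_CLAIM, FAILED
    }

    // For each state, the states a saga may legally move to next.
    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);
    static {
        ALLOWED.put(State.STARTED, EnumSet.of(State.CLAIM_CREATED, State.FAILED));
        ALLOWED.put(State.CLAIM_CREATED, EnumSet.of(State.ELIGIBILITY_CHECKED, State.COMPENSATING_CLAIM));
        ALLOWED.put(State.ELIGIBILITY_CHECKED, EnumSet.of(State.PAYMENT_SCHEDULED, State.COMPENSATING_CLAIM));
        ALLOWED.put(State.PAYMENT_SCHEDULED, EnumSet.of(State.COMPLETED, State.COMPENSATING_PAYMENT));
        // Assumption: a confirmed payment cancellation is followed by claim deletion.
        ALLOWED.put(State.COMPENSATING_PAYMENT, EnumSet.of(State.COMPENSATING_CLAIM, State.FAILED));
        ALLOWED.put(State.COMPENSATING_CLAIM, EnumSet.of(State.FAILED));
        // Terminal states accept no further transitions.
        ALLOWED.put(State.COMPLETED, EnumSet.noneOf(State.class));
        ALLOWED.put(State.FAILED, EnumSet.noneOf(State.class));
    }

    // True if moving from 'from' to 'to' is a legal step; duplicates and late
    // events fail this check and can be logged and dropped.
    static boolean canTransition(State from, State to) {
        return ALLOWED.getOrDefault(from, Set.of()).contains(to);
    }
}
```

With a guard like this, a redelivered `ClaimCreated` event for a saga already in `ELIGIBILITY_CHECKED` is ignored instead of re-sending the eligibility command.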

Compensating Transactions

Each forward step must have a corresponding compensating transaction. Compensations must be idempotent — if they’re retried due to a failure during compensation, they must be safe to execute multiple times.

@KafkaListener(topics = "claims.commands", groupId = "claims-service")
public void onClaimsCommand(SagaCommand command) {
    switch (command.commandType()) {
        case "CreateClaim"  -> handleCreateClaim(command);
        case "DeleteClaim"  -> handleDeleteClaim(command);  // compensation
    }
}

private void handleDeleteClaim(SagaCommand command) {
    // Idempotent: safe to call multiple times
    claimRepository.findById(command.claimId()).ifPresent(claim -> {
        claim.markAsDeleted("saga-compensation:" + command.sagaId());
        claimRepository.save(claim);
    });

    // Always publish success — even if the claim didn't exist
    kafka.send("claims.events", command.claimId(),
        new SagaEvent(command.sagaId(), "ClaimDeleted"));
}

The compensating handler publishes a success event even if the resource didn’t exist — this is correct behaviour, because the goal of compensation is to ensure the resource is in the “not created” state. If it was never created or was already deleted, that goal is already achieved.
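The idempotency claim is easy to check in isolation. The sketch below is a hypothetical in-memory model of the claims service (a `Map` instead of a repository, a `List` instead of a Kafka topic) that replays the same delete command and also deletes a claim that never existed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the claims service.
final class IdempotentDeleteSketch {

    private final Map<String, String> claimStatus = new HashMap<>(); // claimId -> status
    final List<String> publishedEvents = new ArrayList<>();          // stand-in for claims.events

    void createClaim(String claimId) {
        claimStatus.put(claimId, "ACTIVE");
    }

    // Compensation: marks the claim as deleted if it exists, and always
    // publishes ClaimDeleted so the orchestrator can make progress.
    void handleDeleteClaim(String sagaId, String claimId) {
        claimStatus.computeIfPresent(claimId, (id, status) -> "DELETED");
        publishedEvents.add(sagaId + ":ClaimDeleted");
    }

    String statusOf(String claimId) {
        return claimStatus.getOrDefault(claimId, "ABSENT");
    }

    public static void main(String[] args) {
        IdempotentDeleteSketch service = new IdempotentDeleteSketch();
        service.createClaim("claim-1");
        service.handleDeleteClaim("saga-1", "claim-1");
        service.handleDeleteClaim("saga-1", "claim-1"); // redelivered command
        service.handleDeleteClaim("saga-2", "claim-2"); // claim was never created
        System.out.println(service.statusOf("claim-1") + " " + service.statusOf("claim-2")
            + " " + service.publishedEvents);
    }
}
```

The repeated command leaves the claim in the same state as the first one did, and each delivery still produces a reply. That in turn means the orchestrator must tolerate duplicate `ClaimDeleted` events.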

Handling Timeouts

A saga step may never respond — the downstream service may be down. You need a timeout mechanism:

@Scheduled(fixedDelay = 30_000)
public void checkTimeouts() {
    Instant timeout = Instant.now().minus(Duration.ofMinutes(5));

    List<ClaimRegistrationSaga> stalled = sagaRepository
        .findByStateNotInAndUpdatedAtBefore(
            List.of(ClaimRegistrationState.COMPLETED, ClaimRegistrationState.FAILED),
            timeout);

    for (ClaimRegistrationSaga saga : stalled) {
        log.warn("Saga {} has been in state {} for >5 minutes — triggering timeout compensation",
            saga.getSagaId(), saga.getState());
        startCompensation(saga, "Step timeout after 5 minutes");
    }
}

Timeout handling is where many Saga implementations fall short. Without it, a stalled saga sits in an intermediate, non-terminal state indefinitely, leaving the system partially consistent with no automatic recovery.
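Compensating on the first timeout can be heavy-handed when the downstream service is only briefly unavailable. One option is to use the saga's `stepAttempts` counter to retry the current step a few times before giving up. The sketch below shows only the decision, as a pure function; `TimeoutPolicySketch`, the `Action` enum, and the limit of three attempts are assumptions for illustration, while the five-minute threshold matches the check above.

```java
import java.time.Duration;
import java.time.Instant;

final class TimeoutPolicySketch {

    enum Action { WAIT, RETRY_STEP, COMPENSATE }

    static final Duration STEP_TIMEOUT = Duration.ofMinutes(5); // same threshold as checkTimeouts()
    static final int MAX_ATTEMPTS = 3;                          // assumed retry limit

    // Decides what to do with a saga last updated at 'updatedAt' that has
    // already retried its current step 'stepAttempts' times.
    static Action decide(Instant updatedAt, int stepAttempts, Instant now) {
        if (Duration.between(updatedAt, now).compareTo(STEP_TIMEOUT) < 0) {
            return Action.WAIT; // the step has not timed out yet
        }
        return stepAttempts < MAX_ATTEMPTS ? Action.RETRY_STEP : Action.COMPENSATE;
    }
}
```

For this to work, a retry has to increment `stepAttempts` and refresh `updatedAt` without calling `transitionTo`, since `transitionTo` resets the counter to zero. The retried command must also be safe to process twice.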

Observability

A Saga that fails silently is worse than a synchronous transaction that throws an exception. Every state transition, compensation, timeout, and final outcome should be logged with the saga ID and business identifier:

private void logTransition(ClaimRegistrationSaga saga,
                            ClaimRegistrationState from,
                            ClaimRegistrationState to) {
    log.info("SAGA {} claim={} transition: {} -> {}", 
        saga.getSagaId(), saga.getClaimId(), from, to);
}

Build a dashboard or query over your saga collection to surface sagas in FAILED or any of the COMPENSATING_* states, because these require investigation. A healthy system should have very few non-COMPLETED sagas older than a few minutes.
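The two queries behind such a dashboard are simple. The sketch below runs them over an in-memory list so the logic is easy to see; in practice they would be queries against the saga collection. `SagaHealthSketch` and `SagaRow` are hypothetical names, and the state is held as a plain string to keep the example self-contained.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class SagaHealthSketch {

    // Hypothetical projection of a saga document: just what the dashboard needs.
    record SagaRow(String sagaId, String state, Instant updatedAt) {}

    private static final Set<String> TERMINAL = Set.of("COMPLETED", "FAILED");

    // Number of sagas per state, sorted by state name.
    static Map<String, Long> countByState(List<SagaRow> rows) {
        return rows.stream()
            .collect(Collectors.groupingBy(SagaRow::state, TreeMap::new, Collectors.counting()));
    }

    // IDs of sagas that are FAILED, or that have sat in a non-terminal state
    // for longer than maxAge.
    static List<String> needsInvestigation(List<SagaRow> rows, Instant now, Duration maxAge) {
        Instant cutoff = now.minus(maxAge);
        return rows.stream()
            .filter(r -> r.state().equals("FAILED")
                || (!TERMINAL.contains(r.state()) && r.updatedAt().isBefore(cutoff)))
            .map(SagaRow::sagaId)
            .sorted()
            .collect(Collectors.toList());
    }
}
```

The per-state counts make a useful time series, and the second query is a natural candidate for an alert.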

The Saga pattern adds real complexity. But in a microservices architecture, the alternative is either distributed transactions (which don’t scale and introduce tight coupling) or silent partial failures (which are the worst possible outcome). Sagas are the right trade-off: explicit complexity in the orchestrator, clean boundaries between services, and deterministic recovery.

If you’re designing distributed systems where business transaction correctness matters across service boundaries, get in touch.