
Consumer Patterns — Idempotency and Exactly-Once Processing

Building idempotent Kafka consumers in Java — deduplication patterns, idempotency keys, outbox integration, and when to reach for Kafka transactions.

Kafka gives you at-least-once delivery by default. Set enable.auto.commit=false, commit your offsets manually after processing, and your consumer will not lose messages — but it will process some of them more than once. A consumer that restarts mid-batch, a rebalance that causes a partition to be re-read, a Kubernetes pod that crashes after processing but before committing — all of these result in redelivery. The question is not whether your consumer will see duplicates. It will. The question is whether it can handle them safely.
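The examples below all use manual acknowledgment. For reference, a minimal Spring Kafka setup with auto-commit disabled and a manual-ack container factory looks like this; the factory name matches the manualAckContainerFactory referenced later, while the broker address and deserializer settings are illustrative:

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, BenefitClaimEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // At-least-once: offsets are committed only after the listener acknowledges
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, BenefitClaimEvent.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.claims");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, BenefitClaimEvent> manualAckContainerFactory(
            ConsumerFactory<String, BenefitClaimEvent> consumerFactory) {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, BenefitClaimEvent>();
        factory.setConsumerFactory(consumerFactory);
        // MANUAL ack mode: the listener decides when to call Acknowledgment.acknowledge()
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}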

Idempotency is the property that makes re-processing safe. An idempotent operation produces the same result whether executed once or ten times. Building idempotent consumers is the pragmatic path to correctness in Kafka-based systems — one I’ve applied on benefit claim event pipelines at DWP Digital and financial transaction processors at Mosaic Smart Data.

Idempotency Keys

Every event that can be re-delivered needs a stable, unique identifier — an idempotency key. For Kafka messages, the natural candidate is the message key combined with the offset and partition, but this is fragile if topics get compacted or consumers need to reprocess historical events. Better to embed the idempotency key in the event payload itself:

public record BenefitClaimEvent(
    String eventId,        // stable UUID, generated at source
    String claimId,
    EventType type,
    ClaimStatus newStatus,
    Instant occurredAt
) {}

The eventId is generated once at the point of publication and survives redelivery unchanged. Any downstream consumer can use it as a deduplication key without caring about partition or offset.
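On the producer side that simply means generating the UUID when the event is created. A minimal sketch; the publisher class and the EventType value are illustrative:

@Service
@RequiredArgsConstructor
public class ClaimEventPublisher {

    private final KafkaTemplate<String, BenefitClaimEvent> kafkaTemplate;

    public void publishStatusChange(String claimId, ClaimStatus newStatus) {
        BenefitClaimEvent event = new BenefitClaimEvent(
            UUID.randomUUID().toString(),   // idempotency key, fixed at creation time
            claimId,
            EventType.STATUS_CHANGED,
            newStatus,
            Instant.now()
        );
        // Keyed by claimId so every event for a claim lands on the same partition
        kafkaTemplate.send("benefit.claim.events", claimId, event);
    }
}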

Deduplication with a Database Store

The simplest deduplication approach is a processed-events table. Before handling an event, check whether the eventId has been seen. If it has, skip and commit the offset:

@Service
@RequiredArgsConstructor
@Slf4j
public class ClaimEventConsumer {

    private final ProcessedEventRepository processedEventRepo;
    private final ClaimProjectionService projectionService;

    @KafkaListener(topics = "benefit.claim.events",
                   groupId = "claim-projection-consumer",
                   containerFactory = "manualAckContainerFactory")
    public void consume(ConsumerRecord<String, BenefitClaimEvent> record,
                        Acknowledgment ack) {

        BenefitClaimEvent event = record.value();
        String eventId = event.eventId();

        if (processedEventRepo.existsByEventId(eventId)) {
            log.debug("Duplicate event skipped: {}", eventId);
            ack.acknowledge();
            return;
        }

        try {
            projectionService.applyEvent(event);
            processedEventRepo.markProcessed(eventId, Instant.now());
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Failed to process event {}", eventId, e);
            // Rethrow so the container's error handler seeks back and retries this record.
            // Swallowing the exception would let a later acknowledgment commit past it.
            throw new RuntimeException(e);
        }
    }
}

The processed-events table needs an index on eventId and a TTL-based cleanup job — you don’t need to retain event IDs forever, just long enough to cover your maximum redelivery window. For most systems, 72 hours is sufficient.
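A sketch of the store and its cleanup, assuming a ProcessedEvent entity keyed by event_id with a processed_at column (table, column, and method names are illustrative, and @Scheduled needs @EnableScheduling somewhere in the application):

@Repository
public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, String> {

    boolean existsByEventId(String eventId);

    // Plain INSERT; event_id is the primary key of the dedup table
    @Transactional
    @Modifying
    @Query(value = "INSERT INTO processed_event (event_id, processed_at) VALUES (:eventId, :processedAt)",
           nativeQuery = true)
    void markProcessed(@Param("eventId") String eventId, @Param("processedAt") Instant processedAt);

    // Derived delete used by the cleanup job below; returns the number of rows removed
    long deleteByProcessedAtBefore(Instant cutoff);
}

@Component
@RequiredArgsConstructor
@Slf4j
public class ProcessedEventCleanupJob {

    private final ProcessedEventRepository processedEventRepo;
    private static final Duration RETENTION = Duration.ofHours(72);

    @Scheduled(cron = "0 0 * * * *")   // hourly
    @Transactional
    public void purgeExpired() {
        Instant cutoff = Instant.now().minus(RETENTION);
        long deleted = processedEventRepo.deleteByProcessedAtBefore(cutoff);
        log.info("Purged {} processed-event records older than {}", deleted, cutoff);
    }
}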

This approach has a race condition under high concurrency — two threads may both find the event unprocessed before either marks it done. Solve it by adding a unique constraint on eventId in the database and treating the resulting DataIntegrityViolationException as a duplicate signal:

try {
    projectionService.applyEvent(event);
    processedEventRepo.markProcessed(eventId, Instant.now());
    ack.acknowledge();
} catch (DataIntegrityViolationException e) {
    // Concurrent duplicate — safe to acknowledge and move on
    log.debug("Concurrent duplicate resolved for event {}", eventId);
    ack.acknowledge();
} catch (Exception e) {
    log.error("Failed to process event {}", eventId, e);
    // Rethrow so the error handler retries; swallowing it would let a later ack commit past this record
    throw new RuntimeException(e);
}

Deduplication with Redis

For high-throughput consumers where database round-trips are too slow, Redis provides sub-millisecond deduplication. A key written with SET NX and a TTL makes an efficient deduplication store:

@Component
@RequiredArgsConstructor
public class RedisDeduplicator {

    private final StringRedisTemplate redisTemplate;
    private static final Duration TTL = Duration.ofHours(72);

    /**
     * Returns true if this eventId is new (first time seen).
     * Returns false if it's a duplicate.
     */
    public boolean tryAcquire(String eventId) {
        Boolean set = redisTemplate.opsForValue()
            .setIfAbsent("dedup:" + eventId, "1", TTL);
        return Boolean.TRUE.equals(set);
    }
}

setIfAbsent maps to Redis SET NX EX — it’s atomic. If two threads call it simultaneously with the same key, exactly one will get true. The TTL means the key expires automatically — no cleanup job required.

@KafkaListener(topics = "mosaic.trade.events", groupId = "trade-enrichment-consumer",
               containerFactory = "manualAckContainerFactory")
public void consume(TradeEvent event, Acknowledgment ack) {
    if (!deduplicator.tryAcquire(event.eventId())) {
        log.debug("Duplicate trade event skipped: {}", event.eventId());
        ack.acknowledge();
        return;
    }
    enrichmentService.enrich(event);
    ack.acknowledge();
}

The trade-off: Redis is a cache, not a database. If Redis is unavailable or loses data (AOF is off, the node restarts), your deduplication window has a gap. There is also a failure-mode difference from the database approach: the key is set before processing, so if processing then fails you need to delete the key, otherwise the redelivered event will be skipped as a duplicate. For financial transaction processing at Mosaic, I ran both — Redis for fast-path rejection of duplicates, with the database unique constraint as the durable fallback.
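Here is a sketch of that layering, reusing the RedisDeduplicator and ProcessedEventRepository from above. The release method, which would simply delete the dedup: key, is an assumed addition to the Redis component:

@KafkaListener(topics = "mosaic.trade.events",
               groupId = "trade-enrichment-consumer",
               containerFactory = "manualAckContainerFactory")
public void consume(TradeEvent event, Acknowledgment ack) {
    // Tier 1: Redis gives a cheap rejection of the common duplicate case
    if (!deduplicator.tryAcquire(event.eventId())) {
        ack.acknowledge();
        return;
    }
    try {
        enrichmentService.enrich(event);
        // Tier 2: the unique constraint on event_id is the durable guarantee;
        // it still holds if Redis has lost its keys
        processedEventRepo.markProcessed(event.eventId(), Instant.now());
        ack.acknowledge();
    } catch (DataIntegrityViolationException e) {
        ack.acknowledge();   // the database has already seen this event
    } catch (Exception e) {
        // Processing failed: free the Redis key so the redelivered event is not skipped
        deduplicator.release(event.eventId());
        throw new RuntimeException(e);
    }
}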

Making the Processing Logic Itself Idempotent

The cleanest approach is to make the side-effectful operation itself idempotent, so deduplication becomes a performance optimisation rather than a correctness requirement. Database upserts are the canonical example:

@Repository
public interface ClaimProjectionRepository extends JpaRepository<ClaimProjection, String> {

    @Modifying
    @Query(value = """
        INSERT INTO claim_projection (claim_id, status, updated_at, event_id)
        VALUES (:claimId, :status, :updatedAt, :eventId)
        ON CONFLICT (claim_id) DO UPDATE
            SET status     = EXCLUDED.status,
                updated_at = EXCLUDED.updated_at,
                event_id   = EXCLUDED.event_id
        WHERE claim_projection.updated_at < EXCLUDED.updated_at
        """, nativeQuery = true)   // ON CONFLICT is PostgreSQL syntax, so this must be a native query
    void upsert(@Param("claimId") String claimId,
                @Param("status") String status,
                @Param("updatedAt") Instant updatedAt,
                @Param("eventId") String eventId);
}

The WHERE updated_at < EXCLUDED.updated_at condition prevents an older, redelivered event from overwriting a more recent update — a common issue when events arrive out of order or when reprocessing historical data. This is optimistic idempotency: the upsert always runs, but it only takes effect when the incoming event is newer than what is already recorded.

The Outbox Pattern for Reliable Downstream Publishing

Idempotent consumers solve the inbound problem. The outbound problem — ensuring that the downstream Kafka messages your consumer triggers are published reliably and can be deduplicated if they are retried — requires the Outbox Pattern.

The pattern: your consumer writes to a database outbox table in the same transaction as the domain state change. A separate relay process reads the outbox and publishes to Kafka, carrying the outbox record's ID with each message as an idempotency key so downstream consumers can detect redundant publishes:

@Service
@RequiredArgsConstructor
@Transactional
public class ClaimProjectionService {

    private final ClaimProjectionRepository projectionRepo;
    private final OutboxRepository outboxRepo;

    public void applyEvent(BenefitClaimEvent event) {
        // Domain write and outbox write are atomic
        projectionRepo.upsert(event.claimId(), event.newStatus().name(),
                              event.occurredAt(), event.eventId());

        outboxRepo.save(new OutboxMessage(
            event.eventId() + ":downstream",
            "benefit.claims.updated",
            event.claimId(),
            buildDownstreamPayload(event)
        ));
    }
}

The relay reads from the outbox and marks records as dispatched after the Kafka send is acknowledged. A crash between the send and the mark-as-dispatched means the record will be published again on the next attempt; that is fine, because the record's unique ID travels with the message and lets downstream consumers deduplicate it.
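A minimal polling relay might look like the sketch below, assuming OutboxMessage is a record with id, topic, key, and payload components, and that the repository exposes findUndispatched and markDispatched (all illustrative names):

@Component
@RequiredArgsConstructor
@Slf4j
public class OutboxRelay {

    private final OutboxRepository outboxRepo;
    private final KafkaTemplate<String, String> kafkaTemplate;

    // Polls the outbox and publishes pending records in insertion order
    @Scheduled(fixedDelay = 500)
    public void dispatchPending() {
        for (OutboxMessage message : outboxRepo.findUndispatched()) {
            try {
                kafkaTemplate.send(message.topic(), message.key(), message.payload())
                             .get();   // block until the broker acknowledges the write
                outboxRepo.markDispatched(message.id(), Instant.now());
            } catch (Exception e) {
                log.error("Outbox dispatch failed for {}, retrying on next poll", message.id(), e);
                break;   // stop here to preserve ordering; resume from this record next time
            }
        }
    }
}

If the relay dies between the send and markDispatched, the record is published again on the next poll, which is exactly why the outbox ID doubles as the downstream idempotency key.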

When to Use Kafka Transactions

Kafka’s transactional producer (covered in my exactly-once semantics post) provides atomic multi-topic writes within Kafka. For the consume-transform-produce pattern — read from one topic, write to another — transactions are the right tool:

@KafkaListener(topics = "raw.trade.events", containerFactory = "transactionalContainerFactory")
@Transactional("kafkaTransactionManager")
public void consume(TradeEvent raw) {
    EnrichedTradeEvent enriched = enrichmentService.enrich(raw);
    kafkaTemplate.send("enriched.trade.events", enriched.tradeId(), enriched);
    // Offset commit and produce are atomic
}
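The listener above assumes a transactional producer and a Kafka-aware transaction manager are already wired up. A minimal sketch of that wiring using spring-kafka 3.x names; the bean names match the listener, while the broker address and serializers are illustrative:

@Configuration
public class KafkaTransactionConfig {

    @Bean
    public ProducerFactory<String, EnrichedTradeEvent> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        var factory = new DefaultKafkaProducerFactory<String, EnrichedTradeEvent>(props);
        // Setting a transactional.id prefix turns this into a transactional producer
        factory.setTransactionIdPrefix("trade-enrichment-tx-");
        return factory;
    }

    @Bean
    public KafkaTransactionManager<String, EnrichedTradeEvent> kafkaTransactionManager(
            ProducerFactory<String, EnrichedTradeEvent> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, TradeEvent> transactionalContainerFactory(
            ConsumerFactory<String, TradeEvent> consumerFactory,
            KafkaTransactionManager<String, EnrichedTradeEvent> kafkaTransactionManager) {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, TradeEvent>();
        factory.setConsumerFactory(consumerFactory);
        // Consumed offsets are sent to the producer transaction instead of committed separately
        factory.getContainerProperties().setKafkaAwareTransactionManager(kafkaTransactionManager);
        return factory;
    }
}

Consumers of enriched.trade.events should also set isolation.level=read_committed, otherwise they will see records from aborted transactions.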

The critical caveat: Kafka transactions cover the Kafka-to-Kafka path only. If your consumer also writes to a database, that write is outside the transaction scope. The Outbox Pattern and idempotent database writes are still needed — transactions alone are not enough.

My default recommendation: start with idempotent consumers and manual offset commits. Add Redis deduplication if throughput demands it. Reach for Kafka transactions only when you need atomic multi-topic writes and can accept the additional latency overhead.

If you’re building event-driven systems where correctness under failure matters and want an engineer who has shipped these patterns in production, get in touch.