Hire Me
← All Writing Testing

Testcontainers for Kafka — Real Broker Integration Tests

How to write real Kafka integration tests in Spring Boot using Testcontainers — spinning up a real broker, testing producers and consumers end-to-end, verifying exactly-once behaviour, and structuring tests that are fast and reliable.

Testing Kafka consumers and producers with mocks is a trap. You end up testing that your mock Kafka behaves the way you told it to, not that your code behaves correctly against a real broker. The serialisation configuration, consumer group coordination, offset management, error handler behaviour, dead-letter routing — none of that is exercised by a mock. Testcontainers solves this by giving you a real Kafka broker in your test suite, with a lifecycle managed automatically by JUnit.

At DWP Digital, all Kafka consumer and producer tests run against real brokers via Testcontainers. When we catch a bug in a consumer retry configuration or a DLT routing rule, it’s caught by a test, not in production.

Dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-testcontainers</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>

Basic Setup: One Broker for All Tests

Spinning up a Kafka container per test class is slow. Use a shared static container and Spring’s @DynamicPropertySource to wire the broker URL into the application context:

@SpringBootTest
@Testcontainers
class KafkaIntegrationTestBase {

    @Container
    static final KafkaContainer kafka =
        new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"))
            .withReuse(true);  // reuse container across test runs in dev

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }
}

withReuse(true) enables Testcontainers’ container reuse feature — the container stays running between test runs in your local development environment, cutting startup time from ~10 seconds to near-zero after the first run. In CI, containers always start fresh.

Testing a Producer

@SpringBootTest
class ClaimEventProducerTest extends KafkaIntegrationTestBase {

    @Autowired
    private ClaimEventProducer producer;

    @Autowired
    private EmbeddedKafkaBroker broker; // not needed — we use the real broker

    private KafkaConsumer<String, String> testConsumer;

    @BeforeEach
    void setup() {
        testConsumer = new KafkaConsumer<>(Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
            ConsumerConfig.GROUP_ID_CONFIG,          "test-consumer-" + UUID.randomUUID(),
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,   StringDeserializer.class.getName(),
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()
        ));
        testConsumer.subscribe(List.of("claim.events"));
    }

    @AfterEach
    void teardown() {
        testConsumer.close();
    }

    @Test
    void publishesClaimCreatedEvent_withCorrectKeyAndPayload() throws Exception {
        ClaimCreatedEvent event = new ClaimCreatedEvent("claim-001", "claimant-42", Instant.now());

        producer.publish(event);

        await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
            ConsumerRecords<String, String> records = testConsumer.poll(Duration.ofMillis(500));
            assertThat(records).isNotEmpty();

            ConsumerRecord<String, String> record = records.iterator().next();
            assertThat(record.key()).isEqualTo("claim-001");

            ClaimCreatedEvent received = objectMapper.readValue(record.value(), ClaimCreatedEvent.class);
            assertThat(received.claimId()).isEqualTo("claim-001");
            assertThat(received.claimantId()).isEqualTo("claimant-42");
        });
    }
}

The await() from Awaitility handles the inherent asynchrony — Kafka delivery isn’t instant and you shouldn’t use Thread.sleep(). Awaitility retries the assertion until it passes or the timeout expires.

Testing a Consumer End-to-End

Consumer tests verify that a message published to a topic results in the expected side effect in your application — a database write, a downstream event, a state change:

@SpringBootTest
class ClaimDecisionConsumerTest extends KafkaIntegrationTestBase {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ClaimProjectionRepository projectionRepository;

    @Autowired
    private ObjectMapper objectMapper;

    @Test
    void consumesDecisionEvent_andUpdatesProjection() throws Exception {
        String claimId = "claim-" + UUID.randomUUID();
        ClaimDecisionEvent event = new ClaimDecisionEvent(
            claimId, Decision.APPROVED, Instant.now());

        kafkaTemplate.send("claim.decisions", claimId,
            objectMapper.writeValueAsString(event)).get(5, TimeUnit.SECONDS);

        await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> {
            Optional<ClaimProjection> projection = projectionRepository.findById(claimId);
            assertThat(projection).isPresent();
            assertThat(projection.get().getDecision()).isEqualTo(Decision.APPROVED);
        });
    }
}

This test exercises the complete consumer path: Kafka delivery, deserialisation, Spring @KafkaListener dispatch, business logic, and the database write. A mock would verify none of that.

Testing Dead-Letter Topic Routing

One of the most important things to test is what happens when a consumer fails. Does your error handler retry correctly? Does a poison pill message end up in the DLT rather than blocking the partition?

@Test
void poisonPillMessage_routedToDeadLetterTopic() throws Exception {
    // Subscribe to the DLT before publishing the poison pill
    KafkaConsumer<String, String> dltConsumer = buildTestConsumer("claim.decisions.DLT");
    dltConsumer.subscribe(List.of("claim.decisions.DLT"));

    // Publish an unparseable message
    kafkaTemplate.send("claim.decisions", "bad-key", "this is not valid JSON @@@")
        .get(5, TimeUnit.SECONDS);

    await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
        ConsumerRecords<String, String> dltRecords = dltConsumer.poll(Duration.ofMillis(500));
        assertThat(dltRecords).isNotEmpty();

        ConsumerRecord<String, String> dltRecord = dltRecords.iterator().next();
        assertThat(dltRecord.value()).contains("this is not valid JSON @@@");
    });

    dltConsumer.close();
}

This test verifies the complete error handling path — including retry exhaustion and DLT routing — against a real broker with real consumer group coordination. You cannot test this with mocks.

Testing Consumer Idempotency

If your consumer is supposed to be idempotent — processing the same message twice produces the same result as processing it once — test it explicitly:

@Test
void duplicateMessage_processedIdempotently() throws Exception {
    String claimId = "claim-" + UUID.randomUUID();
    String eventId = UUID.randomUUID().toString();

    ClaimDecisionEvent event = new ClaimDecisionEvent(claimId, Decision.APPROVED, Instant.now());
    String payload = objectMapper.writeValueAsString(event);

    // Publish the same event twice with the same event ID header
    ProducerRecord<String, String> record1 = new ProducerRecord<>("claim.decisions", claimId, payload);
    record1.headers().add("eventId", eventId.getBytes());

    ProducerRecord<String, String> record2 = new ProducerRecord<>("claim.decisions", claimId, payload);
    record2.headers().add("eventId", eventId.getBytes());

    kafkaTemplate.send(record1).get(5, TimeUnit.SECONDS);
    kafkaTemplate.send(record2).get(5, TimeUnit.SECONDS);

    await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> {
        Optional<ClaimProjection> projection = projectionRepository.findById(claimId);
        assertThat(projection).isPresent();
    });

    // Exactly one projection should exist despite two deliveries
    assertThat(projectionRepository.findAllByClaimId(claimId)).hasSize(1);
}

Structuring Tests for Speed

Testcontainers Kafka tests are slower than unit tests — plan for 5–15 seconds per test class (most of which is Spring context startup, not Kafka). Structure tests to minimise context restarts:

@BeforeEach
void cleanDatabase() {
    projectionRepository.deleteAll();
    processedEventRepository.deleteAll();
}

The investment in real Kafka integration tests pays back quickly. The class of bugs they catch — message ordering assumptions, partition assignment edge cases, retry loop behaviour, DLT routing misconfiguration — are exactly the bugs that are expensive and embarrassing to find in production.

If you’re building event-driven systems and want an engineer who tests them properly, get in touch.