How to write real Kafka integration tests in Spring Boot using Testcontainers — spinning up a real broker, testing producers and consumers end-to-end, verifying exactly-once behaviour, and structuring tests that are fast and reliable.
Testing Kafka consumers and producers with mocks is a trap. You end up testing that your mock Kafka behaves the way you told it to, not that your code behaves correctly against a real broker. The serialisation configuration, consumer group coordination, offset management, error handler behaviour, dead-letter routing — none of that is exercised by a mock. Testcontainers solves this by giving you a real Kafka broker in your test suite, with a lifecycle managed automatically by JUnit.
At DWP Digital, all Kafka consumer and producer tests run against real brokers via Testcontainers. When we catch a bug in a consumer retry configuration or a DLT routing rule, it’s caught by a test, not in production.
Three test dependencies are needed — Spring Boot's dependency management supplies the versions:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-testcontainers</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>
Spinning up a Kafka container per test class is slow. Use a shared static container and Spring’s @DynamicPropertySource to wire the broker URL into the application context:
@SpringBootTest
abstract class KafkaIntegrationTestBase {

    // Singleton container pattern: started once per JVM and shared by every
    // subclass. Deliberately not annotated with @Container — the JUnit Jupiter
    // extension would stop the container after each test class, which defeats
    // withReuse(true).
    static final KafkaContainer kafka =
        new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"))
            .withReuse(true); // reuse container across test runs in dev

    static {
        kafka.start();
    }

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }
}
withReuse(true) enables Testcontainers’ container reuse feature — the container stays running between test runs in your local development environment, cutting startup time from ~10 seconds to near-zero after the first run. In CI, containers always start fresh.
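One caveat: withReuse(true) only takes effect when reuse is also enabled on the machine running the tests, via an opt-in flag in ~/.testcontainers.properties:

```properties
testcontainers.reuse.enable=true
```

Without that opt-in, Testcontainers ignores the flag and starts a fresh container each run — which is exactly the behaviour you want in CI.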
@SpringBootTest
class ClaimEventProducerTest extends KafkaIntegrationTestBase {

    @Autowired
    private ClaimEventProducer producer;

    @Autowired
    private ObjectMapper objectMapper; // needed to deserialise the payload in assertions

    private KafkaConsumer<String, String> testConsumer;
    @BeforeEach
    void setup() {
        testConsumer = new KafkaConsumer<>(Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
            ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-" + UUID.randomUUID(),
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()
        ));
        testConsumer.subscribe(List.of("claim.events"));
    }

    @AfterEach
    void teardown() {
        testConsumer.close();
    }

    @Test
    void publishesClaimCreatedEvent_withCorrectKeyAndPayload() throws Exception {
        ClaimCreatedEvent event = new ClaimCreatedEvent("claim-001", "claimant-42", Instant.now());

        producer.publish(event);

        await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
            ConsumerRecords<String, String> records = testConsumer.poll(Duration.ofMillis(500));
            assertThat(records).isNotEmpty();
            ConsumerRecord<String, String> record = records.iterator().next();
            assertThat(record.key()).isEqualTo("claim-001");
            ClaimCreatedEvent received = objectMapper.readValue(record.value(), ClaimCreatedEvent.class);
            assertThat(received.claimId()).isEqualTo("claim-001");
            assertThat(received.claimantId()).isEqualTo("claimant-42");
        });
    }
}
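The producer under test isn't shown above. As a point of reference, a minimal sketch of what a ClaimEventProducer might look like — the class name comes from the test, but the topic, keying, and serialisation choices here are assumptions:

```java
@Component
class ClaimEventProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    ClaimEventProducer(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
    }

    void publish(ClaimCreatedEvent event) throws JsonProcessingException {
        // Key by claim ID so all events for one claim land on the same partition,
        // preserving per-claim ordering
        kafkaTemplate.send("claim.events", event.claimId(), objectMapper.writeValueAsString(event));
    }
}
```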
The await() from Awaitility handles the inherent asynchrony — Kafka delivery isn’t instant and you shouldn’t use Thread.sleep(). Awaitility retries the assertion until it passes or the timeout expires.
Consumer tests verify that a message published to a topic results in the expected side effect in your application — a database write, a downstream event, a state change:
@SpringBootTest
class ClaimDecisionConsumerTest extends KafkaIntegrationTestBase {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ClaimProjectionRepository projectionRepository;

    @Autowired
    private ObjectMapper objectMapper;

    @Test
    void consumesDecisionEvent_andUpdatesProjection() throws Exception {
        String claimId = "claim-" + UUID.randomUUID();
        ClaimDecisionEvent event = new ClaimDecisionEvent(
            claimId, Decision.APPROVED, Instant.now());

        kafkaTemplate.send("claim.decisions", claimId,
            objectMapper.writeValueAsString(event)).get(5, TimeUnit.SECONDS);

        await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> {
            Optional<ClaimProjection> projection = projectionRepository.findById(claimId);
            assertThat(projection).isPresent();
            assertThat(projection.get().getDecision()).isEqualTo(Decision.APPROVED);
        });
    }
}
This test exercises the complete consumer path: Kafka delivery, deserialisation, Spring @KafkaListener dispatch, business logic, and the database write. A mock would verify none of that.
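For reference, a hedged sketch of the @KafkaListener the test above exercises — the listener class, group id, and ClaimProjection constructor are assumptions inferred from the test's assertions, not code from the original project:

```java
@Component
class ClaimDecisionConsumer {

    private final ClaimProjectionRepository projectionRepository;
    private final ObjectMapper objectMapper;

    ClaimDecisionConsumer(ClaimProjectionRepository projectionRepository, ObjectMapper objectMapper) {
        this.projectionRepository = projectionRepository;
        this.objectMapper = objectMapper;
    }

    @KafkaListener(topics = "claim.decisions", groupId = "claim-projection")
    void onDecision(String payload) throws JsonProcessingException {
        ClaimDecisionEvent event = objectMapper.readValue(payload, ClaimDecisionEvent.class);
        // The database write is the side effect the test asserts on
        projectionRepository.save(new ClaimProjection(event.claimId(), event.decision()));
    }
}
```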
One of the most important things to test is what happens when a consumer fails. Does your error handler retry correctly? Does a poison pill message end up in the DLT rather than blocking the partition?
@Test
void poisonPillMessage_routedToDeadLetterTopic() throws Exception {
    // Subscribe to the DLT before publishing the poison pill
    KafkaConsumer<String, String> dltConsumer = buildTestConsumer("claim.decisions.DLT");
    dltConsumer.subscribe(List.of("claim.decisions.DLT"));

    // Publish an unparseable message
    kafkaTemplate.send("claim.decisions", "bad-key", "this is not valid JSON @@@")
        .get(5, TimeUnit.SECONDS);

    await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
        ConsumerRecords<String, String> dltRecords = dltConsumer.poll(Duration.ofMillis(500));
        assertThat(dltRecords).isNotEmpty();
        ConsumerRecord<String, String> dltRecord = dltRecords.iterator().next();
        assertThat(dltRecord.value()).contains("this is not valid JSON @@@");
    });

    dltConsumer.close();
}
This test verifies the complete error handling path — including retry exhaustion and DLT routing — against a real broker with real consumer group coordination. You cannot test this with mocks.
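For the DLT routing to exist in the first place, the listener container needs an error handler that publishes exhausted records. A sketch using Spring Kafka's DefaultErrorHandler with a DeadLetterPublishingRecoverer, which appends the .DLT suffix to the source topic by default — the retry interval and count here are illustrative, not from the original configuration:

```java
@Bean
DefaultErrorHandler kafkaErrorHandler(KafkaTemplate<String, String> kafkaTemplate) {
    // Publishes exhausted records to <source-topic>.DLT — e.g. claim.decisions.DLT
    var recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
    // Retry 3 times, 1 second apart, then hand the record to the recoverer
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
}
```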
If your consumer is supposed to be idempotent — processing the same message twice produces the same result as processing it once — test it explicitly:
@Test
void duplicateMessage_processedIdempotently() throws Exception {
    String claimId = "claim-" + UUID.randomUUID();
    String eventId = UUID.randomUUID().toString();
    ClaimDecisionEvent event = new ClaimDecisionEvent(claimId, Decision.APPROVED, Instant.now());
    String payload = objectMapper.writeValueAsString(event);

    // Publish the same event twice with the same event ID header
    ProducerRecord<String, String> record1 = new ProducerRecord<>("claim.decisions", claimId, payload);
    record1.headers().add("eventId", eventId.getBytes());
    ProducerRecord<String, String> record2 = new ProducerRecord<>("claim.decisions", claimId, payload);
    record2.headers().add("eventId", eventId.getBytes());

    kafkaTemplate.send(record1).get(5, TimeUnit.SECONDS);
    kafkaTemplate.send(record2).get(5, TimeUnit.SECONDS);

    await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> {
        Optional<ClaimProjection> projection = projectionRepository.findById(claimId);
        assertThat(projection).isPresent();
    });

    // Exactly one projection should exist despite two deliveries
    assertThat(projectionRepository.findAllByClaimId(claimId)).hasSize(1);
}
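The production-side dedup that makes this test pass isn't shown above. As a minimal, Kafka-free sketch of the guard such a listener might keep — class and method names here are hypothetical, and a real implementation would persist the seen-set in the database, inside the same transaction as the projection write:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: track processed eventId headers and skip duplicates.
class IdempotencyGuard {
    private final Set<String> seenEventIds = ConcurrentHashMap.newKeySet();
    private final AtomicInteger sideEffects = new AtomicInteger();

    /** Returns true if the event was processed, false if it was a duplicate. */
    boolean process(String eventId) {
        if (!seenEventIds.add(eventId)) {
            return false; // duplicate delivery — skip the side effect
        }
        sideEffects.incrementAndGet(); // stands in for the projection write
        return true;
    }

    int sideEffectCount() {
        return sideEffects.get();
    }
}
```

An in-memory set survives redelivery within one consumer instance but not a restart, which is why the durable, transactional variant is what you'd ship.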
Testcontainers Kafka tests are slower than unit tests — plan for 5–15 seconds per test class (most of which is Spring context startup, not Kafka). Structure tests to minimise context restarts:
- @SpringBootTest on a base class, so all subclasses share one application context.
- @TestPropertySource or @DynamicPropertySource on the base class, so all subclasses share the same container and context.
- Clean up state in @BeforeEach rather than @AfterEach, so state is clean before each test regardless of whether the previous test passed:

@BeforeEach
void cleanDatabase() {
    projectionRepository.deleteAll();
    processedEventRepository.deleteAll();
}
The investment in real Kafka integration tests pays back quickly. The class of bugs they catch — message ordering assumptions, partition assignment edge cases, retry loop behaviour, DLT routing misconfiguration — is exactly the class that is expensive and embarrassing to find in production.
If you’re building event-driven systems and want an engineer who tests them properly, get in touch.