
Real-Time Stream Processing with Kafka Streams

How to build real-time stream processing pipelines with Kafka Streams and Spring Boot — the Streams DSL, stateful operations, KTable joins, windowing, exactly-once processing, and production topology design.

Most teams reach for Kafka Streams when they realise their consumer-transform-produce pattern has outgrown what a plain @KafkaListener can cleanly express. Once you have stateful aggregations, joins across multiple topics, or windowed computations, you’re building a stream processor whether you call it that or not. Kafka Streams gives you the right primitives and handles the hard parts — state management, fault tolerance, partition assignment, and exactly-once semantics — so you can focus on the processing logic.

At Mosaic Smart Data we used Kafka Streams to enrich raw financial transaction events with reference data in real time, joining a high-volume event stream against a slowly-updating institution table with sub-second latency requirements. This post covers the patterns that made that work reliably.

What Kafka Streams Is

Kafka Streams is a client library — not a separate cluster or deployment. It runs inside your application, reads from Kafka topics, processes records, and writes results back to Kafka topics. There’s no separate infrastructure to operate beyond the Kafka cluster you already have.

The processing model is a topology of nodes connected by streams. Each node is either a source (reads from a topic), a processor (transforms records), or a sink (writes to a topic). You define the topology using either the Streams DSL (high-level, functional) or the Processor API (low-level, explicit). The DSL covers the vast majority of real-world use cases.

Spring Boot Setup

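Add the dependencies and configure the application. spring-kafka declares kafka-streams as an optional dependency, so it has to be listed explicitly (Spring Boot's dependency management supplies the version):

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>

spring: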
  kafka:
    streams:
      application-id: claims-enrichment-service
      bootstrap-servers: localhost:9092
      properties:
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
        processing.guarantee: exactly_once_v2
        num.stream.threads: 4

The application-id is the Kafka consumer group ID for the Streams application. It must be unique per logical application and stable across restarts — Kafka Streams uses it to manage partition assignments and state store directories.
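The application-id also ends up in the names of the internal topics Kafka Streams creates, which is one more reason to keep it stable. The sketch below is a plain-Java illustration of the documented naming convention, not a Kafka API: the class and method names are hypothetical, and the Kafka documentation notes the convention itself is not guaranteed across releases.

```java
/** Hypothetical helper illustrating how internal topic names are derived. */
final class InternalTopicNames {
    private InternalTopicNames() {}

    /** Changelog topic that backs a named state store. */
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    /** Repartition topic created when a rekeyed stream is grouped or joined. */
    static String repartitionTopic(String applicationId, String operationName) {
        return applicationId + "-" + operationName + "-repartition";
    }

    public static void main(String[] args) {
        // Store name taken from the aggregation example later in this post
        System.out.println(changelogTopic("claims-enrichment-service", "claim-stats-store"));
    }
}
```

Changing the application-id therefore orphans the existing changelog topics and committed offsets, and the application starts again from empty state.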

A Simple Enrichment Topology

@Configuration
@EnableKafkaStreams
public class ClaimsEnrichmentTopology {

    @Bean
    public KStream<String, ClaimEvent> claimsEnrichmentStream(StreamsBuilder builder) {

        // Source: raw claim events
        KStream<String, ClaimEvent> claimEvents = builder.stream(
            "claim.events.raw",
            Consumed.with(Serdes.String(), claimEventSerde()));

        // Filter out test events
        KStream<String, ClaimEvent> realEvents = claimEvents
            .filter((claimId, event) -> !event.isTestEvent());

        // Transform: add processing timestamp
        KStream<String, EnrichedClaimEvent> enriched = realEvents
            .mapValues(event -> EnrichedClaimEvent.from(event, Instant.now()));

        // Branch: route approved vs rejected to separate topics
        Map<String, KStream<String, EnrichedClaimEvent>> branches = enriched.split()
            .branch((id, e) -> e.getDecision() == Decision.APPROVED, Branched.as("approved"))
            .branch((id, e) -> e.getDecision() == Decision.REJECTED, Branched.as("rejected"))
            .defaultBranch(Branched.as("pending"));

        branches.get("approved").to("claim.events.approved");
        branches.get("rejected").to("claim.events.rejected");
        branches.get("pending").to("claim.events.pending");

        return claimEvents;
    }
}

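The @EnableKafkaStreams annotation registers a StreamsBuilderFactoryBean, which builds the topology from the injected StreamsBuilder and manages the KafkaStreams instance lifecycle: it starts the instance on application startup and stops it cleanly on shutdown. Spring Boot’s auto-configuration supplies its settings from the spring.kafka.streams properties.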

Stateful Aggregations with KGroupedStream

Stateless operations like filter and mapValues are straightforward. Where Kafka Streams earns its keep is stateful processing — counting, aggregating, and joining across time.

@Bean
public KTable<String, ClaimStats> claimStatsByRegion(StreamsBuilder builder) {

    return builder
        .stream("claim.events.approved", Consumed.with(Serdes.String(), claimEventSerde()))
        .selectKey((claimId, event) -> event.getRegion())  // rekey by region
        .groupByKey(Grouped.with(Serdes.String(), claimEventSerde()))
        .aggregate(
            ClaimStats::empty,                             // initialiser
            (region, event, stats) -> stats.add(event),   // aggregator
            Materialized.<String, ClaimStats, KeyValueStore<Bytes, byte[]>>as("claim-stats-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(claimStatsSerde())
        );
}

The aggregation result is a KTable — a changelog stream where each record represents the latest value for a key. The state is backed by a local RocksDB store (claim-stats-store) and replicated to a Kafka changelog topic for fault tolerance. If the application restarts, Kafka Streams rebuilds the state from the changelog topic before resuming processing.
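To make the store and changelog relationship concrete, here is a plain-Java model of it with no Kafka dependency. Everything in it is a stand-in: Stats is a hypothetical simplification of ClaimStats, a HashMap plays the state store, and a list plays the changelog topic. Real Kafka Streams also caches updates, so it may not emit every intermediate value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of aggregate(): a per-key store plus a changelog of every update. */
final class AggregateModel {
    /** Hypothetical stand-in for ClaimStats: a count and a running total. */
    record Stats(long count, long totalAmount) {
        static Stats empty() { return new Stats(0, 0); }
        Stats add(long amount) { return new Stats(count + 1, totalAmount + amount); }
    }

    /** One changelog entry: the new latest value for a key. */
    record Change(String key, Stats value) {}

    private final Map<String, Stats> store = new HashMap<>();  // plays the state store
    private final List<Change> changelog = new ArrayList<>();  // plays the changelog topic

    /** Initialise on first sight of a key, apply the aggregator, record the change. */
    void process(String region, long amount) {
        Stats updated = store.getOrDefault(region, Stats.empty()).add(amount);
        store.put(region, updated);
        changelog.add(new Change(region, updated));
    }

    Stats get(String region) { return store.get(region); }

    List<Change> changelog() { return changelog; }

    /** Rebuilds a store by replaying the changelog, keeping the last value per key. */
    static Map<String, Stats> restore(List<Change> changelog) {
        Map<String, Stats> rebuilt = new HashMap<>();
        for (Change change : changelog) {
            rebuilt.put(change.key(), change.value());
        }
        return rebuilt;
    }
}
```

The restore method is the part that mirrors what happens after a restart: replaying the changelog is enough to recover the latest value for every key without reprocessing the input topic.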

KTable Joins: Enriching Events with Reference Data

The pattern I used at Mosaic Smart Data — joining a high-volume event stream against reference data — is a classic KStream-KTable join:

@Bean
public KStream<String, EnrichedClaimEvent> enrichedClaimStream(StreamsBuilder builder) {

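    // High-volume stream: claim events.
    // Assumes records are keyed by claimant ID; if they are keyed by claim ID, rekey with selectKey() before the join.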
    KStream<String, ClaimEvent> claimEvents = builder.stream(
        "claim.events.raw",
        Consumed.with(Serdes.String(), claimEventSerde()));

    // Slowly-updating table: claimant profiles
    KTable<String, ClaimantProfile> claimantProfiles = builder.table(
        "claimant.profiles",
        Consumed.with(Serdes.String(), claimantProfileSerde()),
        Materialized.as("claimant-profile-store"));

    // Join: enrich each event with the latest claimant profile
    return claimEvents.join(
        claimantProfiles,
        (event, profile) -> EnrichedClaimEvent.from(event, profile),
        Joined.with(Serdes.String(), claimEventSerde(), claimantProfileSerde())
    );
}

This is a non-windowed join — each claim event is enriched with whatever the claimant profile table holds at the moment the event arrives. When a profile update arrives on the claimant.profiles topic, the table updates and subsequent joins use the new data. No database lookups, no external service calls — the reference data lives in a local state store, co-located with the processing.
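The semantics are easier to see in a small plain-Java model, again with no Kafka dependency. The class and method names are hypothetical and strings stand in for the event and profile types. It shows two things that matter in practice: table records only update state, and an inner join emits nothing for an event whose key has no profile yet.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a KStream-KTable inner join, processing records in arrival order. */
final class StreamTableJoinModel {
    private final Map<String, String> profiles = new HashMap<>(); // table state: claimantId -> profile
    private final List<String> output = new ArrayList<>();

    /** A table-side record only updates state and never emits output; null acts as a delete. */
    void onProfile(String claimantId, String profile) {
        if (profile == null) {
            profiles.remove(claimantId);
        } else {
            profiles.put(claimantId, profile);
        }
    }

    /** A stream-side record triggers a lookup against the current table state. */
    void onEvent(String claimantId, String event) {
        String profile = profiles.get(claimantId);
        if (profile != null) {               // inner join: no match means no output
            output.add(event + "+" + profile);
        }
    }

    List<String> output() { return output; }
}
```

Feeding it event e1, then profile p1, then e2, then profile p2, then e3 (all for the same key) yields only e2+p1 and e3+p2. If dropping unmatched events is not acceptable, KStream#leftJoin passes them to the joiner with a null profile instead.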

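The critical requirement: the stream and the table must be co-partitioned, meaning the same number of partitions and the same partitioning strategy. Kafka Streams checks the partition counts and fails with a TopologyException if they differ, but it cannot verify the partitioning strategy. If the producers of the two topics use different partitioners, the join runs without error and silently misses matches.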
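A quick plain-Java sketch shows why the partition count matters. The partitioner here is a stand-in based on String.hashCode; Kafka's default partitioner hashes the serialized key bytes with murmur2, so the actual partition numbers will differ, but the effect is the same. The names are hypothetical.

```java
/** Toy model of why a join needs both topics to agree on where a key lives. */
final class CoPartitioningModel {
    private CoPartitioningModel() {}

    /** Stand-in partitioner (assumption): hash of the key modulo the partition count. */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** True when both topics place the key in the same partition number. */
    static boolean samePartition(String key, int streamPartitions, int tablePartitions) {
        return partitionFor(key, streamPartitions) == partitionFor(key, tablePartitions);
    }

    public static void main(String[] args) {
        System.out.println(samePartition("g", 6, 6)); // same count: always true
        System.out.println(samePartition("g", 6, 8)); // different counts: key lands elsewhere
    }
}
```

A stream task only holds the table partitions with the same number as its stream partitions, so a key that lands in a different partition number on each side never finds its profile.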

Windowed Aggregations

For time-based analytics (event counts per minute, rolling averages, session-based groupings), Kafka Streams provides four window types: tumbling, hopping, sliding, and session. The three shown here cover most cases:

Tumbling windows — fixed, non-overlapping time buckets:

KTable<Windowed<String>, Long> eventsPerMinute = claimEvents
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
    .count(Materialized.as("events-per-minute-store"));

Hopping windows — fixed size, overlapping (a 5-minute window advancing every 1 minute):

KTable<Windowed<String>, Long> rollingCount = claimEvents
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(30))
        .advanceBy(Duration.ofMinutes(1)))
    .count();

Session windows — variable-length windows based on activity gaps:

KTable<Windowed<String>, Long> sessionCount = claimEvents
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
    .count();

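The grace period in windowed operations is important for out-of-order events. Once stream time reaches the window end plus the grace period, the window closes, and any later record belonging to it is dropped; Kafka Streams records this in its dropped-records metrics rather than failing. The tumbling and session examples above use the NoGrace variants, so they drop every record that arrives after its window has ended. Set the grace period based on your expected event latency.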
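The window arithmetic is simple enough to model in plain Java. The sketch below reflects my understanding of how time windows are assigned and closed (windows aligned to the epoch, end-exclusive, non-negative timestamps); the class and method names are hypothetical and none of this is Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of window assignment and the grace-period check. */
final class WindowMath {
    private WindowMath() {}

    /** Start of the tumbling window containing the timestamp. */
    static long tumblingStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Starts of every hopping window [start, start + size) that contains the timestamp. */
    static List<Long> hoppingStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest multiple of advanceMs whose window still covers the timestamp
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    /** A record is dropped once stream time has reached window end plus grace. */
    static boolean isDropped(long windowEndMs, long graceMs, long streamTimeMs) {
        return windowEndMs + graceMs <= streamTimeMs;
    }
}
```

With a 5-minute window advancing every minute, a record at 400,000 ms belongs to five windows, which is why hopping windows multiply the state and output volume by size divided by advance. With no grace, a record for the window ending at 180,000 ms is dropped as soon as stream time reaches 180,000 ms; a 30-second grace keeps that window open until 210,000 ms.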

Interactive Queries: Reading State Stores

State stores aren’t just internal to the topology — you can query them directly from outside the Streams application. This lets you expose aggregated state as a REST API without a separate read database:

@RestController
@RequiredArgsConstructor
public class ClaimStatsController {

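    // KafkaStreams is not itself a Spring bean; obtain it from the factory bean
    private final StreamsBuilderFactoryBean factoryBean;

    @GetMapping("/stats/{region}")
    public ClaimStats getStats(@PathVariable String region) {
        // store() throws InvalidStateStoreException while the instance is starting or rebalancing
        ReadOnlyKeyValueStore<String, ClaimStats> store = factoryBean.getKafkaStreams()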
            .store(StoreQueryParameters.fromNameAndType(
                "claim-stats-store",
                QueryableStoreTypes.keyValueStore()));

        ClaimStats stats = store.get(region);
        return stats != null ? stats : ClaimStats.empty();
    }
}

In a multi-instance deployment, a key may be held in a state store on a different instance. Kafka Streams’ metadata API lets you locate the correct instance and proxy the request — a pattern worth implementing if you’re exposing interactive queries at scale.

Handling Deserialization Errors

By default (LogAndFailExceptionHandler), a deserialization failure stops the stream thread that hit it, so a single corrupt record can halt your stream processing. Configure a handler to skip corrupt records instead:

properties.put(
    StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class);

LogAndContinueExceptionHandler logs the error and skips the record. For production use I prefer a custom handler that also routes the raw bytes to a dead-letter topic for later inspection:

public class DeadLetterDeserializationHandler implements DeserializationExceptionHandler {

    private static final Logger log = LoggerFactory.getLogger(DeadLetterDeserializationHandler.class);

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
                                                  ConsumerRecord<byte[], byte[]> record,
                                                  Exception exception) {
        log.error("Deserialization failed on topic={} partition={} offset={}: {}",
            record.topic(), record.partition(), record.offset(), exception.getMessage());
        // In production: forward raw bytes to dead-letter topic via a separate producer
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Required by the interface; a dead-letter producer could be created from these configs
    }
}

Topology Visualisation

During development, printing the topology description is invaluable for understanding what you’ve built:

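@Bean
public ApplicationRunner topologyPrinter(StreamsBuilderFactoryBean factoryBean) {
    // Topology.describe() produces the description; KafkaStreams.toString() does not
    return args -> log.info("Topology:\n{}", factoryBean.getTopology().describe());
}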

The output is a text representation of every source, processor, state store, and sink in your topology. It is essential for debugging unexpected behaviour and for spotting where Kafka Streams has inserted repartition topics ahead of joins and aggregations.

Kafka Streams is one of those libraries where the first topology you write feels complex, and the tenth feels obvious. The primitives — stream, table, join, aggregate, window — map cleanly onto real business problems once you internalise them. If you’re at the point where your consumer logic is becoming hard to test or reason about, it’s usually a sign that Kafka Streams is the right next step.

If you’re building real-time processing pipelines on Kafka and want an engineer who’s designed and operated them in production, get in touch.