Kafka Schema Registry with Avro — Schema Evolution Without Breaking Changes

How to use Confluent Schema Registry with Avro serialisation in Spring Boot — schema registration, producer and consumer setup, and evolving schemas safely without breaking existing consumers.

Kafka topics are durable. Messages written today may be consumed weeks or months from now by consumers that don’t exist yet. In that environment, treating your message format as an internal implementation detail — something you can change freely with a redeploy — is a mistake that produces subtle, hard-to-diagnose consumer failures at the worst possible time.

Schema Registry with Avro solves this by making message schemas explicit, versioned, and enforceable. Producers register the schema they write with; consumers fetch it by ID and use it to deserialise. When a schema changes, compatibility rules determine whether it is safe. This post covers the full setup in Spring Boot, including the parts that documentation tends to gloss over.

What Schema Registry Actually Does

Without Schema Registry, a Kafka message is opaque bytes. The producer and consumer share knowledge of the format implicitly — usually by importing the same DTO class. When the DTO changes, the consumer must be updated and deployed simultaneously, or it will fail to deserialise older messages.

With Schema Registry:

  • Every Avro schema is registered under a subject (typically <topic-name>-value).
  • The producer serialises the message using the schema and prefixes the serialised bytes with a 5-byte header: a magic byte (0) followed by the 4-byte schema ID.
  • The consumer reads the schema ID from the header, fetches the schema from the registry, and uses it to deserialise — regardless of which version the producer used.
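
The framing is small enough to sketch with the JDK alone. What follows illustrates the wire format only; it is not Confluent's serialiser code, and the names WireFormat, frame and schemaId are made up for this example.

```java
import java.nio.ByteBuffer;

public class WireFormat {

    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    /** Prefixes an already-encoded Avro payload with the magic byte and schema ID. */
    public static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_LENGTH + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)   // ByteBuffer writes big-endian by default
                .put(avroPayload)
                .array();
    }

    /** Reads the schema ID back out of a framed message. */
    public static int schemaId(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message is shorter than the 5-byte header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }
}
```

A consumer that fails with an "unknown magic byte" style error is usually reading a message that was never framed this way, for example one produced with a plain JSON or String serialiser.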

This decouples producer and consumer deployments. You can add a new optional field to your schema, deploy the producer, and all existing consumers continue working without a redeploy.

Dependencies

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.3</version>
</dependency>

Confluent’s artifacts are not in Maven Central — add the Confluent repository:

<repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
</repository>

Also add the Avro Maven plugin to generate Java classes from .avsc schema files at compile time:

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.11.3</version>
    <executions>
        <execution>
            <goals><goal>schema</goal></goals>
            <configuration>
                <sourceDirectory>src/main/avro</sourceDirectory>
                <outputDirectory>target/generated-sources/avro</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>

Defining the Schema

Create src/main/avro/OrderPlaced.avsc:

{
  "type": "record",
  "name": "OrderPlaced",
  "namespace": "uk.co.trinitylogic.trading.events",
  "fields": [
    { "name": "orderId",     "type": "string" },
    { "name": "marketId",    "type": "string" },
    { "name": "selectionId", "type": "long"   },
    { "name": "side",        "type": { "type": "enum", "name": "Side", "symbols": ["BACK", "LAY"] } },
    { "name": "price",       "type": "double" },
    { "name": "size",        "type": "double" },
    { "name": "placedAt",    "type": { "type": "long", "logicalType": "timestamp-millis" } }
  ]
}

Running mvn generate-sources produces OrderPlaced.java in target/generated-sources/avro. Treat it as generated code — never edit it directly.

Producer Configuration

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, OrderPlaced> producerFactory(
            @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers,
            @Value("${spring.kafka.properties.schema.registry.url}") String schemaRegistryUrl) {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,    bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            org.apache.kafka.common.serialization.StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        props.put("schema.registry.url",                      schemaRegistryUrl);
        props.put(ProducerConfig.ACKS_CONFIG,                 "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,   true);

        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, OrderPlaced> kafkaTemplate(
            ProducerFactory<String, OrderPlaced> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}

With its default setting of auto.register.schemas=true, KafkaAvroSerializer handles schema registration automatically on first use: it contacts the registry, registers the schema if it doesn’t exist, and caches the returned schema ID. Subsequent sends for the same schema skip the registration step.
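
The register-once-then-cache behaviour can be pictured with a few lines of plain Java. This is a simplified model, not the Confluent client: SchemaIdCache is a hypothetical class, the registry call is replaced by a function you pass in, and the cache is keyed by schema text alone.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.ToIntFunction;

public class SchemaIdCache {

    private final Map<String, Integer> idsBySchema = new ConcurrentHashMap<>();

    // Stand-in for the HTTP call that registers a schema and returns its ID
    private final ToIntFunction<String> registryCall;

    public SchemaIdCache(ToIntFunction<String> registryCall) {
        this.registryCall = registryCall;
    }

    /** Returns the ID for a schema, calling the registry only the first time it is seen. */
    public int idFor(String schemaJson) {
        return idsBySchema.computeIfAbsent(schemaJson, registryCall::applyAsInt);
    }
}
```

The practical consequence is that the registry sits on the hot path only for the first message of each schema, so a brief registry outage tends to affect freshly started producers rather than ones already running.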

Consumer Configuration

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, OrderPlaced> consumerFactory(
            @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers,
            @Value("${spring.kafka.properties.schema.registry.url}") String schemaRegistryUrl) {

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,     bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,              "order-processor");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,     "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            org.apache.kafka.common.serialization.StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
        props.put("schema.registry.url",                       schemaRegistryUrl);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderPlaced> kafkaListenerContainerFactory(
            ConsumerFactory<String, OrderPlaced> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, OrderPlaced> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}

Setting SPECIFIC_AVRO_READER_CONFIG to true tells the deserialiser to produce the generated OrderPlaced class rather than a GenericRecord. Without it, you get a GenericRecord and have to access fields by string name, which loses type safety.

Sending and Receiving

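@Service
public class OrderEventProducer {

    private static final org.slf4j.Logger log =
        org.slf4j.LoggerFactory.getLogger(OrderEventProducer.class);

    private final KafkaTemplate<String, OrderPlaced> kafkaTemplate;

    // Constructor injection: Spring supplies the KafkaTemplate bean defined above
    public OrderEventProducer(KafkaTemplate<String, OrderPlaced> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publish(OrderPlaced event) {
        // Key by order ID so all events for one order land on the same partition
        kafkaTemplate.send("order-placed", event.getOrderId().toString(), event)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to publish order event {}", event.getOrderId(), ex);
                } else {
                    log.debug("Published order {} to partition {}",
                        event.getOrderId(),
                        result.getRecordMetadata().partition());
                }
            });
    }
}

@Component
public class OrderEventConsumer {

    private static final org.slf4j.Logger log =
        org.slf4j.LoggerFactory.getLogger(OrderEventConsumer.class);

    @KafkaListener(topics = "order-placed", groupId = "order-processor")
    public void onOrderPlaced(OrderPlaced event) {
        log.info("Processing order {} for market {}", event.getOrderId(), event.getMarketId());
        // process the event
    }
}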

Schema Evolution — What’s Safe and What Isn’t

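Schema Registry enforces a compatibility mode, configured per subject. The three core modes are listed below; each also has a _TRANSITIVE variant that checks against every earlier version rather than only the latest, and NONE disables checking: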

BACKWARD (default): new schema can read data written by the previous schema. New consumers can read old messages. Safe changes:

  • Add an optional field with a default value.
  • Remove a field (old data just won’t have it).

FORWARD: old schema can read data written by the new schema. Old consumers can read new messages. Safe changes:

  • Add a field without a default (the old schema ignores it).

FULL: both backward and forward. Most restrictive, most interoperable.
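
For the common case of adding and removing fields, the three modes reduce to one question asked in two directions. The sketch below models a schema as a map from field name to "has a default". It deliberately ignores type changes and aliases, and FieldCompatibility is a hypothetical helper, not the registry's actual checker.

```java
import java.util.Map;

public class FieldCompatibility {

    /**
     * A reader can decode a writer's data if every reader field is either
     * present in the writer schema or has a default to fall back on.
     * Each map is field name -> whether that field declares a default.
     */
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        return reader.entrySet().stream()
                .allMatch(field -> writer.containsKey(field.getKey()) || field.getValue());
    }

    /** BACKWARD: the new schema can read data written with the old one. */
    public static boolean isBackward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(newSchema, oldSchema);
    }

    /** FORWARD: the old schema can read data written with the new one. */
    public static boolean isForward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(oldSchema, newSchema);
    }

    /** FULL: both directions hold. */
    public static boolean isFull(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return isBackward(oldSchema, newSchema) && isForward(oldSchema, newSchema);
    }
}
```

Read this way, adding a field with a default passes in both directions, adding one without a default passes only FORWARD, and removing a field that had no default passes only BACKWARD.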

Adding a field with a default is the workhorse of safe schema evolution:

{
  "name": "strategyId",
  "type": ["null", "string"],
  "default": null
}

This is a union type (null or string) with a default of null. Consumers still on the old schema simply ignore the field; consumers on the new schema read it when it’s present and fall back to the default for older messages that lack it. The change is both backward and forward compatible.
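
To make the two directions concrete, here is a toy version of what a reader does with the fields it finds. It uses plain maps in place of Avro records, so treat it as a mental model rather than the library's resolution code; DefaultResolution and NO_DEFAULT are names invented for the sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DefaultResolution {

    /** Marker for reader fields that declare no default. */
    public static final Object NO_DEFAULT = new Object();

    /**
     * Builds the record a reader sees. readerFields maps each field name to its
     * default value, or to NO_DEFAULT when the schema declares none.
     */
    public static Map<String, Object> resolve(Map<String, Object> written,
                                              Map<String, Object> readerFields) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : readerFields.entrySet()) {
            String name = field.getKey();
            if (written.containsKey(name)) {
                result.put(name, written.get(name));
            } else if (field.getValue() != NO_DEFAULT) {
                result.put(name, field.getValue()); // may be null, as for strategyId
            } else {
                throw new IllegalStateException("No value and no default for field " + name);
            }
        }
        return result; // written fields the reader does not know are dropped
    }
}
```

A reader that declares strategyId with a null default gets null for an old message, while a reader without strategyId reads a new message and never sees the extra field.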

What is not safe under BACKWARD compatibility:

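  • Adding a field without a default (the new schema has no value to use when reading older messages that lack it).
  • Changing a field’s type in a way Avro cannot promote (e.g. long to int, or string to long).
  • Renaming a field without an alias.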
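
One nuance on type changes: the Avro specification lets a reader promote a small set of primitive types, so not every type change is a breaking one. The lookup below encodes those promotions as I understand them from the spec; TypePromotion is a hypothetical helper and covers primitives only.

```java
import java.util.Map;
import java.util.Set;

public class TypePromotion {

    // Writer type -> reader types it may be promoted to during schema resolution
    private static final Map<String, Set<String>> PROMOTIONS = Map.of(
            "int", Set.of("long", "float", "double"),
            "long", Set.of("float", "double"),
            "float", Set.of("double"),
            "string", Set.of("bytes"),
            "bytes", Set.of("string"));

    /** True when data written as writerType can be read by a schema declaring readerType. */
    public static boolean readerCanRead(String writerType, String readerType) {
        return writerType.equals(readerType)
                || PROMOTIONS.getOrDefault(writerType, Set.of()).contains(readerType);
    }
}
```

Under BACKWARD the reader is the new schema, so widening a field from int to long passes the check, while narrowing from long to int does not.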

Renaming a field requires using Avro aliases to maintain backward compatibility:

{
  "name": "exchangeOrderId",
  "aliases": ["orderId"],
  "type": "string"
}

Old messages written with orderId are mapped to exchangeOrderId by the alias. New messages use exchangeOrderId directly.
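
The alias lookup amounts to "try my own name, then each alias". A minimal sketch with maps standing in for decoded records, where AliasLookup and valueFor are illustrative names rather than Avro API:

```java
import java.util.List;
import java.util.Map;

public class AliasLookup {

    /** Finds the written value for a reader field, trying its own name first and then each alias. */
    public static Object valueFor(String readerName, List<String> aliases, Map<String, Object> written) {
        if (written.containsKey(readerName)) {
            return written.get(readerName);
        }
        for (String alias : aliases) {
            if (written.containsKey(alias)) {
                return written.get(alias);
            }
        }
        throw new IllegalArgumentException("No written field matches " + readerName + " or its aliases");
    }
}
```

Note that the alias only helps readers whose schema declares it. Consumers still on the old schema keep looking for orderId, so upgrade consumers before producers start writing exchangeOrderId.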

Testing with an Embedded Registry

For integration tests, use the Confluent test utilities to spin up an embedded Schema Registry alongside an embedded Kafka broker:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry</artifactId>
    <version>7.6.0</version>
    <scope>test</scope>
</dependency>

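Or use Testcontainers with a Kafka container and a separate Schema Registry container on a shared Docker network:

@Testcontainers
@SpringBootTest
class OrderEventIntegrationTest {

    // Shared network so the registry container can reach the broker
    static final Network network = Network.newNetwork();

    @Container
    static final KafkaContainer kafka =
        new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"))
            .withNetwork(network);

    // Assumption: ConfluentSchemaRegistryContainer is a project-local GenericContainer
    // subclass (not part of core Testcontainers) that exposes getSchemaRegistryUrl()
    @Container
    static final ConfluentSchemaRegistryContainer schemaRegistry =
        new ConfluentSchemaRegistryContainer("confluentinc/cp-schema-registry:7.6.0")
            .withNetwork(network)
            .dependsOn(kafka);

    @DynamicPropertySource
    static void configureProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
        registry.add("spring.kafka.properties.schema.registry.url",
            schemaRegistry::getSchemaRegistryUrl);
    }
}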

This gives you a real serialisation round-trip in tests — the producer registers the schema, the consumer fetches it, and you verify the full Avro encode/decode path.

ProTips

  • Set compatibility mode explicitly when creating a subject. Don’t rely on the global default: if someone changes it, every subject without its own setting changes behaviour at once.
  • Register schemas in CI before deployment. Use the Schema Registry Maven plugin or the REST API to pre-register schemas and validate compatibility in your pipeline. Catch breaking changes before they reach production.
  • Version your subject names for major breaking changes. If you must make a breaking change, create a new subject (order-placed-v2) rather than breaking the existing one. Run old and new consumers in parallel during the cutover window.
  • Don’t use auto.register.schemas=true in production. It means any producer can register any schema, including accidentally breaking ones. Register schemas explicitly and set auto.register.schemas=false on producers.
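
For the CI check, the registry's REST API has a compatibility endpoint that tests a candidate schema against the latest registered version without registering it. The sketch below only builds the request, using the JDK's HTTP client types; CompatibilityCheck is a hypothetical helper, and the registry URL and subject are whatever your pipeline supplies.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class CompatibilityCheck {

    /** Builds a request asking whether schemaJson is compatible with the subject's latest version. */
    public static HttpRequest build(String registryUrl, String subject, String schemaJson) {
        // The registry expects the schema embedded as a JSON string value
        String body = "{\"schema\": \"" + escape(schemaJson) + "\"}";
        return HttpRequest.newBuilder()
                // Assumes the subject needs no URL encoding (plain topic-style names)
                .uri(URI.create(registryUrl + "/compatibility/subjects/" + subject + "/versions/latest"))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    /** Escapes backslashes, quotes and newlines so the schema can sit inside a JSON string. */
    static String escape(String json) {
        return json.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
    }
}
```

Send it with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and fail the build unless the response body reports is_compatible as true.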

If you’re building event-driven systems with Kafka and want to get schema governance right from the start, get in touch.

Samuel Jackson

Senior Java Back End Developer & Contractor

Senior Java Back End Developer — Betfair Exchange API specialist, Spring Boot, AWS, and event-driven architecture. 20+ years delivering high-performance systems across betting, finance, energy, retail, and government. Available for Java contracting.