How to use Confluent Schema Registry with Avro serialisation in Spring Boot — schema registration, producer and consumer setup, and evolving schemas safely without breaking existing consumers.
Kafka topics are durable. Messages written today may be consumed weeks or months from now by consumers that don’t exist yet. In that environment, treating your message format as an internal implementation detail — something you can change freely with a redeploy — is a mistake that produces subtle, hard-to-diagnose consumer failures at the worst possible time.
Schema Registry with Avro solves this by making message schemas explicit, versioned, and enforceable. Producers register the schema they write with; consumers fetch it by ID and use it to deserialise. When a schema changes, compatibility rules determine whether it is safe. This post covers the full setup in Spring Boot, including the parts that documentation tends to gloss over.
Without Schema Registry, a Kafka message is opaque bytes. The producer and consumer share knowledge of the format implicitly — usually by importing the same DTO class. When the DTO changes, the consumer must be updated and deployed simultaneously, or it will fail to deserialise older messages.
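With Schema Registry, the producer registers its schema under a subject (by default <topic-name>-value), each message carries the ID of the schema it was written with, and the consumer fetches that schema by ID to deserialise.
This decouples producer and consumer deployments. You can add a new optional field to your schema, deploy the producer, and all existing consumers continue working without a redeploy.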
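To make the "schema ID travels with the message" idea concrete: Confluent's serialisers prefix each Avro payload with a five-byte header, a zero magic byte followed by the schema ID as a 4-byte big-endian integer. The sketch below uses only the JDK to show how that framing could be written and read. The class and method names are made up for illustration and are not part of any Confluent API.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Illustrative framing helper; not a Confluent class. */
final class WireFormatSketch {
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    /** Prefixes an already-encoded Avro payload with the magic byte and schema ID. */
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_LENGTH + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)   // ByteBuffer is big-endian by default
                .put(avroPayload)
                .array();
    }

    /** Reads the schema ID a consumer would look up in the registry. */
    static int schemaId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (message.length < HEADER_LENGTH || buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not a Schema Registry framed message");
        }
        return buffer.getInt();
    }

    /** Returns the Avro-encoded bytes that follow the header. */
    static byte[] payload(byte[] message) {
        schemaId(message); // validates the header first
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }
}
```

In practice KafkaAvroSerializer and KafkaAvroDeserializer do this for you; the sketch is only useful for understanding what you see when inspecting raw topic bytes.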
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.3</version>
</dependency>
Confluent’s artifacts are not in Maven Central — add the Confluent repository:
<repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
</repository>
Also add the Avro Maven plugin to generate Java classes from .avsc schema files at compile time:
<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.11.3</version>
    <executions>
        <execution>
            <goals><goal>schema</goal></goals>
            <configuration>
                <sourceDirectory>src/main/avro</sourceDirectory>
                <outputDirectory>target/generated-sources/avro</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>
Create src/main/avro/OrderPlaced.avsc:
{
  "type": "record",
  "name": "OrderPlaced",
  "namespace": "uk.co.trinitylogic.trading.events",
  "fields": [
    { "name": "orderId", "type": "string" },
    { "name": "marketId", "type": "string" },
    { "name": "selectionId", "type": "long" },
    { "name": "side", "type": { "type": "enum", "name": "Side", "symbols": ["BACK", "LAY"] } },
    { "name": "price", "type": "double" },
    { "name": "size", "type": "double" },
    { "name": "placedAt", "type": { "type": "long", "logicalType": "timestamp-millis" } }
  ]
}
Running mvn generate-sources produces OrderPlaced.java (plus Side.java for the enum) under target/generated-sources/avro, in a package directory matching the namespace. Treat it as generated code and never edit it directly.
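import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import uk.co.trinitylogic.trading.events.OrderPlaced;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, OrderPlaced> producerFactory(
            @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers,
            @Value("${spring.kafka.properties.schema.registry.url}") String schemaRegistryUrl) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        // The Avro serialiser needs to know where the registry lives
        props.put("schema.registry.url", schemaRegistryUrl);
        // Durability: wait for all in-sync replicas and avoid duplicate writes on retry
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, OrderPlaced> kafkaTemplate(
            ProducerFactory<String, OrderPlaced> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}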
With the default setting auto.register.schemas=true, KafkaAvroSerializer handles schema registration automatically on first use: it contacts the registry, registers the schema under the topic's subject if it doesn’t exist yet, and caches the returned schema ID. Subsequent sends for the same schema skip the registration step.
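The registration step amounts to a single call to the registry's REST API, POST /subjects/{subject}/versions, with the schema wrapped as an escaped JSON string. Below is a minimal JDK-only sketch of building that request by hand, which is also one way a schema could be registered explicitly from a build or deployment step. The registry URL and subject are placeholders supplied by the caller, and the class name is made up for this example.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Illustrative helper; not part of the Confluent client library. */
final class SchemaRegistrationSketch {

    /** Wraps a schema in the {"schema": "..."} envelope the registry expects. */
    static String envelope(String schemaJson) {
        String escaped = schemaJson
                .replace("\\", "\\\\")
                .replace("\"", "\\\"")
                .replace("\n", "\\n")
                .replace("\r", "\\r")
                .replace("\t", "\\t");
        return "{\"schema\": \"" + escaped + "\"}";
    }

    /** Builds (but does not send) the request that registers a schema under a subject. */
    static HttpRequest register(String registryUrl, String subject, String schemaJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(registryUrl + "/subjects/" + subject + "/versions"))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(envelope(schemaJson)))
                .build();
    }
}
```

Sending it is a matter of passing the request to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`; a successful response body contains the schema's ID.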
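import java.util.HashMap;
import java.util.Map;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import uk.co.trinitylogic.trading.events.OrderPlaced;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, OrderPlaced> consumerFactory(
            @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers,
            @Value("${spring.kafka.properties.schema.registry.url}") String schemaRegistryUrl) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
        props.put("schema.registry.url", schemaRegistryUrl);
        // Deserialise into the generated OrderPlaced class instead of GenericRecord
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderPlaced> kafkaListenerContainerFactory(
            ConsumerFactory<String, OrderPlaced> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, OrderPlaced> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}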
Setting SPECIFIC_AVRO_READER_CONFIG to true tells the deserialiser to produce the generated OrderPlaced class rather than a GenericRecord. Without it, you get a GenericRecord and have to access fields by string name, which loses type safety.
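import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderEventProducer {
    private static final Logger log = LoggerFactory.getLogger(OrderEventProducer.class);
    private final KafkaTemplate<String, OrderPlaced> kafkaTemplate;

    // Constructor injection: the final field has to be assigned somewhere
    public OrderEventProducer(KafkaTemplate<String, OrderPlaced> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publish(OrderPlaced event) {
        // Keyed by order ID; send() returns a CompletableFuture in Spring Kafka 3.x
        kafkaTemplate.send("order-placed", event.getOrderId().toString(), event)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to publish order event {}", event.getOrderId(), ex);
                } else {
                    log.debug("Published order {} to partition {}",
                        event.getOrderId(),
                        result.getRecordMetadata().partition());
                }
            });
    }
}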
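import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderEventConsumer {
    private static final Logger log = LoggerFactory.getLogger(OrderEventConsumer.class);

    @KafkaListener(topics = "order-placed", groupId = "order-processor")
    public void onOrderPlaced(OrderPlaced event) {
        log.info("Processing order {} for market {}", event.getOrderId(), event.getMarketId());
        // process the event
    }
}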
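Schema Registry supports three main compatibility modes, configured per subject. Each also has a _TRANSITIVE variant that checks against all earlier versions rather than only the latest, and NONE disables checking altogether.
BACKWARD (default): the new schema can read data written by the previous schema, so new consumers can read old messages. Safe changes: deleting a field, or adding a field that has a default.
FORWARD: the old schema can read data written by the new schema, so old consumers can read new messages. Safe changes: adding a field, or deleting a field that has a default.
FULL: both backward and forward. Most restrictive, most interoperable.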
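The mode is set per subject through the registry's REST API (PUT /config/{subject}). A minimal sketch using the JDK's java.net.http types follows. The registry URL and subject are placeholders passed in by the caller, and the helper class name is invented for this example.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Illustrative helper; not part of the Confluent client library. */
final class CompatibilityConfigSketch {

    /** Builds (but does not send) the request that sets a subject's compatibility level. */
    static HttpRequest setCompatibility(String registryUrl, String subject, String level) {
        return HttpRequest.newBuilder()
                .uri(URI.create(registryUrl + "/config/" + subject))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .PUT(HttpRequest.BodyPublishers.ofString("{\"compatibility\": \"" + level + "\"}"))
                .build();
    }
}
```

For example, `setCompatibility("http://localhost:8081", "order-placed-value", "FULL")` targets the value subject of the order-placed topic; the request still has to be sent with an HttpClient.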
Adding a field with a default is the workhorse of safe schema evolution:
{
  "name": "strategyId",
  "type": ["null", "string"],
  "default": null
}
This is a union type, null or string, with a default of null. Existing consumers that don’t know about the field simply ignore it when reading new messages; new consumers reading old messages that lack the field get the default. The change is therefore both backward and forward compatible.
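Why a default makes the change safe in both directions can be checked with a deliberately simplified model. In the sketch below a schema is reduced to a map from field name to "has a default", and field types are ignored entirely. This is an illustration of the rule, not the registry's actual compatibility checker, and all names are hypothetical.

```java
import java.util.Map;

/** Toy model: a schema is a map from field name to "has a default". Types are ignored. */
final class CompatibilitySketch {

    /** A reader can decode a writer's data if every field the writer lacks has a default. */
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        return reader.entrySet().stream()
                .allMatch(field -> writer.containsKey(field.getKey()) || field.getValue());
    }

    /** BACKWARD: consumers on the new schema read messages written with the old one. */
    static boolean isBackwardCompatible(Map<String, Boolean> newSchema, Map<String, Boolean> oldSchema) {
        return canRead(newSchema, oldSchema);
    }

    /** FORWARD: consumers on the old schema read messages written with the new one. */
    static boolean isForwardCompatible(Map<String, Boolean> newSchema, Map<String, Boolean> oldSchema) {
        return canRead(oldSchema, newSchema);
    }
}
```

In this model, adding strategyId with a default passes both checks, while adding it without a default fails the backward check because a new consumer has nothing to fill in for old messages.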
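What is not safe under BACKWARD compatibility: adding a field without a default, or changing a field's type in a way Avro cannot promote (widening such as int to long is allowed; the reverse is not).
Renaming a field requires using Avro aliases to maintain backward compatibility: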
{
  "name": "exchangeOrderId",
  "aliases": ["orderId"],
  "type": "string"
}
Old messages written with orderId are mapped to exchangeOrderId by the alias. New messages use exchangeOrderId directly.
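The lookup a reader performs for an aliased field can be pictured roughly as "try the current name, then each alias". The sketch below models a decoded writer record as a plain map. It is a simplification for illustration only (Avro applies aliases during schema resolution rather than on decoded maps), and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Toy model of alias-based field lookup; not Avro's actual resolution code. */
final class AliasResolutionSketch {

    /** Finds the writer's value for a reader field, trying the field name first, then its aliases. */
    static Optional<Object> resolve(String readerField, List<String> aliases,
                                    Map<String, Object> writerRecord) {
        if (writerRecord.containsKey(readerField)) {
            return Optional.ofNullable(writerRecord.get(readerField));
        }
        return aliases.stream()
                .filter(writerRecord::containsKey)
                .findFirst()
                .map(writerRecord::get);
    }
}
```

Note that the lookup only works in one direction: a consumer still on the old schema asks for orderId and has no alias pointing at exchangeOrderId, so a rename via alias is backward compatible but not forward compatible.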
For integration tests, use the Confluent test utilities to spin up an embedded Schema Registry alongside an embedded Kafka broker:
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry</artifactId>
    <version>7.6.0</version>
    <scope>test</scope>
</dependency>
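Or use Testcontainers with the Confluent Platform images, running Kafka and Schema Registry as two containers. Testcontainers ships a KafkaContainer; ConfluentSchemaRegistryContainer below is not a Testcontainers class and is assumed to be a small project-specific GenericContainer wrapper that exposes getSchemaRegistryUrl():
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@SpringBootTest
class OrderEventIntegrationTest {
    // Broker the registry stores its schemas in; image tag assumed to match the registry version
    @Container
    static final KafkaContainer kafka =
        new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));

    // Project-specific helper (assumption), expected to point itself at the broker it depends on
    @Container
    static final ConfluentSchemaRegistryContainer schemaRegistry =
        new ConfluentSchemaRegistryContainer("confluentinc/cp-schema-registry:7.6.0")
            .dependsOn(kafka);

    @DynamicPropertySource
    static void configureProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
        registry.add("spring.kafka.properties.schema.registry.url",
            schemaRegistry::getSchemaRegistryUrl);
    }
}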
This gives you a real serialisation round-trip in tests — the producer registers the schema, the consumer fetches it, and you verify the full Avro encode/decode path.
For a breaking change, publish to a new topic (for example order-placed-v2) rather than breaking the existing one. Run old and new consumers in parallel during the cutover window.
Don't leave auto.register.schemas=true in production. It means any producer can register any schema, including accidentally breaking ones. Register schemas explicitly and set auto.register.schemas=false on producers.
If you’re building event-driven systems with Kafka and want to get schema governance right from the start, get in touch.