Kafka and the Avro serialisation system

Matthias Kausl
willhaben Tech Blog
5 min readNov 9, 2020

--

Tanner Boriackhttps://unsplash.com/photos/jkuR9QteDGY

At willhaben Apache Kafka is a core components of our architecture, used for creating topics of events from different domains. It is used, for example, for near real-time synchronisation of our classified ad events.

Kafka does not impose any restrictions on what data format these events can have. Events contain a payload of a raw bytes. It is up to the consumers of the Kafka topic to decode the event into meaningful data (e.g. strings, json).

One popular choice of data serialisation format for Kafka is Apache Avro, which is also the data format we use in our Kafka topics at willhaben. We also extensively use Spring Boot that provides an easy way of consuming Kafka topics. Confluent provides a library for encoding and decoding of Avro encoded Kafka events that works with Spring boot. Together, producing and consuming Kafka topics with Avro encoded messages just works magically, but it is worth looking under the hood to see how it works.

Avro data serialisation system

Avro uses schema files to define a data format. I won’t go into the details of Avro here, but here is a simple example of a schema from the GitHub repository. It defines a data format called “SomeRecord” which consists only one field called name which is of type string.

With this schema file, the Avro maven plugin can generate a Java class SomeRecord.java which represents this data format. With this class, we can serialise and deserialise data into the Avro binary encoding format and, for example, send it on a Kafka topic.

Avro’s binary encoding format does not include any schema information. Avro’s documentation notes data binary representation should always include the writers schema with the serialised data.

Binary encoding does not include field names, self-contained information about the types of individual bytes, nor field or record separators. Therefore readers are wholly reliant on the schema used when the data was encoded.

For using Avro with Kafka this means that for every event we have to include either the schema itself with every message (which would increase message size), or a reference to the schema.

If we later want to add another field to the schema, we usually want to stay compatible with applications that only have an older version of the schema. In our example we add a field “data” that can either be a string or null (that is, the field data is optional).

In an architecture that uses Kafka, schemas of the events have to evolve and change over time. How do consumers get the “writers schema” of a message on a Kafka topic when the application itself does only include an older version?

Schema registry

Applications that use Confluent’s Avro serialisation library for Kafka also have to use Confluent’s schema registry. It is a separate process that provides a REST API to store and retrieve schemas and also check for different kinds of compatibility types. The schema registry persists the schemas into a special Kafka topic called ‘_schemas’ which contains one Avro schema file per message-type and version.

How does a Kafka consumer know which schema to retrieve from the registry when receiving a message? Confluent’s documentation specifies a wire format, which includes an ID which can be used to retrieve the Avro schema file from the schema registries REST API.

When using Confluent’s schema registry, the following steps have to be taken in order to write and later read an event form a Kafka topic in Avro binary encoded format:

  1. The publisher (event writer) has to call the schema registry’s REST API to upload an Avro schema file (once per schema version)
  2. The schema registry checks for compatibility and returns an ID on success or an error if for example the schema is not compatible with earlier versions
  3. The publisher encodes the data according to Confluent’s wire format (including schema ID and Avro binary encoding of the data)
  4. The consumer reads the raw data from the Kafka topic
  5. The consumer calls the schema registry’s REST API to download the Avro schema file with the ID provided in the first few bytes of the data
  6. The consumer decodes the message using the downloaded schema

Confluents kafka-avro-serializer library implements this behavior.

Schema compatibility

One major advantage of using the schema registry is its ability to check for compatibility between the old and the new version of the schema. Confluents schema registry has several compatibility types: Forward, Backward and “Full” compatibility.

Schema V2 is forward compatible to schema V1 if data serialised with schema V2 can be deserialised with schema V1. Here the producer has to be upgraded first to the new schema and then the consumer. The schema V2 can have new fields and removed optional fields.

Schema V2 is backward compatible to schema V1 if data serialised with schema V1 can be deserialised with schema V2. Here the consumer has to be upgraded first. The schema V2 can have new optional fields and deleted old fields.

Schema V2 is full compatible to schema V1 if data serialised with schema V1 can be deserialised with schema V2 and vice versa. Here the upgrade can happen in any order. The schema V2 can only have new optional fields and deleted optional fields.

Schema compatibility is important for Kafka based applications, because if the consumer cannot deserialise data, events might be skipped. The compatibility check happens at runtime when the producer tries to publish a new schema version before sending any message. If the compatibility check fails, no incompatible messages were sent on the Kafka topic.

Caveats

The biggest drawback of this system is in my opinion that both producer and consumer depend on not only Kafka itself but also the schema registry to be available at runtime.

If the consumer tries to read an Avro encoded message from a Kafka topic, but cannot reach the schema registries REST API to download the required version of the schema, an Exception is thrown and, in the worst case, the message gets discarded.

This is true even for the case that consumer or producer application have a local copy of the current schema file available. A common misconception at willhaben was that, if the consumer has a local copy of the current Avro schema, it uses this local schema file to decode the message. This is not supported by Confluents kafka-avro-serializer library at this time. The local Avro schema file is only be used to generate Java classes for a schema (like described before). These classes can be used to handle the consumed data, but are not used for serialisation or deserialisation.

You should also be aware that the schema registry depends on the _schema Kafka topic to persists its data.

This topic (_schema) is a common source of truth for schema IDs, and you should back it up.

If this topic gets lost somehow, there is no way to restore the IDs generated when publishing schema files to the registries REST API and therefore the messages on the Kafka topics referencing these schemas cannot be decoded.

Conclusion

Producers and consumers of Confluents kafka-avro-serializer library have a runtime dependency on the schema registry. Backup of the _schema topic is crucial.

--

--