The Schema Registry is a service that stores the schemas of the messages sent to a Kafka cluster. Producers and consumers that use the Avro, JSON Schema, or Protobuf serializers talk to it to register and look up schemas. Kafka itself does not require it, but it is highly recommended: it gives producers and consumers (or applications in general) a shared, versioned definition of each message, which provides type safety and lets schema changes be checked for compatibility. It is often deployed as a Docker container alongside the Kafka cluster, and frequently together with the REST Proxy.

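The Schema Registry has a REST API that can be used to manage the schemas. Schemas are grouped under subjects; by default the subject is the topic name followed by -key or -value. The most important endpoints are: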

  • POST /subjects/{subject}/versions - registers a new schema under a subject
  • GET /subjects/{subject}/versions/{version} - returns a specific version of a subject's schema, along with its metadata
  • GET /subjects/{subject}/versions - lists all the versions registered under a subject
  • GET /subjects/{subject}/versions/{version}/schema - returns only the schema for a specific version of a subject
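
As a rough illustration of the first endpoint, the sketch below builds (but does not send) a registration request using only the Python standard library. The registry URL http://localhost:8081, the subject name people-value, and the small Person record schema are assumptions made for this example. The body wraps the schema as a JSON-encoded string under a "schema" key, which is the shape the registry expects for Avro schemas.

```python
import json
import urllib.request

# Assumed values for illustration; adjust them to your deployment.
REGISTRY_URL = "http://localhost:8081"
SUBJECT = "people-value"

# A small example Avro record schema.
PERSON_SCHEMA = {
    "type": "record",
    "name": "Person",
    "namespace": "com.example",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "string"},
    ],
}


def build_register_request(registry_url: str, subject: str, schema: dict) -> urllib.request.Request:
    """Build a POST /subjects/{subject}/versions request without sending it."""
    # The registry expects the schema itself as a string inside a JSON object.
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        url=f"{registry_url}/subjects/{subject}/versions",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


request = build_register_request(REGISTRY_URL, SUBJECT, PERSON_SCHEMA)
print(request.get_method(), request.full_url)
```

Passing the request to urllib.request.urlopen(request) against a running registry should return a JSON object containing the id assigned to the schema.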

In its simplest form, a schema is just a JSON object that describes the structure of the message. For example, the Avro schema for a message with a name field and an age field, both strings, would be:

{
  "type": "record",
  "name": "Person",
  "namespace": "com.example",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": "string"
    }
  ]
}

The schema is usually stored in a file with the .avsc extension.
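
To show how a schema like this describes the structure of a message, here is a minimal sketch that loads the schema from its JSON text and checks a message against it, using only the Python standard library. The helper name missing_or_mistyped_fields is made up for this example, and the check only understands flat records whose fields are all strings; a real Avro library handles the full type system.

```python
import json

# The Person schema, as it might appear in a hypothetical person.avsc file.
PERSON_AVSC = """
{
  "type": "record",
  "name": "Person",
  "namespace": "com.example",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "string"}
  ]
}
"""


def missing_or_mistyped_fields(schema: dict, message: dict) -> list:
    """Return the names of schema fields that are absent or not strings in `message`."""
    problems = []
    for field in schema["fields"]:
        if field["type"] != "string":
            # Anything beyond plain string fields is out of scope for this sketch.
            raise NotImplementedError("this sketch only understands string fields")
        if not isinstance(message.get(field["name"]), str):
            problems.append(field["name"])
    return problems


schema = json.loads(PERSON_AVSC)
print(missing_or_mistyped_fields(schema, {"name": "John", "age": "25"}))  # []
print(missing_or_mistyped_fields(schema, {"name": "John", "age": 25}))    # ['age']
```

The second call reports age because the schema declares it as a string while the message carries an integer.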

There are also tools that generate a class from the schema, which can then be used to serialize and deserialize the messages in the language of your choice. For example, for the schema above, the generated class in Java would look roughly like this:

package com.example;
 
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.specific.SpecificData;
 
 
public class Person extends SpecificRecordBase {
  public static final Schema SCHEMA$ = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Person\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"}]}");
 
  public static Schema getClassSchema() {
    return SCHEMA$;
  }
 
  private String name;
  private String age;
 
  public Person() {
  }
 
  public Person(String name, String age) {
    this.name = name;
    this.age = age;
  }
 
  @Override
  public Schema getSchema() {
    return SCHEMA$;
  }
 
  @Override
  public Object get(int field$) {
    switch (field$) {
      case 0:
        return name;
      case 1:
        return age;
      default:
        throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }
 
  @Override
  public void put(int field$, Object value$) {
    switch (field$) {
      case 0:
        name = value$ != null ? value$.toString() : null;
        break;
      case 1:
        age = value$ != null ? value$.toString() : null;
        break;
      default:
        throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }
 
  public String getName() {
    return name;
  }
 
  public void setName(String value) {
    this.name = value;
  }
 
  public String getAge() {
    return age;
  }
 
  public void setAge(String value) {
    this.age = value;
  }
}

and the corresponding class in Python would be:

import io
import json
from typing import Any, Dict, Optional, Union

import avro.io
import avro.schema


class Person(object):
    """
    Autogenerated by Avro
    """
    # avro.schema.parse takes the schema as a JSON string
    # (the older avro-python3 package spells it avro.schema.Parse)
    _AVRO_SCHEMA = avro.schema.parse(json.dumps({
        "type": "record",
        "name": "Person",
        "namespace": "com.example",
        "fields": [
            {
                "name": "name",
                "type": "string"
            },
            {
                "name": "age",
                "type": "string"
            }
        ]
    }))

    def __init__(self, name: Optional[str] = None, age: Optional[str] = None) -> None:
        self.name = name
        self.age = age

    def __repr__(self) -> str:
        return str(self.to_dict())

    def to_dict(self) -> Dict[str, Any]:
        return {
            "name": self.name,
            "age": self.age
        }

    @staticmethod
    def from_dict(d: Dict[str, Any]) -> 'Person':
        return Person(
            name=d.get('name', None),
            age=d.get('age', None)
        )

    @staticmethod
    def deserialize(data: Union[bytes, bytearray, memoryview]) -> 'Person':
        # Decode the Avro binary into a dict, then build a Person from it
        reader = avro.io.DatumReader(Person._AVRO_SCHEMA)
        decoder = avro.io.BinaryDecoder(io.BytesIO(data))
        return Person.from_dict(reader.read(decoder))

    def serialize(self) -> bytes:
        # Encode the fields as Avro binary into an in-memory buffer
        writer = avro.io.DatumWriter(Person._AVRO_SCHEMA)
        buffer = io.BytesIO()
        encoder = avro.io.BinaryEncoder(buffer)
        writer.write(self.to_dict(), encoder)
        return buffer.getvalue()

These generated classes allow you to initialize and access the fields of the message in a type-safe way. They also make it easy to serialize and deserialize the messages. For example, a Person built from the Java class above can be serialized and sent to Kafka like this:

import com.example.Person;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
 
import java.util.Properties;
import java.util.concurrent.ExecutionException;
 
 
public class ProducerDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Create Producer Properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        // setProperty takes String values, so pass the serializer class names
        properties.setProperty("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class.getName());
        properties.setProperty("value.serializer", io.confluent.kafka.serializers.KafkaAvroSerializer.class.getName());
        properties.setProperty("schema.registry.url", "http://localhost:8081"); // Note that we now need to specify the address of the Schema Registry

        // The topic is not a producer setting, so keep it out of the Properties
        String topic = "people";

        // Create the Producer
        KafkaProducer<String, Person> producer = new KafkaProducer<>(properties);

        // Create a Producer Record (no key is given, so the key is null)
        Person person = new Person("John", "25");
        ProducerRecord<String, Person> record = new ProducerRecord<>(topic, person);

        // Send Data - send() is asynchronous; the callback runs once the broker responds
        producer.send(record, (RecordMetadata recordMetadata, Exception e) -> {
            if (e == null) {
                System.out.println("Received new metadata. \n" +
                        "Topic: " + recordMetadata.topic() + "\n" +
                        "Partition: " + recordMetadata.partition() + "\n" +
                        "Offset: " + recordMetadata.offset() + "\n" +
                        "Timestamp: " + recordMetadata.timestamp());
            } else {
                System.err.println("Error while producing: " + e);
            }
        }).get(); // block on the returned Future to make the send synchronous - don't do this in production!

        // Flush Data
        producer.flush();

        // Close Producer (close() also flushes any pending records)
        producer.close();
    }
}
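
To make the role of the registry in this producer a little more concrete: Confluent's Avro serializer does not put the schema in each message. It prefixes the Avro-encoded payload with a magic byte (0) and the 4-byte big-endian ID under which the schema is registered. The sketch below reproduces that framing in Python using only the standard library. The schema ID 1 is a made-up value, the function names are hypothetical, and the hand-written Avro encoding covers only records of strings like Person.

```python
import struct


def encode_long(value: int) -> bytes:
    """Encode an integer the way Avro encodes a long: zigzag, then base-128 varint."""
    zigzag = (value << 1) ^ (value >> 63)
    out = bytearray()
    while zigzag > 0x7F:
        out.append((zigzag & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        zigzag >>= 7
    out.append(zigzag)
    return bytes(out)


def encode_string(text: str) -> bytes:
    """Avro string: the byte length as a long, followed by the UTF-8 bytes."""
    data = text.encode("utf-8")
    return encode_long(len(data)) + data


def encode_person(name: str, age: str) -> bytes:
    """Avro record: the fields' encodings concatenated in schema order."""
    return encode_string(name) + encode_string(age)


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix the payload with magic byte 0 and a 4-byte big-endian schema ID."""
    return struct.pack(">bI", 0, schema_id) + avro_payload


SCHEMA_ID = 1  # hypothetical ID; the registry assigns the real one
message = frame(SCHEMA_ID, encode_person("John", "25"))
print(message)
```

A consumer does the reverse: it reads the ID from bytes 1 to 4, fetches the matching schema from the registry, and uses it to decode the rest of the message. Because the payload carries no field names or types, it cannot be decoded without that schema.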