Producers are the clients that send messages to the Kafka cluster. They can send messages to a specific topic, or even to a specific partition of a topic. Where a message ends up depends on the partitioning strategy and on the message's key, which is optional: with no key, the default strategy distributes messages round-robin across partitions; with a key, all messages with the same key land on the same partition (custom strategies are also possible). An example configuration looks like this:
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
topic.name=example_topic
The bootstrap.servers property is the address of the Kafka cluster. The key.serializer and value.serializer properties specify the serializers for the key and the value of the message. The serializers convert the key and value of the message to bytes before they are sent to the Kafka cluster.
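To make that contract concrete, here is a minimal sketch of a custom value serializer (the class name and the upper-casing logic are hypothetical; in practice one of the stock serializers listed below usually suffices):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical example: a serializer is just "object in, bytes out"
public class UpperCaseStringSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        if (data == null) {
            return null; // pass null through for absent values
        }
        return data.toUpperCase().getBytes(StandardCharsets.UTF_8);
    }
}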
The value.serializer can be almost anything, but the usual choices are:
- org.apache.kafka.common.serialization.StringSerializer for string messages (UTF-8) that have no real structure
- io.confluent.kafka.serializers.KafkaAvroSerializer for structured messages whose schema is stored in the Schema Registry and that are serialized in the Avro format
- io.confluent.kafka.serializers.KafkaJsonSerializer for structured messages whose schema is stored in the Schema Registry and that are serialized in the JSON format
- io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer for structured messages whose schema is stored in the Schema Registry and that are serialized in the Protobuf format
The topic.name property is the name of the topic to which the messages will be sent. (Note that topic.name is not an official producer setting; it is just a convenient place to keep the topic name next to the other properties, and the producer itself ignores it.)
Note: In the case of the Avro, JSON, and Protobuf serializers, the schema is stored in the Schema Registry and the messages are serialized using that schema. This means that one more property is needed to specify the address of the Schema Registry: schema.registry.url.
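For example, a producer configuration using the Avro serializer might look like this (the Schema Registry address is an assumption; 8081 is just its conventional default port):

bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url=http://localhost:8081
topic.name=example_topic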
Let’s take a look at a simple producer example in Java:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Create Producer Properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        // setProperty() takes strings, so pass the class names rather than the Class objects
        properties.setProperty("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class.getName());
        properties.setProperty("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class.getName());
        properties.setProperty("topic.name", "example_topic");

        // Create the Producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties); // the producer does not use unknown properties such as topic.name (it only logs a warning about them)

        // Create a Producer Record
        ProducerRecord<String, String> record = new ProducerRecord<>(properties.getProperty("topic.name"), "hello world"); // the value of the message is "hello world", the key is null, and it will be sent to the topic "example_topic"

        // Send Data - Asynchronous
        producer.send(record, (RecordMetadata recordMetadata, Exception e) -> { // callback executed every time a record is acknowledged or an exception is thrown while sending it
            if (e == null) {
                System.out.println("Received new metadata. \n" +
                        "Topic: " + recordMetadata.topic() + "\n" +
                        "Partition: " + recordMetadata.partition() + "\n" +
                        "Offset: " + recordMetadata.offset() + "\n" +
                        "Timestamp: " + recordMetadata.timestamp());
            } else {
                System.out.println("Error while producing: " + e);
            }
        }).get(); // block on .send() to make it synchronous - don't do this in production!

        // Flush Data
        producer.flush();

        // Flush and Close Producer
        producer.close();
    }
}
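To tie this back to the partitioning strategies mentioned earlier, the same producer can also send a keyed record before it is flushed and closed, in which case all records with the same key land on the same partition. A minimal sketch, reusing the producer and properties from the example above (the key "user-42" is hypothetical):

// Records with the same key are hashed to the same partition, which preserves per-key ordering
ProducerRecord<String, String> keyedRecord =
        new ProducerRecord<>(properties.getProperty("topic.name"), "user-42", "hello again");
producer.send(keyedRecord); // asynchronous; the producer batches and sends in the background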
A similar example in Python:
from kafka import KafkaProducer
from kafka.errors import KafkaError
properties = {
"bootstrap.servers": "localhost:9092",
"topic.name": "example_topic"
}
producer = KafkaProducer(
    bootstrap_servers=properties["bootstrap.servers"],
    value_serializer=str.encode  # encodes the str value to UTF-8 bytes; no key serializer is needed because no key is sent
)

# The value of the message is "hello world" and it will be sent to the topic "example_topic" (the key is None).
# The value is passed as a str because value_serializer=str.encode handles the encoding.
future = producer.send(properties["topic.name"], "hello world")

try:
    record_metadata = future.get(timeout=10)
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)
except KafkaError as e:
    print(e)

producer.flush()
producer.close()
Note: For the Python example, I'm using the kafka-python library. It is not an official client, but it is one of the most popular Python clients for Kafka.