Frequently asked questions and answers about Kafka consumers and producers, and how to send/receive data using Kafka.
Here's an example in Python of a Kafka producer. For more languages and frameworks, including Java, .NET, Go, JavaScript, and Spring, see our Getting Started guides.
from confluent_kafka import Producer

# Configure a Producer.
config = {
    "bootstrap.servers": "localhost:9092"
}
producer = Producer(config)

# Create an error-handling callback.
def delivery_callback(err, msg):
    if err:
        print(f"ERROR: Message failed delivery: {err}")
    else:
        print(f"Produced event key = {msg.key()} value = {msg.value()}")

# Produce data.
mykey = ...
myvalue = ...
producer.produce("mytopic", key=mykey, value=myvalue, callback=delivery_callback)

# Cleanup: block until all outstanding deliveries have completed.
producer.flush()
Check out our client code samples and free training courses to learn more.
Here's an example of a consumer in Python. For more languages and frameworks, including Java, .NET, Go, JavaScript, and Spring, see our Getting Started guides.
from confluent_kafka import Consumer

# Configure a Consumer.
config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-consumer-group",
    "auto.offset.reset": "earliest"
}
consumer = Consumer(config)
consumer.subscribe(["mytopic"])

# Poll for new messages from Kafka and print them.
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            print("Waiting...")
        elif msg.error():
            print(f"ERROR: {msg.error()}")
        else:
            print(f"Consumed event key = {msg.key().decode('utf-8')} value = {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
    pass
finally:
    # Cleanup: leave the group and commit final offsets.
    consumer.close()
Check out our client code samples and free training courses to learn more.
A consumer group in Kafka is a single logical consumer implemented with multiple physical consumers for reasons of throughput and resilience.
When a single consumer cannot keep up with the throughput of messages that Kafka is providing, you can horizontally scale that consumer by creating additional instances of it.
The partitions of all the topics are divided among the consumers in the group. As new group members arrive and old members leave, the partitions are reassigned so that each member receives its proportional share of partitions. This is known as rebalancing the group. Kafka keeps track of the members of a consumer group and allocates data to them.
To utilize all of the consumers in a consumer group, there must be at least as many partitions as consumer group members. If there are fewer partitions than consumers in the group, some consumers will sit idle. If there are more partitions than consumers in the group, some consumers will read from more than one partition.
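As a minimal sketch of what this looks like with the Python client used above (assuming the same mytopic topic and local broker), every instance of the script below joins the same group, and the on_assign callback prints each member's share of the partitions after a rebalance:

from confluent_kafka import Consumer

# Every instance uses the same group.id, so Kafka treats them
# as one logical consumer and divides the partitions among them.
config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-consumer-group",
    "auto.offset.reset": "earliest"
}

# Called after each rebalance with this member's share of the partitions.
def print_assignment(consumer, partitions):
    print(f"Assigned partitions: {[p.partition for p in partitions]}")

consumer = Consumer(config)
consumer.subscribe(["mytopic"], on_assign=print_assignment)

while True:
    msg = consumer.poll(1.0)
    if msg is not None and not msg.error():
        print(f"Consumed from partition {msg.partition()}")

Start a second copy of the script and both members will print new, smaller assignments as the group rebalances.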
You can learn more about consumers in this free Apache Kafka 101 course.
Group ID is a configuration item in Kafka consumers. If you want multiple consumers to share a workload, you give them the same group.id. If you want a consumer to work independently, you give it a unique group.id. Group ID is just a string.
Kafka consumers are built to scale across multiple machines, by working in groups. Each consumer tells the Kafka broker which group it belongs to, and then outbound messages will be automatically load-balanced among members of that group.
For example, you might have a purchases topic, and the system you're writing should trigger a notification when a customer buys something. Sending out large volumes of email can be slow, so you might have five machines all consuming with group.id = "email" so they can share that load. At the same time, you might want to summarize those purchases to produce sales figures, and that might need only one machine with its own group.id = "sales".
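A sketch of that scenario with the Python client used above (the purchases topic and both group names come from this example, not from any real system): every notification machine runs the first consumer, the reporting machine runs the second, and because the group IDs differ, each group independently receives the full stream of purchases.

from confluent_kafka import Consumer

# Run on each of the five notification machines; they share the workload.
email_consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "email"
})
email_consumer.subscribe(["purchases"])

# Run on the single reporting machine; it sees every purchase as well.
sales_consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "sales"
})
sales_consumer.subscribe(["purchases"])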
Group IDs should not be confused with client IDs. A group ID will affect the way records are consumed, but a client ID is just a label.
A client ID in Kafka is a label you define that names a particular consumer or producer. You can give your client a friendly name so that debugging is easier. For details see the consumer and producer documentation.
Client IDs should not be confused with group IDs. A group ID will affect the way records are consumed, but a client ID is just a label.
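Setting one is a single configuration entry. Here is a sketch with the Python client used above; the name itself is a hypothetical label of your choosing:

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-consumer-group",
    # Hypothetical friendly name; it appears in broker logs and metrics
    # but has no effect on how records are consumed.
    "client.id": "inventory-service-1"
})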
To connect to Kafka you need to find a client library for your language, or use the REST Proxy. There are client libraries for over 20 programming languages, so finding one is generally easy.
You can see some language-specific examples further down this page, or take a look at the documentation for your language's client, but the process is always the same: configure the client, connect to the brokers, and then send or receive messages. Kafka connections are separated into producers (write-only connections) and consumers (read-only connections).
import org.apache.kafka.clients.producer.*;
...

// Configure and connect.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);

// Create an event.
String myKey = ...;
String myValue = ...;
producer.send(
    new ProducerRecord<>("mytopic", myKey, myValue),
    (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        }
    });

// Cleanup.
producer.flush();
producer.close();
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

// Configure and connect.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "example-consumer-group");
props.put("auto.offset.reset", "earliest");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("mytopic"));

// Poll for new messages from Kafka and print them.
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            String key = record.key();
            String value = record.value();
            System.out.println(String.format("Consumed event key = %s value = %s", key, value));
        }
    }
} finally {
    // Cleanup.
    consumer.close();
}
For more, see the full Getting Started with Apache Kafka and Java walkthrough.