Senior Developer Advocate (Presenter)
In practice, programmatically producing and consuming messages is an important way to interact with your Apache Kafka cluster and put data into motion. In the first exercise of this course, we gained experience consuming from and producing to a Kafka topic using the command line. With that experience under your belt, let’s explore a more programmatic way of consuming messages by writing a consumer script in Python.
We recommend following these exercises in order so that you have everything you need to successfully complete the exercise. If you haven’t already set up your CLI to connect to your Confluent Cloud cluster, take a look at the previous exercise to get up to speed.
Set up a Python virtual environment and install the confluent-kafka library:
virtualenv env
source env/bin/activate
pip install confluent-kafka
Obtain your cluster's bootstrap server endpoint by running:
confluent kafka cluster describe
The output contains an endpoint field. Take note of the hostname and port from this value, for example, pkc-abc12.us-central1.gcp.confluent.cloud:9092.
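If you script this step, you can derive the bootstrap.servers value from the endpoint field. The following is a minimal sketch. It assumes the CLI prints the endpoint with a protocol prefix such as SASL_SSL://, and the exact output format may vary between CLI versions. The helper name bootstrap_from_endpoint is hypothetical.

```python
def bootstrap_from_endpoint(endpoint: str) -> str:
    """Return 'host:port' from an endpoint value such as 'SASL_SSL://host:9092'.

    Hypothetical helper; assumes an optional '<protocol>://' prefix.
    """
    value = endpoint.strip()
    # Drop an optional "<protocol>://" prefix (assumed format, e.g. "SASL_SSL://").
    _, sep, rest = value.partition("://")
    if sep:
        value = rest
    # Split on the last colon so the hostname itself is left intact.
    host, colon, port = value.rpartition(":")
    if not colon or not host or not port.isdigit():
        raise ValueError(f"expected host:port, got {endpoint!r}")
    return f"{host}:{port}"


print(bootstrap_from_endpoint("SASL_SSL://pkc-abc12.us-central1.gcp.confluent.cloud:9092"))
```

The result is the string you substitute for < Endpoint > in config.ini.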
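Create an API key and secret for the cluster, where {ID} is your Kafka cluster ID. Save the secret when it is displayed, because you cannot retrieve it later:
confluent api-key create --resource {ID}
Then make it the active key for the cluster:
confluent api-key use {API Key} --resource {ID}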
Create a file named config.ini and populate it with the following, substituting your endpoint, key, and secret values:
[default]
bootstrap.servers=< Endpoint >
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=< API Key >
sasl.password=< API Secret >
[consumer]
group.id=python_kafka101_group_1
# 'auto.offset.reset=earliest' to start reading from the beginning of
# the topic if no committed offsets exist.
auto.offset.reset=earliest
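The consumer script reads this file with Python's configparser and merges the [default] and [consumer] sections into a single dictionary. You may want to check that merged result before connecting, so that a missing setting or a placeholder you forgot to replace shows up immediately. The following is a minimal sketch. It assumes the keys in REQUIRED_KEYS are the ones this exercise needs. The names load_consumer_config and REQUIRED_KEYS, and the MY_API_KEY and MY_API_SECRET values, are hypothetical.

```python
from configparser import ConfigParser

# Hypothetical sample with placeholder credentials; substitute your own values.
SAMPLE_INI = """\
[default]
bootstrap.servers=pkc-abc12.us-central1.gcp.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=MY_API_KEY
sasl.password=MY_API_SECRET

[consumer]
group.id=python_kafka101_group_1
# 'auto.offset.reset=earliest' to start reading from the beginning of
# the topic if no committed offsets exist.
auto.offset.reset=earliest
"""

# Assumed minimal set of settings for this exercise, not an exhaustive list.
REQUIRED_KEYS = ("bootstrap.servers", "sasl.username", "sasl.password", "group.id")


def load_consumer_config(text: str) -> dict:
    """Merge [default] and [consumer] as the consumer script does, then sanity-check."""
    parser = ConfigParser()
    parser.read_string(text)  # lines starting with '#' are treated as comments
    config = dict(parser["default"])
    config.update(parser["consumer"])
    missing = [k for k in REQUIRED_KEYS if not config.get(k)]
    if missing:
        raise ValueError(f"missing settings: {missing}")
    # Catch placeholders such as "< Endpoint >" that were never substituted.
    unfilled = [k for k, v in config.items() if v.startswith("<")]
    if unfilled:
        raise ValueError(f"unsubstituted placeholders for: {unfilled}")
    return config


config = load_consumer_config(SAMPLE_INI)
print(config["group.id"])  # python_kafka101_group_1
```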
Create a file named consumer.py that contains the following:
#!/usr/bin/env python
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Consumer
if __name__ == '__main__':
    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    args = parser.parse_args()
    # Parse the configuration.
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])
    config.update(config_parser['consumer'])
    # Create Consumer instance
    consumer = Consumer(config)
    # Subscribe to topic
    topic = "poems"
    consumer.subscribe([topic])
    # Poll for new messages from Kafka and print them.
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                print("Waiting...")
            elif msg.error():
                print("ERROR: {}".format(msg.error()))
            else:
                # Extract the (optional) key and value, decoding bytes to str;
                # a missing key or value is printed as an empty string.
                key = msg.key().decode('utf-8') if msg.key() is not None else ''
                value = msg.value().decode('utf-8') if msg.value() is not None else ''
                print("Consumed event from topic {topic}: key = {key:12} value = {value:12}".format(
                    topic=msg.topic(), key=key, value=value))
    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets
        consumer.close()
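Make the script executable and run it:
chmod u+x consumer.py
./consumer.py config.ini
The consumer prints each event it reads from the poems topic, or Waiting... while no new events arrive. Press Ctrl+C to stop it. The script then closes the consumer, which leaves the group and commits its final offsets.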
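You can also exercise the message-handling branch of consumer.py without a running cluster. The following is a minimal sketch that assumes you factor that branch into a function. The names describe and FakeMessage are hypothetical. FakeMessage mimics only the four Message methods the script calls: error(), topic(), key(), and value(). The sketch maps a missing key or value to an empty string, because format(None, '12') would raise a TypeError.

```python
from typing import Optional


class FakeMessage:
    """Hypothetical stand-in exposing only the Message methods consumer.py calls."""

    def __init__(self, topic: str, key: Optional[bytes], value: Optional[bytes], error=None):
        self._topic = topic
        self._key = key
        self._value = value
        self._error = error

    def topic(self):
        return self._topic

    def key(self):
        return self._key

    def value(self):
        return self._value

    def error(self):
        return self._error


def _decode(raw: Optional[bytes]) -> str:
    # Keys are optional, so map None to an empty string before formatting.
    return raw.decode("utf-8") if raw is not None else ""


def describe(msg) -> str:
    """Return the line consumer.py would print for a poll() result."""
    if msg is None:
        return "Waiting..."
    if msg.error():
        return "ERROR: {}".format(msg.error())
    return "Consumed event from topic {topic}: key = {key:12} value = {value:12}".format(
        topic=msg.topic(), key=_decode(msg.key()), value=_decode(msg.value()))


print(describe(FakeMessage("poems", None, b"roses")))
```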
This script was deliberately simple, but the steps of configuring your consumer, subscribing to a topic, and polling for events are common to all consumers. For details on other configuration options you may want to experiment with, see the getting started guides on Confluent Developer.