course: Apache Kafka® 101

Hands-on Exercise: Kafka Consumer


Gilles Philippart

Software Practice Lead

Hands On: Consumers

In practice, programmatically producing and consuming messages is an essential way to interact with your Apache Kafka® cluster and put data into motion. In the first exercise of this course, you gained experience consuming from and producing to a Kafka topic using the command line.

With that experience under your belt, let’s explore a more programmatic way of consuming messages by writing a Kafka Consumer in Python.

We recommend following the course exercises in order so that you have everything you need to complete this one. If you haven’t already set up your CLI to connect to your Confluent Cloud cluster, check out the first exercise to get up to speed.

Step-by-step instructions

  1. Ensure prerequisites are installed: Confirm that Python 3 and pip are installed on your machine. If they’re not, install Python 3. Using Windows? You might want to try Windows Subsystem for Linux.

    This guide assumes that you already have Python 3 installed.

  2. Set up a virtual environment and install the client library: Open a terminal and run the following (a quick way to verify the install appears just after these steps):

    mkdir kafka-101-consumers && cd kafka-101-consumers
    python3 -m venv env
    source env/bin/activate
    pip install confluent-kafka
  3. Determine your cluster endpoint: Run the following command:

    confluent kafka cluster describe

    In the output, locate the Endpoint field and note its hostname and port (e.g., pkc-abc12.us-central1.gcp.confluent.cloud:9092); ignore the SASL_SSL:// prefix.

  4. Create or retrieve your API key and secret:

    You need an API key and secret to proceed. If you need a new one, make note of the cluster ID printed in step 3 and use it to run the following (copy the key and secret it returns, since you’ll need them in step 5):

    confluent api-key create --resource {ID}

    Then set the key using:

    confluent api-key use {API Key} --resource {ID}
  5. Using your favorite text editor, create a configuration file named config.ini and populate it with the following, substituting in your endpoint, key and secret values:

    [default]
    bootstrap.servers=< Endpoint >
    security.protocol=SASL_SSL
    sasl.mechanisms=PLAIN
    sasl.username=< API Key >
    sasl.password=< API Secret >
    
    [consumer]
    group.id=python_kafka101_group_1
    # 'auto.offset.reset=earliest' to start reading from the beginning of
    # the topic if no committed offsets exist.
    auto.offset.reset=earliest
  6. Create another file named consumer.py that contains the following:

    #!/usr/bin/env python
    from argparse import ArgumentParser, FileType
    from configparser import ConfigParser
    from confluent_kafka import Consumer
    if __name__ == '__main__':
        # Parse the command line.
        parser = ArgumentParser()
        parser.add_argument('config_file', type=FileType('r'))
        args = parser.parse_args()
    
        # Parse the configuration.
        config_parser = ConfigParser()
        config_parser.read_file(args.config_file)
        config = dict(config_parser['default'])
        config.update(config_parser['consumer'])
    
        # Create Consumer instance
        consumer = Consumer(config)
    
        # Subscribe to topic
        topic = "thermostat_readings"
        consumer.subscribe([topic])
    
        # Poll for new messages from Kafka and print them.
        try:
            while True:
                msg = consumer.poll(1.0)
                if msg is None:
                    print("Waiting...")
                elif msg.error():
                    print("ERROR: %s".format(msg.error()))
                else:
                    # Extract the (optional) key and value, and print.
                    print("Consumed event from topic {topic}: key = {key} value = {value}".format(topic=msg.topic(), key=msg.key(), value=msg.value()))
        except KeyboardInterrupt:
            pass
        finally:
            # Leave group and commit final offsets
            consumer.close()
  7. Make the script executable and run:

    chmod u+x consumer.py
    ./consumer.py config.ini
  8. Observe the messages being output and stop the consumer script using Ctrl+C. If you only see "Waiting..." lines, see the note on producing a few test events just after these steps.
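
As a quick sanity check for step 2, you can confirm from the activated virtual environment that the client library installed correctly before writing any code. This one-liner simply imports the package and prints the client and librdkafka versions (the exact numbers will vary with your install):

    python3 -c "import confluent_kafka; print(confluent_kafka.version(), confluent_kafka.libversion())"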
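
If the consumer in step 8 only prints "Waiting...", the thermostat_readings topic may simply have no new events flowing into it. One way to generate some is to produce a few test records from a second terminal with the Confluent CLI; the device IDs and readings below are made up purely for illustration:

    confluent kafka topic produce thermostat_readings --parse-key --delimiter ":"

Type a few lines such as device_1:21.5 and device_2:19.8, pressing Enter after each, then stop with Ctrl+C. The --parse-key and --delimiter flags split each line into a record key and value, and the records should appear in the consumer's output within a second or two.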

This script was deliberately simple, but the steps of configuring your consumer, subscribing to a topic, and polling for events are common across all consumers. For more detail on additional configuration options you may want to experiment with, check out the Getting Started guides on Confluent Developer.
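
As one illustration of the kind of changes you might experiment with, the sketch below is a variation on consumer.py (not part of the exercise) that decodes the raw key and value bytes into strings and switches from automatic to manual offset commits. The enable.auto.commit setting and the Consumer.commit() call are standard confluent-kafka features; the rest of the structure is just an assumption for illustration:

    #!/usr/bin/env python
    from configparser import ConfigParser
    from confluent_kafka import Consumer

    # Load the same config.ini used above.
    config_parser = ConfigParser()
    with open('config.ini') as config_file:
        config_parser.read_file(config_file)
    config = dict(config_parser['default'])
    config.update(config_parser['consumer'])
    # Commit offsets explicitly instead of letting the client auto-commit them.
    config['enable.auto.commit'] = False

    consumer = Consumer(config)
    consumer.subscribe(['thermostat_readings'])

    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None or msg.error():
                # Skip empty polls and (in this sketch) error events.
                continue
            # key() and value() return raw bytes; decode them for readability.
            key = msg.key().decode('utf-8') if msg.key() else None
            value = msg.value().decode('utf-8') if msg.value() else None
            print("Consumed from {} [{}]: key = {} value = {}".format(
                msg.topic(), msg.partition(), key, value))
            # Synchronously commit the offset of the message just processed.
            consumer.commit(message=msg, asynchronous=False)
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()

Manual commits trade a little extra latency for tighter control over exactly when an event counts as processed.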

Do you have questions or comments? Join us in the #confluent-developer community Slack channel to engage in discussions with the creators of this content.

Use the promo codes KAFKA101 & CONFLUENTDEV1 to get $25 of free Confluent Cloud usage and skip credit card entry.
