Console Producer and Consumer with (de)serializers

Question:

What is the simplest way to write messages to and read messages from Kafka, using (de)serializers and Schema Registry?

Example use case:

You'd like to produce and consume some basic messages, using (de)serializers and Schema Registry. In this tutorial, we'll show you how to produce and consume messages from the command line without any code. Unlike the CLI Basics tutorial, this tutorial uses Avro and Schema Registry.

Hands-on code example:

Short Answer

With Confluent Cloud, you can use the Confluent CLI to produce and consume messages. In the following code snippet, substitute the topic name and schema file path.

Producer:

confluent kafka topic produce orders-avro --value-format avro --schema orders-avro-schema.json --parse-key --delimiter ":"

Consumer:

confluent kafka topic consume orders-avro --value-format avro --print-key --delimiter ":"

Run it

Provision your Kafka cluster

1

This tutorial requires access to an Apache Kafka cluster, and the quickest way to get started free is on Confluent Cloud, which provides Kafka as a fully managed service.

  1. After you log in to Confluent Cloud, click on Add cloud environment and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  2. From the Billing & payment section in the Menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details).

  3. Click on LEARN and follow the instructions to launch a Kafka cluster and to enable Schema Registry.

Download and set up the Confluent CLI

2

Some steps in this tutorial manage Kafka topics or read from and write to them. For these steps you can use the Confluent Cloud Console or install the Confluent CLI. Instructions for installing the Confluent CLI and configuring it for your Confluent Cloud environment are available from within the Confluent Cloud Console: navigate to your Kafka cluster, click on the CLI and tools link, and run through the steps in the Confluent CLI tab.

The CLI clients for Confluent Cloud (ccloud) and Confluent Platform (confluent v1.0) have been unified into a single client, the Confluent CLI (confluent v2.0), which this tutorial uses. The ccloud client will continue to work until its sunset on May 9, 2022; migration instructions for the unified confluent CLI are at https://docs.confluent.io/confluent-cli/current/migrate.html.
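
To confirm that the unified client is the one on your PATH, a small check along these lines may help. This is only a sketch: the helper name major_version is hypothetical, and the script assumes that confluent version prints a line starting with "Version:" followed by a string such as v2.3.1. Adjust the parsing if your output differs.

```shell
#!/bin/sh
# Hypothetical helper: extract the major number from a version string
# such as "v2.3.1" or "2.3.1".
major_version() {
  version=${1#v}                  # drop a leading "v" if present
  printf '%s\n' "${version%%.*}"  # keep everything before the first dot
}

# Query the CLI only when it is installed.
if command -v confluent >/dev/null 2>&1; then
  # Assumption: the output contains a line like "Version:     v2.3.1".
  installed=$(confluent version | awk '/^Version:/ {print $2}')
  major=$(major_version "$installed")
  if [ "${major:-0}" -ge 2 ] 2>/dev/null; then
    echo "Unified Confluent CLI found: $installed"
  else
    echo "Unified CLI (v2.0 or later) not detected; found: ${installed:-unknown}"
  fi
fi
```

If the check reports an older client, follow the migration instructions linked above before continuing.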

Create the Kafka topic

3

Create a Kafka topic called orders-avro in Confluent Cloud.

confluent kafka topic create orders-avro --partitions 1

This should yield the following output:

Created topic "orders-avro".
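
If you want to double-check that the topic exists before moving on, one option is to search the output of confluent kafka topic list. The sketch below wraps that in a hypothetical helper named topic_exists. It does not depend on the exact table layout, only on the topic name appearing as a whole word somewhere in the listing, so a longer name such as orders-avro-2 would also match.

```shell
#!/bin/sh
# Hypothetical helper: succeed when the given topic name appears in the
# output of "confluent kafka topic list".
topic_exists() {
  # -F: fixed string, -w: whole word, -q: no output, exit status only
  confluent kafka topic list | grep -qwF -- "$1"
}

# Run the check only when the CLI is installed.
if command -v confluent >/dev/null 2>&1; then
  if topic_exists orders-avro; then
    echo "orders-avro is ready"
  else
    echo "orders-avro not found; re-run the create command"
  fi
fi
```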

Create a schema for your records

4

We are going to use the Confluent Cloud managed Schema Registry to control our record format. The first step is creating a schema definition which we will use when producing new records.

Create the following orders-avro-schema.json file:

{
  "type": "record",
  "namespace": "io.confluent.tutorial",
  "name": "OrderDetail",
  "fields": [
    {"name": "number", "type": "long", "doc": "The order number."},
    {"name": "date", "type": "long", "logicalType": "date", "doc": "The date the order was submitted."},
    {"name": "shipping_address", "type": "string", "doc": "The shipping address."},
    {"name": "subtotal", "type": "double", "doc": "The amount without shipping cost and tax."},
    {"name": "shipping_cost", "type": "double", "doc": "The shipping cost."},
    {"name": "tax", "type": "double", "doc": "The applicable tax."},
    {"name": "grand_total", "type": "double", "doc": "The order grand total."}
  ]
}

Start a console consumer

5

Next, let’s open up a consumer to read records from the new topic.

From the same terminal you used to create the topic above, run the following command to start a console consumer with the Confluent CLI:

confluent kafka topic consume orders-avro --value-format avro --print-key --delimiter ":"

You will be prompted for the Confluent Cloud Schema Registry credentials as shown below. Enter the values you received when you enabled Schema Registry in the Confluent Cloud Console.

Enter your Schema Registry API key:
Enter your Schema Registry API secret:

The consumer will start up and block, waiting for records. You won’t see any output until after the next step.

Produce events to the Kafka topic

6

Now we are going to produce records to our new topic using the schema created a few steps back. Open a second terminal window and start the producer:

confluent kafka topic produce orders-avro --value-format avro --schema orders-avro-schema.json

The producer will start with some information and then wait for you to enter input.

Successfully registered schema with ID 100001
Starting Kafka Producer. ^C or ^D to exit

Below are example records in JSON format, with each line representing a single record. Although the records are produced in Avro format, you pass them to the producer as JSON; the producer converts them to Avro based on the orders-avro-schema.json schema before sending them to Kafka.

Copy each line and paste it into the producer terminal, pressing enter after each one to produce the new record.

{"number":1,"date":18500,"shipping_address":"ABC Sesame Street,Wichita, KS. 12345","subtotal":110.00,"tax":10.00,"grand_total":120.00,"shipping_cost":0.00}
{"number":2,"date":18501,"shipping_address":"123 Cross Street,Irving, CA. 12345","subtotal":5.00,"tax":0.53,"grand_total":6.53,"shipping_cost":1.00}
{"number":3,"date":18502,"shipping_address":"5014  Pinnickinick Street, Portland, WA. 97205","subtotal":93.45,"tax":9.34,"grand_total":102.79,"shipping_cost":0.00}
{"number":4,"date":18503,"shipping_address":"4082 Elmwood Avenue, Tempe, AX. 85281","subtotal":50.00,"tax":1.00,"grand_total":51.00,"shipping_cost":0.00}
{"number":5,"date":18504,"shipping_address":"123 Cross Street,Irving, CA. 12345","subtotal":33.00,"tax":3.33,"grand_total":38.33,"shipping_cost":2.00}
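
Because every record has to be a single line of JSON carrying all the fields in the schema, it can be useful to check a line before pasting it into the producer. The following plain-shell sketch uses a hypothetical helper, missing_fields, that prints any schema field that does not appear as a key in the line. It is a textual check only, not Avro validation: it does not verify types, and it can be fooled by key-like text inside a string value.

```shell
#!/bin/sh
# Field names copied from orders-avro-schema.json.
FIELDS="number date shipping_address subtotal shipping_cost tax grand_total"

# Hypothetical helper: print each schema field that is not present
# as a JSON key ("field":) in the given record line.
missing_fields() {
  record=$1
  for field in $FIELDS; do
    case $record in
      *"\"$field\":"*) ;;           # key present, nothing to report
      *) printf '%s\n' "$field" ;;  # key absent
    esac
  done
}

# A complete record: prints nothing.
missing_fields '{"number":1,"date":18500,"shipping_address":"ABC Sesame Street,Wichita, KS. 12345","subtotal":110.00,"tax":10.00,"grand_total":120.00,"shipping_cost":0.00}'

# A record without the tax field: prints "tax".
missing_fields '{"number":1,"date":18500,"shipping_address":"ABC Sesame Street,Wichita, KS. 12345","subtotal":110.00,"grand_total":120.00,"shipping_cost":0.00}'
```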

As you produce records you can observe them in the consumer terminal.

Produce records with full key-value pairs

7

Kafka works with key-value pairs, but so far you’ve sent records with values only. To be fair, you have sent key-value pairs, but the keys are null. Sometimes you’ll need to send a valid key in addition to the value from the command line.

To send full key-value pairs from the command line, add two flags to the Confluent CLI produce command: --parse-key and --delimiter.

Let’s try to send some full key-value records now. If your previous producer is still running, close it with CTRL+C and run the following command to start a new console producer:

confluent kafka topic produce orders-avro --value-format avro --schema orders-avro-schema.json --parse-key --delimiter ":"

Then enter these records either one at a time or copy-paste all of them into the terminal and hit enter:

6:{"number":6,"date":18505,"shipping_address":"9182 Shipyard Drive, Raleigh, NC. 27609","subtotal":72.00,"tax":3.00,"grand_total":75.00,"shipping_cost":0.00}
7:{"number":7,"date":18506,"shipping_address":"644 Lagon Street, Chicago, IL. 07712","subtotal":11.00,"tax":1.00,"grand_total":14.00,"shipping_cost":2.00}
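
Note that the JSON value itself contains ":" characters, so the key can only be the text before the first delimiter on each line. The shell sketch below illustrates that kind of split with a hypothetical helper, split_record. It is an illustration of the idea under that assumption, not the CLI's actual parsing code.

```shell
#!/bin/sh
# Hypothetical helper: split one input line into key and value at the
# FIRST occurrence of the delimiter only.
split_record() {
  line=$1
  delim=$2
  key=${line%%"$delim"*}    # everything before the first delimiter
  value=${line#*"$delim"}   # everything after the first delimiter
  printf 'key=%s\nvalue=%s\n' "$key" "$value"
}

split_record '6:{"number":6,"subtotal":72.00}' ':'
# prints:
# key=6
# value={"number":6,"subtotal":72.00}
```

Choosing a delimiter that cannot appear in your keys keeps this split unambiguous.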

Start a consumer to show full key-value pairs

8

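Now that you’ve produced full key-value pairs, start a consumer that prints the keys along with the values.

If the consumer from the earlier step is still running, stop it with CTRL+C. Then run the following command to start a new console consumer with the Confluent CLI, this time reading from the beginning of the topic and using "-" as the delimiter:

confluent kafka topic consume orders-avro --value-format avro --print-key --delimiter "-" --from-beginning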

After the consumer starts you should see the following output in a few seconds:

null-{"number":1,"date":18500,"shipping_address":"ABC Sesame Street,Wichita, KS. 12345","subtotal":110.00,"tax":10.00,"grand_total":120.00,"shipping_cost":0.00}
null-{"number":2,"date":18501,"shipping_address":"123 Cross Street,Irving, CA. 12345","subtotal":5.00,"tax":0.53,"grand_total":6.53,"shipping_cost":1.00}
null-{"number":3,"date":18502,"shipping_address":"5014  Pinnickinick Street, Portland, WA. 97205","subtotal":93.45,"tax":9.34,"grand_total":102.79,"shipping_cost":0.00}
null-{"number":4,"date":18503,"shipping_address":"4082 Elmwood Avenue, Tempe, AX. 85281","subtotal":50.00,"tax":1.00,"grand_total":51.00,"shipping_cost":0.00}
null-{"number":5,"date":18504,"shipping_address":"123 Cross Street,Irving, CA. 12345","subtotal":33.00,"tax":3.33,"grand_total":38.33,"shipping_cost":2.00}
6-{"number":6,"date":18505,"shipping_address":"9182 Shipyard Drive, Raleigh, NC. 27609","subtotal":72.00,"tax":3.00,"grand_total":75.00,"shipping_cost":0.00}
7-{"number":7,"date":18506,"shipping_address":"644 Lagon Street, Chicago, IL. 07712","subtotal":11.00,"tax":1.00,"grand_total":14.00,"shipping_cost":2.00}

Because the command includes the --from-beginning flag, you’ll see all the records sent to the topic. Notice that the records you sent before adding keys are formatted as null-<value>.
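
If you save or pipe the consumer output, standard text tools can separate keyed records from the earlier null-key ones. The helpers below (count_null_keys and list_keys, both hypothetical names) are a sketch that assumes each line has the <key>-<value> shape shown above. A record whose key is the literal string "null" would be counted as well.

```shell
#!/bin/sh
# Hypothetical helper: count the lines on stdin whose key is printed as null.
count_null_keys() {
  grep -c '^null-'
}

# Hypothetical helper: print only the key of each line, that is,
# everything before the first "-".
list_keys() {
  sed 's/-.*//'
}

printf '%s\n' 'null-{"number":5}' '6-{"number":6}' '7-{"number":7}' | count_null_keys
# prints: 1

printf '%s\n' 'null-{"number":5}' '6-{"number":6}' '7-{"number":7}' | list_keys
# prints null, 6, and 7 on separate lines
```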

Tear down Confluent Cloud resources

9

You may try another tutorial, but if you don’t plan on doing other tutorials, use the Confluent Cloud Console or CLI to destroy all the resources you created. Verify they are destroyed to avoid unexpected charges.