Confluent Cloud provides a set of REST APIs for interacting with cloud resources. The Produce API lets clients produce records to a Kafka topic via an HTTP POST request. However, this API does not currently provide built-in support for producing records that use schemas from Schema Registry. Despite this limitation, clients can still produce such records by sending binary data that conforms to the Schema Registry message wire format. This tutorial walks through the necessary steps.
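At a high level, this wire format prefixes each serialized value with a magic byte and the schema ID:

# Schema Registry wire format
#   byte  0     magic byte, always 0
#   bytes 1-4   schema ID as a 4-byte big-endian integer
#   bytes 5+    the Avro binary-encoded record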
To follow along, clone the confluentinc/tutorials repository and change into this tutorial's directory:
git clone git@github.com:confluentinc/tutorials.git
cd tutorials/confluent-cloud-rest-produce-schema-registry
Log in to Confluent Cloud using the Confluent CLI:
confluent login --prompt
Next, set the active environment and Kafka cluster to make running later commands easier.
Find your environment ID of the form env-abcdef:
confluent environment list
Set it as the active environment:
confluent environment use <ENVIRONMENT ID>
Similarly, find your Kafka cluster ID of the form lkc-abcdef:
confluent kafka cluster list
Set it as the active cluster:
confluent kafka cluster use <CLUSTER ID>
Create a Kafka topic named orders:
confluent kafka topic create orders
Associate the schema in the file order.avsc with the record values in the orders topic:
confluent schema-registry schema create --subject orders-value --schema order.avsc --type avro
The schema defines an order with a customer ID, product ID, and product name:
{
  "type": "record",
  "namespace": "io.confluent.developer",
  "name": "Order",
  "fields": [
    {
      "name": "product_id",
      "type": "int"
    },
    {
      "name": "customer_id",
      "type": "int"
    },
    {
      "name": "product_name",
      "type": "string"
    }
  ]
}
Following the Produce API data payload specification, we must send base64-encoded binary data that conforms to this wire format.
The GitHub repo contains a sample Python script to help with this.
First, instantiate a byte buffer and write the magic byte (0), followed by the schema ID as a 4-byte big-endian integer:
buffer = io.BytesIO()
# write the magic byte (zero)
buffer.write(b'\x00')
# write the schema ID as a 4-byte big-endian integer
buffer.write(args.schema_id.to_bytes(4, byteorder="big"))
Next, write the Avro-formatted data from payload.json into the same buffer. The sample payload looks like:
{
  "product_id": 19287,
  "customer_id": 8737,
  "product_name": "AirSwift Light Sneakers"
}
To write this binary-encoded data using the Avro Python API:
# write the Avro binary encoding of the record (the dict loaded from payload.json),
# using the schema parsed from order.avsc
DatumWriter(schema).write(record, BinaryEncoder(buffer))
Finally, base64-encode the entire payload and print it:
print(base64.b64encode(buffer.getvalue()).decode("utf-8"))
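For reference, here is a minimal standalone sketch that combines these steps. It is only a sketch: the argument parsing and file handling shown are assumptions about how a script like this could be structured, and the repo's generate_sample_avro_value.py remains the authoritative version.

#!/usr/bin/env python3
# Sketch: encode an Avro record in the Schema Registry wire format and print it base64-encoded.
import argparse
import base64
import io
import json

import avro.schema  # avro.schema.parse is available in recent versions of the avro package
from avro.io import BinaryEncoder, DatumWriter

parser = argparse.ArgumentParser()
parser.add_argument("--schema-path", required=True)
parser.add_argument("--payload-path", required=True)
parser.add_argument("--schema-id", type=int, required=True)
args = parser.parse_args()

# parse the Avro schema and load the JSON record to encode
with open(args.schema_path) as f:
    schema = avro.schema.parse(f.read())
with open(args.payload_path) as f:
    record = json.load(f)

buffer = io.BytesIO()
buffer.write(b"\x00")                                       # magic byte (zero)
buffer.write(args.schema_id.to_bytes(4, byteorder="big"))   # schema ID, 4 bytes, big-endian
DatumWriter(schema).write(record, BinaryEncoder(buffer))    # Avro binary-encoded record

print(base64.b64encode(buffer.getvalue()).decode("utf-8"))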
To run the script, first look up the ID of the schema registered under the orders-value subject:
confluent schema-registry schema describe --subject orders-value --version latest | grep "Schema ID"
Then install the Avro Python library and generate the encoded payload:
pip install avro
PAYLOAD=$(./generate_sample_avro_value.py --schema-path ./order.avsc \
--payload-path ./payload.json \
--schema-id <SCHEMA ID>)
$ echo $PAYLOAD
AAABhqOurQLCiAEuQWlyU3dpZnQgTGlnaHQgU25lYWtlcnM=
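If you want to sanity-check the encoded payload before sending it, a small sketch in plain Python can decode the base64 string and inspect the wire-format prefix:

import base64

payload = "AAABhqOurQLCiAEuQWlyU3dpZnQgTGlnaHQgU25lYWtlcnM="  # the value of $PAYLOAD above
raw = base64.b64decode(payload)

print(raw[0])                                      # magic byte, expected to be 0
print(int.from_bytes(raw[1:5], byteorder="big"))   # the schema ID
print(len(raw) - 5, "bytes of Avro-encoded data")  # the remainder of the record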
With the base64-encoded message ready, set the environment variables REST_ENDPOINT and CLUSTER_ID by describing the cluster:
read -r CLUSTER_ID REST_ENDPOINT < \
<(confluent kafka cluster describe -o json | jq -cr '"\(.id) \(.rest_endpoint)"')
Example output:
$ echo $CLUSTER_ID
lkc-onq9jx
$ echo $REST_ENDPOINT
https://pkc-921jm.us-east-2.aws.confluent.cloud:443
Next, generate base64-encoded Basic Auth credentials. Note the jq -j flag, which omits the trailing newline so that it is not included in the encoded credentials:
BASIC_CREDENTIALS=$(confluent api-key create --resource $CLUSTER_ID -o json \
| jq -j '"\(.api_key):\(.api_secret)"' \
| base64)
Send the POST request using curl:
curl --request POST \
-H 'Content-Type: application/json' \
--url "$REST_ENDPOINT/kafka/v3/clusters/$CLUSTER_ID/topics/orders/records" \
--header "Authorization: Basic $BASIC_CREDENTIALS" \
--data-raw "{\"key\":{\"type\":\"STRING\",\"data\":\"k1\"}, \"value\":{\"type\":\"BINARY\",\"data\":\"$PAYLOAD\"}}"
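If you prefer Python over curl, a rough equivalent using the requests library might look like the following. This is a hedged sketch: it assumes the requests package is installed, that REST_ENDPOINT, CLUSTER_ID, and PAYLOAD are exported in the environment, and that you substitute the API key and secret created above.

import os

import requests  # assumes the requests package is installed

rest_endpoint = os.environ["REST_ENDPOINT"]
cluster_id = os.environ["CLUSTER_ID"]
payload = os.environ["PAYLOAD"]  # base64-encoded wire-format value generated above

url = f"{rest_endpoint}/kafka/v3/clusters/{cluster_id}/topics/orders/records"
body = {
    "key": {"type": "STRING", "data": "k1"},
    "value": {"type": "BINARY", "data": payload},
}

# HTTP Basic Auth with the Kafka API key and secret created earlier
response = requests.post(url, json=body, auth=("<API KEY>", "<API SECRET>"))
print(response.status_code, response.text)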
To consume the message, set the previously created API key as active in your CLI session:
API_KEY=$(confluent api-key list --current-user --resource $CLUSTER_ID -o json \
| jq -cr '.[0].key')
confluent api-key use $API_KEY
Use the Confluent CLI and specify the value format as Avro:
confluent kafka topic consume orders \
--value-format avro \
--key-format string \
--print-key \
--from-beginning
This prints:
k1 {"customer_id":8737,"product_id":19287,"product_name":"AirSwift Light Sneakers"}
When you are finished, clean up by deleting the API key and, if you no longer need it, the Kafka cluster:
confluent api-key delete $API_KEY
confluent kafka cluster delete $CLUSTER_ID