In this tutorial, you'll learn how to route events that cause exceptions in a Kafka Streams application to a dead-letter queue topic.

Clone the confluentinc/tutorials repository and navigate into its top-level directory:
git clone git@github.com:confluentinc/tutorials.git
cd tutorials

Log in to your Confluent Cloud account:
confluent login --prompt --save

Install a CLI plugin that will streamline the creation of resources in Confluent Cloud:
confluent plugin install confluent-quickstart

Run the plugin from the top-level directory of the tutorials repository to create the Confluent Cloud resources needed for this tutorial. Note that you may specify a different cloud provider (gcp or azure) or region. You can find supported regions in a given cloud provider by running confluent kafka region list --cloud <CLOUD>.
confluent quickstart \
--environment-name kafka-streams-dlq-env \
--kafka-cluster-name kafka-streams-dlq-cluster \
--create-kafka-key \
--kafka-java-properties-file ./kafka-streams-dead-letter-queue/kstreams/src/main/resources/cloud.properties

The plugin should complete in under a minute.
Create the input and output topics for the application:
confluent kafka topic create input
confluent kafka topic create output
confluent kafka topic create dlq-topic

Start a console producer:
confluent kafka topic produce input

Produce events with a ball attribute:
{"sport": "baseball", "ball": {"shape": "round", "dimensions": {"diameter": "2.9in", "weight": "5oz"}}}
{"sport": "tennis", "ball": {"shape": "round", "dimensions": {"diameter": "6.7cm", "weight": "58g"}}}

And a few without a ball attribute:
{"sport": "swimming", "details": {"style": "backstroke", "distance": "400m"}}
{"sport": "gymnastics", "details": {"style": "floor routine"}}

KIP-1034 introduced dead-letter queue support for Kafka Streams applications: a new configuration parameter that, when set, instructs the default exception handler to send the erroneous event to the specified dead-letter queue topic. Here, the topology will simply log the exception while sending the event to the dead-letter queue topic.
Properties properties = new Properties();
// KIP-1034: Configure Dead Letter Queue
properties.put(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, DLQ_TOPIC);
properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, "org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler");
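The topology shown next deserializes each JSON event into a SportEvent POJO. That class isn't shown in this tutorial, and the real class in the repository may differ; as a rough sketch, it only needs a sport field and a ball field exposed through an Optional, since the topology calls getBall().isEmpty() and getBall().get():

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch only: the actual SportEvent class lives in the tutorial
// repository and may differ. This mirrors the accessors the topology calls:
// getSport(), and getBall() returning an Optional that supports isEmpty()/get().
public class SportEvent {

    private String sport;
    private Optional<Map<String, Object>> ball = Optional.empty();

    public String getSport() {
        return sport;
    }

    public void setSport(String sport) {
        this.sport = sport;
    }

    public Optional<Map<String, Object>> getBall() {
        return ball;
    }

    public void setBall(Map<String, Object> ball) {
        // A missing "ball" attribute in the JSON leaves this as Optional.empty()
        this.ball = Optional.ofNullable(ball);
    }
}
```

If Jackson is used to bind Optional fields directly, the jackson-datatype-jdk8 module is typically registered on the ObjectMapper so that a missing attribute maps to Optional.empty().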
The example topology throws a RuntimeException for any events where the ball attribute is null or missing:
builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde))
        .mapValues(value -> {
            try {
                SportEvent event = objectMapper.readValue(value, SportEvent.class);
                if (null == event.getBall() || event.getBall().isEmpty()) {
                    LOG.error("Sport '{}' is missing ball field - routing to DLQ", event.getSport());
                    throw new RuntimeException("Sport event missing required 'ball' field");
                }
                LOG.info("Successfully processed event - sport: {}, ball: {}",
                        event.getSport(), event.getBall().get());
                return value;
            } catch (IOException e) {
                LOG.error("Failed to parse JSON value: {}", value, e);
                throw new RuntimeException("Failed to parse JSON", e);
            }
        })
        .to(OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));

Compile the application:
./gradlew kafka-streams-dead-letter-queue:kstreams:shadowJar

Navigate into the application directory:
cd kafka-streams-dead-letter-queue/kstreams/

Run the application, passing the Kafka client configuration file generated when you created Confluent Cloud resources:
java -cp ./build/libs/kstreams-dlq-standalone.jar \
io.confluent.developer.KafkaStreamsDLQApplication \
cloud.properties

Observe that the two events with a ball attribute show up in the output topic:
confluent kafka topic consume output -b

Similarly, consume events from the dlq-topic:

confluent kafka topic consume dlq-topic -b

You will see that the routed events include headers describing the exception that triggered the routing, along with the partition, offset, timestamp, key, and value of the original event. For example:
{
  "partition_id": 0,
  "offset": 2,
  "timestamp": 1772480434575,
  "headers": [
    {
      "key": "__streams.errors.exception",
      "value": "java.lang.RuntimeException"
    },
    {
      "key": "__streams.errors.message",
      "value": "Sport event missing required 'ball' field"
    },
    {
      "key": "__streams.errors.stacktrace",
      "value": "java.lang.RuntimeException: Sport event missing required 'ball' field\n\tat io.confluent.developer.KafkaStreamsDLQApplication.lambda$buildTopology$0(KafkaStreamsApplication.java:40)\n\tat org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$0(AbstractStream.java:104)\n\tat org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:294)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:273)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)\n\tat org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:95)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$0(StreamTask.java:907)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:954)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:907)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:811)\n\tat org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)\n\tat org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:2084)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1265)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:952)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:912)\n"
    },
    {
      "key": "__streams.errors.topic",
      "value": "input"
    },
    {
      "key": "__streams.errors.partition",
      "value": "0"
    },
    {
      "key": "__streams.errors.offset",
      "value": "0"
    }
  ],
  "key": null,
  "value": {
    "sport": "swimming",
    "details": {
      "style": "backstroke",
      "distance": "400m"
    }
  },
  "metadata": {
    "value_metadata": {
      "data_format": "JSON"
    }
  }
}

When you are finished, delete the kafka-streams-dlq-env environment by first getting the environment ID of the form env-123456 corresponding to it:
confluent environment list

Delete the environment, including all resources created for this tutorial:
confluent environment delete <ENVIRONMENT ID>