Create a directory for the Java files in this project:
mkdir -p src/main/java/io/confluent/developer
Before you create your application file, let’s look at some of the key points of this program:
KafkaProducerCallbackApplication constructor
public class KafkaProducerCallbackApplication {
private final Producer<String, String> producer;
final String outTopic;
public KafkaProducerCallbackApplication(final Producer<String, String> producer, (1)
final String topic) { (2)
this.producer = producer;
outTopic = topic;
}
1. Passing in the Producer instance as a constructor parameter.
2. The topic to write records to.
In this tutorial you’ll inject the dependencies in the KafkaProducerCallbackApplication.main() method. Having this thin wrapper class around a Producer is not required, but it does make our code easier to test. We’ll go into more detail in the testing section of the tutorial. (In practice you may want to use a dependency injection framework, such as the Spring Framework.)
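As a quick preview of that testability, here is a minimal sketch of a test that injects Kafka’s MockProducer (from the kafka-clients library) instead of a real KafkaProducer. The topic name, message, and method name are illustrative, not the tutorial’s actual test code:

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.serialization.StringSerializer;

// Minimal sketch: inject MockProducer so produce() can be verified
// without a running broker. autoComplete=true completes each send immediately.
public void testProduceSplitsKeyAndValue() {
    final MockProducer<String, String> mockProducer =
        new MockProducer<>(true, new StringSerializer(), new StringSerializer());
    final KafkaProducerCallbackApplication app =
        new KafkaProducerCallbackApplication(mockProducer, "output-topic");

    app.produce("foo-bar");

    // MockProducer keeps a history of every record sent to it.
    assert mockProducer.history().size() == 1;
    assert "foo".equals(mockProducer.history().get(0).key());
    assert "bar".equals(mockProducer.history().get(0).value());
}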
Next, let’s take a look at the KafkaProducerCallbackApplication.produce method.
KafkaProducerCallbackApplication.produce
public void produce(final String message) {
final String[] parts = message.split("-"); (1)
final String key, value;
if (parts.length > 1) {
key = parts[0];
value = parts[1];
} else {
key = "NO-KEY";
value = parts[0];
}
final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(outTopic, key, value); (2)
producer.send(producerRecord, (recordMetadata, exception) -> { (3)
if (exception == null) { (4)
System.out.println("Record written to offset " +
recordMetadata.offset() + " timestamp " +
recordMetadata.timestamp());
} else {
System.err.println("An error occurred"); (5)
exception.printStackTrace(System.err);
}
});
}
1. Process the String before sending it as a message.
2. Create the ProducerRecord.
3. Send the record to the broker, specifying a Callback instance as a lambda.
4. If there’s no exception, print the offset and timestamp of the acknowledged record.
5. Error handling: in this case, printing the stack trace to System.err.
The KafkaProducerCallbackApplication.produce method does some processing on a String and then sends it as a ProducerRecord. For example, the line foo-bar is sent with key foo and value bar, while a line containing no dash is sent with the key NO-KEY. While this code is a trivial example, it’s enough to demonstrate using a KafkaProducer.
The Callback provides a way of handling, asynchronously, any actions you want to take when a request completes. Note that the Callback code executes on the producer’s I/O thread, and any time-consuming work there can delay the sending of new records, so code in a callback should be designed to execute quickly.
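If you do need to perform expensive work on completion, one option is to hand it off to a thread pool your application owns, so the I/O thread stays free. A minimal sketch, where expensivePostProcessing is a hypothetical method standing in for your own logic:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// An executor owned by the application, not by the producer.
ExecutorService callbackPool = Executors.newSingleThreadExecutor();

producer.send(producerRecord, (recordMetadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace(System.err);
        return;
    }
    // Only this cheap hand-off runs on the producer's I/O thread;
    // expensivePostProcessing (a hypothetical helper) runs on callbackPool.
    callbackPool.submit(() -> expensivePostProcessing(recordMetadata));
});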
The KafkaProducer.send method is asynchronous and returns as soon as the provided record is placed in the buffer of records to be sent to the broker. Once the broker acknowledges that the record has been appended to its log, the broker completes the produce request, which the application receives as RecordMetadata: information about the committed record.
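Because send also returns a Future<RecordMetadata>, you can instead block for the broker’s response when that suits your application, for example in a simple command-line tool. A minimal sketch (the callback approach used in this tutorial stays fully asynchronous):

import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.RecordMetadata;

// get() blocks until the broker completes the request, and throws
// an ExecutionException if the send failed.
Future<RecordMetadata> future = producer.send(producerRecord);
RecordMetadata metadata = future.get();
System.out.println("Record written to offset " + metadata.offset());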
In this example, the code in the callback just prints information from each record’s RecordMetadata object, specifically the timestamp and offset.
Now go ahead and create the following file at src/main/java/io/confluent/developer/KafkaProducerCallbackApplication.java.
package io.confluent.developer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Properties;
public class KafkaProducerCallbackApplication {
private final Producer<String, String> producer;
final String outTopic;
public KafkaProducerCallbackApplication(final Producer<String, String> producer,
final String topic) {
this.producer = producer;
outTopic = topic;
}
public void produce(final String message) {
final String[] parts = message.split("-");
final String key, value;
if (parts.length > 1) {
key = parts[0];
value = parts[1];
} else {
key = "NO-KEY";
value = parts[0];
}
final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(outTopic, key, value);
producer.send(producerRecord, (recordMetadata, exception) -> {
if (exception == null) {
System.out.println("Record written to offset " +
recordMetadata.offset() + " timestamp " +
recordMetadata.timestamp());
} else {
System.err.println("An error occurred");
exception.printStackTrace(System.err);
}
});
}
public void shutdown() {
producer.close();
}
public static Properties loadProperties(String fileName) throws IOException {
    final Properties envProps = new Properties();
    // try-with-resources closes the stream even if load() throws
    try (FileInputStream input = new FileInputStream(fileName)) {
        envProps.load(input);
    }
    return envProps;
}
public static void main(String[] args) throws Exception {
if (args.length < 2) {
throw new IllegalArgumentException(
    "This program takes two arguments: the path to an environment configuration file " +
        "and the path to the file with records to send");
}
final Properties props = KafkaProducerCallbackApplication.loadProperties(args[0]);
final String topic = props.getProperty("output.topic.name");
final Producer<String, String> producer = new KafkaProducer<>(props);
final KafkaProducerCallbackApplication producerApp = new KafkaProducerCallbackApplication(producer, topic);
String filePath = args[1];
try {
List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));
linesToProduce.stream()
.filter(l -> !l.trim().isEmpty())
.forEach(producerApp::produce);
} catch (IOException e) {
System.err.printf("Error reading file %s due to %s %n", filePath, e);
} finally {
producerApp.shutdown();
}
}
}
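To run the program, main needs two files: a configuration file and a file of records to send. A configuration file might look like the following, where the broker address and topic name are illustrative; output.topic.name is the custom property this program reads, and the rest are standard producer configs:

bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
output.topic.name=output-topic

A records file holds one message per line; lines containing a - are split into key and value, and the rest are sent with the key NO-KEY:

foo-bar
fun-programming
a message with no key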