Create a directory for the Java files in this project:
mkdir -p src/main/java/io/confluent/developer
Before you create your full application code, let’s highlight some of the most important ProducerConfig
configuration parameters relevant to the idempotent producer:
enable.idempotence=true
acks=all
max.in.flight.requests.per.connection=5
retries=2147483647
The following parameter must be explicitly configured:
- enable.idempotence: when set to true, it enables an idempotent producer, which ensures that exactly one copy of each message is written to the brokers, and in order. The default value is enable.idempotence=false, so you must explicitly set this to enable.idempotence=true.
The other parameters may not need to be set explicitly, but there are some noteworthy caveats:
- acks: the KafkaProducer uses the acks configuration to tell the leader broker how many acknowledgments to wait for before considering a produce request complete. This value must be acks=all for the idempotent producer to work; otherwise the producer cannot guarantee idempotence. The default value is acks=1, so you have two choices: (a) do not explicitly set it in the configuration and allow the producer to override it automatically, or (b) explicitly set this to acks=all. The producer will fail to start if enable.idempotence=true and your application configures this to anything but acks=all.
- max.in.flight.requests.per.connection: the maximum number of unacknowledged requests the producer sends on a single connection before blocking. The idempotent producer still maintains message order even with pipelining (i.e., max.in.flight.requests.per.connection can be greater than 1), and the maximum value supported with idempotency is 5. The default value is already max.in.flight.requests.per.connection=5, so no change is required for the idempotent producer.
- retries: setting a value greater than zero will cause the producer to resend any record whose send fails with a potentially transient error. The only requirement for idempotency is that this is greater than zero. The default value is already retries=2147483647, so no change is required for the idempotent producer.
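Taken together, the four parameters above can be assembled into a plain java.util.Properties object before constructing the producer. The sketch below uses only the Java standard library; the class name IdempotentProducerConfig and the bootstrap.servers placeholder are illustrative, not part of the tutorial's application:

```java
import java.util.Properties;

public class IdempotentProducerConfig {

    // Builds the idempotence-related producer settings discussed above.
    public static Properties baseConfig(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Required: idempotence is off by default.
        props.put("enable.idempotence", "true");
        // Optional: these already match the defaults that idempotence needs,
        // but setting them explicitly documents the intent.
        props.put("acks", "all");
        props.put("max.in.flight.requests.per.connection", "5");
        props.put("retries", "2147483647");
        return props;
    }

    public static void main(String[] args) {
        Properties props = baseConfig("localhost:9092");
        System.out.println(props.getProperty("enable.idempotence"));
    }
}
```

Remember that only enable.idempotence must be set; the other three entries simply restate defaults that are compatible with it.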
This is only a small subset of producer configuration parameters focused on idempotent producer semantics. For further reading, please see KIP-98. This KIP also discusses other elements of exactly-once semantics (EOS), including transactional guarantees, which enable applications to produce to multiple partitions atomically, i.e., all writes across multiple partitions succeed or fail as a unit.
Now let’s create the application source code at src/main/java/io/confluent/developer/KafkaProducerApplication.java.
package io.confluent.developer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Properties;

public class KafkaProducerApplication {

    private final Producer<String, String> producer;
    final String outTopic;

    public KafkaProducerApplication(final Producer<String, String> producer,
                                    final String topic) {
        this.producer = producer;
        outTopic = topic;
    }

    public void produce(final String message) {
        // Lines of the form "key-value" produce a keyed record;
        // lines without a "-" are sent with a null key.
        final String[] parts = message.split("-");
        final String key, value;
        if (parts.length > 1) {
            key = parts[0];
            value = parts[1];
        } else {
            key = null;
            value = parts[0];
        }
        final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(outTopic, key, value);
        producer.send(producerRecord,
            (recordMetadata, e) -> {
                if (e != null) {
                    e.printStackTrace();
                } else {
                    System.out.println("key/value " + key + "/" + value
                        + "\twritten to topic[partition] " + recordMetadata.topic()
                        + "[" + recordMetadata.partition() + "] at offset "
                        + recordMetadata.offset());
                }
            });
    }

    public void shutdown() {
        producer.close();
    }

    public static Properties loadProperties(String fileName) throws IOException {
        final Properties envProps = new Properties();
        try (FileInputStream input = new FileInputStream(fileName)) {
            envProps.load(input);
        }
        return envProps;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            throw new IllegalArgumentException(
                "This program takes two arguments: the path to an environment configuration file and " +
                "the path to the file with records to send");
        }

        final Properties props = KafkaProducerApplication.loadProperties(args[0]);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        final String topic = props.getProperty("output.topic.name");
        final Producer<String, String> producer = new KafkaProducer<>(props);
        final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic);

        String filePath = args[1];
        try {
            List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));
            linesToProduce.stream()
                          .filter(l -> !l.trim().isEmpty())
                          .forEach(producerApp::produce);
            System.out.println("Records sent from " + filePath);
        } catch (IOException e) {
            System.err.printf("Error reading file %s due to %s %n", filePath, e);
        } finally {
            producerApp.shutdown();
        }
    }
}
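As a side note on the input format: produce() derives the record key and value by splitting each line on "-", so "alice-hello" becomes key "alice" with value "hello", a line without a dash is sent with a null key, and for a line like "a-b-c" only the first two segments are used. The stdlib-only sketch below mirrors that rule; the class LineParser and its parseLine helper are illustrative, not part of the application above:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map.Entry;

public class LineParser {

    // Mirrors KafkaProducerApplication.produce(): split on "-";
    // the first segment becomes the key when present, otherwise the key is null.
    static Entry<String, String> parseLine(String message) {
        String[] parts = message.split("-");
        if (parts.length > 1) {
            return new SimpleEntry<>(parts[0], parts[1]);
        }
        return new SimpleEntry<>(null, parts[0]);
    }

    public static void main(String[] args) {
        System.out.println(parseLine("alice-hello")); // alice=hello
        System.out.println(parseLine("standalone"));  // null=standalone
    }
}
```

This is why the tutorial's record files use "key-value" lines: the dash is the delimiter between the record key and the record value.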