How can I consume Kafka topics with a higher degree of parallelism than the partition count?
This tutorial requires access to an Apache Kafka cluster, and the quickest way to get started free is on Confluent Cloud, which provides Kafka as a fully managed service.
After you log in to Confluent Cloud, click Environments in the lefthand navigation, click on Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.
From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details). To avoid having to enter a credit card, add an additional promo code CONFLUENTDEV1. With this promo code, you will not have to enter a credit card for 30 days or until your credits run out.
Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.
Make a local directory anywhere you’d like for this project:
mkdir confluent-parallel-consumer-application && cd confluent-parallel-consumer-application
Next, create a directory for configuration data:
mkdir configuration
From the Confluent Cloud Console, navigate to your Kafka cluster and then select Clients in the lefthand navigation. From the Clients view, create a new client and click Java to get the connection information customized to your cluster.
Create new credentials for your Kafka cluster and Schema Registry, writing in appropriate descriptions so that the keys are easy to find and delete later. The Confluent Cloud Console will show a configuration similar to below with your new credentials automatically populated (make sure Show API keys is checked). Copy and paste it into a configuration/ccloud.properties file on your machine.
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BOOTSTRAP_SERVERS }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
# Best practice for Kafka producer to prevent data loss
acks=all
# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url={{ SR_URL }}
basic.auth.credentials.source=USER_INFO
basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
Do not directly copy and paste the above configuration. You must copy it from the Confluent Cloud Console so that it includes your Confluent Cloud information and credentials.
This tutorial has some steps for Kafka topic management and for producing and consuming events, for which you can use the Confluent Cloud Console or the Confluent CLI. Follow the instructions here to install the Confluent CLI, and then follow these steps to connect the CLI to your Confluent Cloud cluster.
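For reference, once the CLI is installed, connecting it to the cluster you created above boils down to logging in and selecting your environment and cluster. A minimal sketch follows (the IDs are placeholders for the values reported by confluent environment list and confluent kafka cluster list; the linked instructions also cover creating an API key for the CLI):
confluent login --save
confluent environment use <environment-id>
confluent kafka cluster use <cluster-id>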
In this step we’re going to create a topic for use during this tutorial. Use the following command to create the topic:
confluent kafka topic create parallel-consumer-input-topic
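If you’d like to confirm that the topic exists and see how many partitions it was created with (relevant later, when we compare consumption parallelism to the partition count), you can describe it:
confluent kafka topic describe parallel-consumer-input-topic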
In order to build the project, first install Gradle 7.5 or later if you don’t already have it.
Create the following Gradle build file, named build.gradle, for the project. Note the parallel-consumer-core dependency, which is available in Maven Central. This artifact includes the Confluent Parallel Consumer’s core API. There are also separate modules for using the Confluent Parallel Consumer with reactive API frameworks like Vert.x (parallel-consumer-vertx) and Reactor (parallel-consumer-reactor). These modules are out of scope for this introductory tutorial.
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath "com.github.jengelman.gradle.plugins:shadow:6.1.0"
}
}
plugins {
id "java"
}
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
version = "0.0.1"
repositories {
mavenCentral()
}
apply plugin: "com.github.johnrengelman.shadow"
dependencies {
implementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.2.4"
implementation "org.apache.commons:commons-lang3:3.12.0"
implementation "org.slf4j:slf4j-simple:2.0.0"
implementation "me.tongfei:progressbar:0.9.3"
implementation 'org.awaitility:awaitility:4.2.0'
testImplementation "junit:junit:4.13.2"
testImplementation 'org.awaitility:awaitility:4.2.0'
testImplementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.2.4:tests" // for LongPollingMockConsumer
}
test {
testLogging {
outputs.upToDateWhen { false }
showStandardStreams = true
exceptionFormat = "full"
}
}
jar {
manifest {
attributes(
"Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
)
}
}
shadowJar {
archiveBaseName = "confluent-parallel-consumer-application-standalone"
}
And be sure to run the following command to obtain the Gradle wrapper, which we will use to execute the build. The Gradle wrapper is a best practice ancillary build script that enables developers to more easily collaborate on Gradle projects by ensuring that developers all use the same correct Gradle version for the project (downloading Gradle at build time if necessary).
gradle wrapper
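If you have a newer Gradle installed locally, you can also pin the wrapper to the version this tutorial targets when generating it, for example:
gradle wrapper --gradle-version 7.5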
Then create a development configuration file at configuration/dev.properties:
# Consumer properties
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
max.poll.interval.ms=300000
enable.auto.commit=false
auto.offset.reset=earliest
# Application-specific properties
input.topic.name=parallel-consumer-input-topic
file.path=topic-output.txt
Let’s do a quick overview of some of the more important properties here:
The key.deserializer and value.deserializer properties provide a class implementing the Deserializer interface for converting byte arrays into the expected object type of the key and value, respectively.
max.poll.interval.ms is the maximum amount of time a consumer may take between calls to Consumer.poll(). If a consumer instance takes longer than the specified time, it’s considered non-responsive and removed from the consumer group, triggering a rebalance.
Setting enable.auto.commit to false is required because the Confluent Parallel Consumer handles committing offsets itself in order to achieve fault tolerance.
auto.offset.reset - if a consumer instance can’t locate any committed offsets for its topic-partition assignment(s), it will resume processing from the earliest available offset.
Using the command below, append the contents of configuration/ccloud.properties (with your Confluent Cloud configuration) to configuration/dev.properties (with the application properties).
cat configuration/ccloud.properties >> configuration/dev.properties
Create a directory for the Java files in this project:
mkdir -p src/main/java/io/confluent/developer
To complete this introductory application, you’ll build a main application class and a couple of supporting classes.
First, you’ll create the main application class, ParallelConsumerApplication, which is the focal point of this tutorial: it consumes records from a Kafka topic using the Confluent Parallel Consumer.
Go ahead and copy the following into a file src/main/java/io/confluent/developer/ParallelConsumerApplication.java:
package io.confluent.developer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Properties;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
import static io.confluent.parallelconsumer.ParallelStreamProcessor.createEosStreamProcessor;
import static org.apache.commons.lang3.RandomUtils.nextInt;
/**
* Simple "hello world" Confluent Parallel Consumer application that simply consumes records from Kafka and writes the
* message values to a file.
*/
public class ParallelConsumerApplication {
private static final Logger LOGGER = LoggerFactory.getLogger(ParallelConsumerApplication.class.getName());
private final ParallelStreamProcessor<String, String> parallelConsumer;
private final ConsumerRecordHandler<String, String> recordHandler;
/**
* Application that runs a given Confluent Parallel Consumer, calling the given handler method per record.
*
* @param parallelConsumer the Confluent Parallel Consumer instance
* @param recordHandler record handler that implements method to run per record
*/
public ParallelConsumerApplication(final ParallelStreamProcessor<String, String> parallelConsumer,
final ConsumerRecordHandler<String, String> recordHandler) {
this.parallelConsumer = parallelConsumer;
this.recordHandler = recordHandler;
}
/**
* Close the parallel consumer on application shutdown
*/
public void shutdown() {
LOGGER.info("shutting down");
if (parallelConsumer != null) {
parallelConsumer.close();
}
}
/**
* Subscribes to the configured input topic and calls (blocking) `poll` method.
*
* @param appProperties application and consumer properties
*/
public void runConsume(final Properties appProperties) {
String topic = appProperties.getProperty("input.topic.name");
LOGGER.info("Subscribing Parallel Consumer to consume from {} topic", topic);
parallelConsumer.subscribe(Collections.singletonList(topic));
LOGGER.info("Polling for records. This method blocks", topic);
parallelConsumer.poll(context -> recordHandler.processRecord(context.getSingleConsumerRecord()));
}
public static void main(String[] args) throws Exception {
if (args.length < 1) {
throw new IllegalArgumentException(
"This program takes one argument: the path to an environment configuration file.");
}
final Properties appProperties = PropertiesUtil.loadProperties(args[0]);
// random consumer group ID for rerun convenience
String groupId = "parallel-consumer-app-group-" + nextInt();
appProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// construct parallel consumer
final Consumer<String, String> consumer = new KafkaConsumer<>(appProperties);
final ParallelConsumerOptions options = ParallelConsumerOptions.<String, String>builder()
.ordering(KEY)
.maxConcurrency(16)
.consumer(consumer)
.commitMode(PERIODIC_CONSUMER_SYNC)
.build();
ParallelStreamProcessor<String, String> eosStreamProcessor = createEosStreamProcessor(options);
// create record handler that writes records to configured file
final String filePath = appProperties.getProperty("file.path");
final ConsumerRecordHandler<String, String> recordHandler = new FileWritingRecordHandler(Paths.get(filePath));
// run the consumer!
final ParallelConsumerApplication consumerApplication = new ParallelConsumerApplication(eosStreamProcessor, recordHandler);
Runtime.getRuntime().addShutdownHook(new Thread(consumerApplication::shutdown));
consumerApplication.runConsume(appProperties);
}
}
Let’s go over some of the key parts of the ParallelConsumerApplication, starting with the constructor:
public ParallelConsumerApplication(final ParallelStreamProcessor<String, String> parallelConsumer,
final ConsumerRecordHandler<String, String> recordHandler) {
this.parallelConsumer = parallelConsumer;
this.recordHandler = recordHandler;
}
Here we supply instances of the Confluent Parallel Consumer’s ParallelStreamProcessor and the application’s ConsumerRecordHandler via constructor parameters. The abstract ConsumerRecordHandler class makes it simple to change ConsumerRecord handling without having to change much code.
In this tutorial you’ll inject the dependencies in the ParallelConsumerApplication.main() method, but in practice you may want to use a dependency injection framework, such as the Spring Framework.
Next, let’s review the ParallelConsumerApplication.runConsume() method, which provides the core functionality of this tutorial.
public void runConsume(final Properties appProperties) {
String topic = appProperties.getProperty("input.topic.name");
LOGGER.info("Subscribing Parallel Consumer to consume from {} topic", topic);
parallelConsumer.subscribe(Collections.singletonList(topic)); (1)
LOGGER.info("Polling for records. This method blocks", topic);
parallelConsumer.poll(context -> recordHandler.processRecord(context.getSingleConsumerRecord())); (2)
}
1 | Subscribing to the Kafka topic. |
2 | Simply poll once. With the Confluent Parallel Consumer, you call poll only once and it will poll indefinitely,
calling the lambda that you supply for each message. The library handles everything for you subject to how you configure
the Parallel Consumer. |
Speaking of configuration, this snippet instantiates the ParallelStreamProcessor that our application’s constructor expects:
final Consumer<String, String> consumer = new KafkaConsumer<>(appProperties); (1)
final ParallelConsumerOptions options = ParallelConsumerOptions.<String, String>builder() (2)
.ordering(KEY) (3)
.maxConcurrency(16) (4)
.consumer(consumer) (5)
.commitMode(PERIODIC_CONSUMER_SYNC) (6)
.build();
ParallelStreamProcessor<String, String> eosStreamProcessor = createEosStreamProcessor(options); (7)
1 | Create the Apache Kafka Consumer that the Confluent Parallel Consumer wraps. |
2 | Create the Parallel Consumer configuration via builder pattern. |
3 | Specify consumption ordering by key. |
4 | Specify the degree of parallelism. Here we specify 16 threads for illustrative purposes only (the application only consumes 3 records). |
5 | The Apache Kafka Consumer that we are wrapping. |
6 | Here we specify how to commit offsets. PERIODIC_CONSUMER_SYNC will block the Parallel Consumer’s processing loop until a successful commit response is received. Asynchronous is also supported, which optimizes for
consumption throughput (the downside being higher risk of needing to process duplicate messages in error recovery scenarios). |
7 | Create a ParallelStreamProcessor with the previously created configuration. This is the object we use to consume in lieu of a KafkaConsumer . |
Note that, by ordering by key, we can consume with a much higher degree of parallelism than we can with a vanilla consumer group (i.e., the number of topic partitions). While a given input topic may not have many partitions, it may have a large number of unique keys. Each of these key → message sets can actually be processed concurrently. In other words, regardless of the number of input partitions, the effective concurrency limit achievable with the Confluent Parallel Consumer is the number of unique keys across all messages in a topic.
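To make that relationship concrete, here is a minimal sketch (not part of the tutorial’s application) of how the ProcessingOrder setting bounds effective parallelism; the consumer variable is assumed to be a KafkaConsumer like the one constructed above:
// Sketch only: how ProcessingOrder bounds the Parallel Consumer's effective parallelism.
// PARTITION -> at most one record in flight per partition (same ceiling as a plain consumer group)
// KEY       -> at most one record in flight per key, so the ceiling is the number of distinct keys
// UNORDERED -> bounded only by maxConcurrency
final ParallelConsumerOptions<String, String> keyOrderedOptions =
    ParallelConsumerOptions.<String, String>builder()
        .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
        .maxConcurrency(100) // actual parallelism is min(100, number of keys currently available)
        .consumer(consumer)
        .build();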
To complete this tutorial, you’ll also need to create an abstract class that we will extend to process messages as we consume them. This abstract class, ConsumerRecordHandler, encapsulates tracking the number of records processed, which will be useful later on when we run performance tests and want to terminate the test application after consuming an expected number of records.
First, create the abstract class at src/main/java/io/confluent/developer/ConsumerRecordHandler.java:
package io.confluent.developer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Abstract class for processing a record. This allows for easier experimentation with the application and perf tests
* in this package. It handles tracking the number of events so that apps can wait on consuming an expected number
* of events.
*/
public abstract class ConsumerRecordHandler<K, V> {
private final AtomicInteger numRecordsProcessed;
public ConsumerRecordHandler() {
numRecordsProcessed = new AtomicInteger(0);
}
protected abstract void processRecordImpl(ConsumerRecord<K, V> consumerRecord);
final void processRecord(final ConsumerRecord<K, V> consumerRecord) {
processRecordImpl(consumerRecord);
numRecordsProcessed.incrementAndGet();
}
final int getNumRecordsProcessed() {
return numRecordsProcessed.get();
}
}
Using this abstract class will make it easier to change how you want to work with a ConsumerRecord without having to modify all of your existing code.
Next you’ll extend the ConsumerRecordHandler abstract class with a concrete class named FileWritingRecordHandler. Copy the following into file src/main/java/io/confluent/developer/FileWritingRecordHandler.java:
package io.confluent.developer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.WRITE;
import static java.util.Collections.singletonList;
/**
* Record handler that writes the Kafka message value to a file.
*/
public class FileWritingRecordHandler extends ConsumerRecordHandler<String, String> {
private final Path path;
public FileWritingRecordHandler(final Path path) {
this.path = path;
}
@Override
protected void processRecordImpl(final ConsumerRecord<String, String> consumerRecord) {
try {
Files.write(path, singletonList(consumerRecord.value()), CREATE, WRITE, APPEND);
} catch (IOException e) {
throw new RuntimeException("unable to write record to file", e);
}
}
}
Let’s take a peek under the hood at this class’s processRecordImpl method, which gets called for each record consumed:
@Override
protected void processRecordImpl(final ConsumerRecord<String, String> consumerRecord) {
try {
Files.write(path, singletonList(consumerRecord.value()), CREATE, WRITE, APPEND); (1)
} catch (IOException e) {
throw new RuntimeException("unable to write record to file", e);
}
}
1 | Simply write the record value to a file. |
In practice you’re certain to perform a more realistic task for each record.
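As an illustration of what a more realistic handler might look like, here is a hedged sketch of a handler that POSTs each record value to a REST endpoint using the JDK’s built-in HttpClient. The class name and endpoint are hypothetical and not part of the tutorial’s application:
package io.confluent.developer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
/**
 * Hypothetical handler that POSTs each record value to a downstream REST endpoint.
 */
public class RestPostingRecordHandler extends ConsumerRecordHandler<String, String> {
  private final HttpClient httpClient = HttpClient.newHttpClient();
  private final URI endpoint;
  public RestPostingRecordHandler(final URI endpoint) {
    this.endpoint = endpoint;
  }
  @Override
  protected void processRecordImpl(final ConsumerRecord<String, String> consumerRecord) {
    final HttpRequest request = HttpRequest.newBuilder(endpoint)
        .POST(HttpRequest.BodyPublishers.ofString(consumerRecord.value()))
        .build();
    try {
      // block until the downstream service acknowledges the record
      httpClient.send(request, HttpResponse.BodyHandlers.discarding());
    } catch (Exception e) {
      throw new RuntimeException("unable to POST record to " + endpoint, e);
    }
  }
}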
Finally, create a utility class, PropertiesUtil, that we use in our consumer application to load Kafka Consumer and application-specific properties. We’ll also use this class in the two performance testing applications that we will create later in this tutorial.
Go ahead and create the src/main/java/io/confluent/developer/PropertiesUtil.java file:
package io.confluent.developer;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
/**
* Utility class for loading Properties from a file.
*/
public class PropertiesUtil {
public static Properties loadProperties(String fileName) throws IOException {
try (FileInputStream input = new FileInputStream(fileName)) {
final Properties props = new Properties();
props.load(input);
return props;
}
}
}
In your terminal, run:
./gradlew shadowJar
Now that you have an uberjar for the ParallelConsumerApplication, you can launch it locally. When you run the following, the prompt won’t return, because the application will run until you exit it. There is always another message to process, so streaming applications don’t exit until you force them.
java -cp build/libs/confluent-parallel-consumer-application-standalone-0.0.1-all.jar io.confluent.developer.ParallelConsumerApplication configuration/dev.properties
Using a terminal window, run the following command to start a Confluent CLI producer:
confluent kafka topic produce parallel-consumer-input-topic --parse-key
Each line represents input data for the Confluent Parallel Consumer application. To send all of the events below, paste the following into the prompt and press enter:
fun-line:All streams lead to Kafka
event-promo:Go to Current
event-promo:Go to Kafka Summit
fun-line:Consume gently down the stream
Enter Ctrl-C to exit.
Your Confluent Parallel Consumer application should have consumed all the records sent and written them out to a file.
In a new terminal, run this command to print the results to the console:
cat topic-output.txt
You should see something like this:
All streams lead to Kafka
Go to Current
Go to Kafka Summit
Consume gently down the stream
Note that because we configured the Confluent Parallel Consumer to use KEY ordering, Go to Current appears before Go to Kafka Summit because these values have the same event-promo key. Similarly, All streams lead to Kafka appears before Consume gently down the stream because these values have the same fun-line key.
At this point you can stop the Confluent Parallel Consumer application with Ctrl-C in the terminal window where it’s running.
First, create a test file at configuration/test.properties:
input.topic.name=parallel-consumer-input-topic
Create a directory for the tests to live in:
mkdir -p src/test/java/io/confluent/developer
Testing a Confluent Parallel Consumer application is not too complicated thanks to the LongPollingMockConsumer that is based on Apache Kafka’s MockConsumer. Since the Confluent Parallel Consumer’s codebase is well tested, we don’t need to use a live consumer and Kafka broker to test our application. We can simply use a mock consumer to process some data you’ll feed into it.
There is only one method in ParallelConsumerApplicationTest annotated with @Test, and that is consumerTest(). This method actually runs your ParallelConsumerApplication with the mock consumer.
Now create the following file at src/test/java/io/confluent/developer/ParallelConsumerApplicationTest.java:
package io.confluent.developer;
import io.confluent.csid.utils.LongPollingMockConsumer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.awaitility.Awaitility;
import org.junit.Test;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
/**
* Tests for the ParallelConsumerApplication.
*/
public class ParallelConsumerApplicationTest {
private static final String TEST_CONFIG_FILE = "configuration/test.properties";
/**
* Test the app end to end with a few records consumable via a mock consumer. The app
* just writes records to a file, so we validate that the expected (mocked) events wind
* up written to a file.
*
* @throws Exception if unable to read properties or create the temp output file
*/
@Test
public void consumerTest() throws Exception {
final Path tempFilePath = Files.createTempFile("test-consumer-output", ".out");
final ConsumerRecordHandler<String, String> recordHandler = new FileWritingRecordHandler(tempFilePath);
final Properties appProperties = PropertiesUtil.loadProperties(TEST_CONFIG_FILE);
final String topic = appProperties.getProperty("input.topic.name");
final LongPollingMockConsumer<String, String> mockConsumer = new LongPollingMockConsumer<>(OffsetResetStrategy.EARLIEST);
final ParallelStreamProcessor<String, String> eosStreamProcessor = setupParallelConsumer(mockConsumer, topic);
final ParallelConsumerApplication consumerApplication = new ParallelConsumerApplication(eosStreamProcessor, recordHandler);
Runtime.getRuntime().addShutdownHook(new Thread(consumerApplication::shutdown));
mockConsumer.subscribeWithRebalanceAndAssignment(Collections.singletonList(topic), 1);
mockConsumer.addRecord(new ConsumerRecord<>(topic, 0, 0, null, "foo"));
mockConsumer.addRecord(new ConsumerRecord<>(topic, 0, 1, null, "bar"));
mockConsumer.addRecord(new ConsumerRecord<>(topic, 0, 2, null, "baz"));
consumerApplication.runConsume(appProperties);
final Set<String> expectedRecords = new HashSet<>();
expectedRecords.add("foo");
expectedRecords.add("bar");
expectedRecords.add("baz");
final Set<String> actualRecords = new HashSet<>();
Awaitility.await()
.atMost(5, SECONDS)
.pollInterval(Duration.ofSeconds(1))
.until(() -> {
actualRecords.clear();
actualRecords.addAll(Files.readAllLines(tempFilePath));
return actualRecords.size() == 3;
});
assertEquals(expectedRecords, actualRecords);
}
private ParallelStreamProcessor<String, String> setupParallelConsumer(MockConsumer<String, String> mockConsumer, final String topic) {
ParallelConsumerOptions options = ParallelConsumerOptions.<String, String>builder()
.ordering(KEY)
.maxConcurrency(1000)
.consumer(mockConsumer)
.build();
ParallelStreamProcessor<String, String> parallelConsumer = ParallelStreamProcessor.createEosStreamProcessor(options);
parallelConsumer.subscribe(Collections.singletonList(topic));
return parallelConsumer;
}
}
Now let’s build a test for the ConsumerRecordHandler implementation used in your application. Even though we have a test for the ParallelConsumerApplication, it’s important that you can test this helper class in isolation.
Create the following file at src/test/java/io/confluent/developer/FileWritingRecordHandlerTest.java:
package io.confluent.developer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
public class FileWritingRecordHandlerTest {
@Test
public void testProcess() throws IOException {
final Path tempFilePath = Files.createTempFile("test-handler", ".out");
try {
final ConsumerRecordHandler<String, String> recordHandler = new FileWritingRecordHandler(tempFilePath);
ConsumerRecord record = new ConsumerRecord<>("test", 0, 0, null, "my record");
recordHandler.processRecord(record);
final List<String> expectedWords = Arrays.asList("my record");
List<String> actualRecords = Files.readAllLines(tempFilePath);
assertThat(actualRecords, equalTo(expectedWords));
} finally {
Files.deleteIfExists(tempFilePath);
}
}
}
Now run the test, which is as simple as:
./gradlew test
Use the following command to create a topic that we’ll use for performance testing:
confluent kafka topic create perftest-parallel-consumer-input-topic
Using a terminal window, run the following command to write 10,000 small dummy records to the input topic:
seq 1 10000 | confluent kafka topic produce perftest-parallel-consumer-input-topic
Let’s kick off this command and let it run. It’ll take a few minutes to produce all 10,000 records. In the meantime, let’s continue with the tutorial.
Then create two performance test configuration files. The first is for a multi-threaded KafkaConsumer-based performance test that we’ll use to set a baseline. Create this file at configuration/perftest-kafka-consumer.properties:
# KafkaConsumer properties
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
max.poll.interval.ms=300000
enable.auto.commit=false
auto.offset.reset=earliest
# increase max.poll.records from default of 500 to ensure we can download many records
max.poll.records=10000
# large fetch.min.bytes to optimize for throughput
fetch.min.bytes=100000
# Application-specific properties
input.topic=perftest-parallel-consumer-input-topic
records.to.consume=10000
record.handler.sleep.ms=20
Then create this file at configuration/perftest-parallel-consumer.properties:
# KafkaConsumer properties
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
max.poll.interval.ms=300000
enable.auto.commit=false
auto.offset.reset=earliest
# increase max.poll.records from default of 500 to ensure we can download many records
max.poll.records=10000
# large fetch.min.bytes to optimize for throughput
fetch.min.bytes=100000
# Confluent Parallel Consumer properties
parallel.consumer.max.concurrency=256
parallel.consumer.order=UNORDERED
parallel.consumer.commit.mode=PERIODIC_CONSUMER_ASYNCHRONOUS
parallel.consumer.seconds.between.commits=60
# Application-specific properties
input.topic=perftest-parallel-consumer-input-topic
records.to.consume=10000
record.handler.sleep.ms=20
Let’s look at some of the more important properties in these configuration files:
We specify fetch.min.bytes to be 100000 in order to optimize for consumer throughput.
The application-specific property records.to.consume is set to 10000 to match the number of records that we produced in the previous step. This will cause the application to terminate upon consuming this many records.
The application-specific property record.handler.sleep.ms is used to simulate a nontrivial amount of work to perform per record. In this case, we sleep for 20ms to simulate a low-but-nontrivial latency operation like a call to a database or REST API.
In the configuration file for the Confluent Parallel Consumer performance test, there are a few Confluent Parallel Consumer-specific properties. parallel.consumer.max.concurrency is set to 256, much higher than the number of partitions in our topic. We use UNORDERED ordering, PERIODIC_CONSUMER_ASYNCHRONOUS offset commit mode, and a high parallel.consumer.seconds.between.commits value of 60 seconds. Together, these values optimize for throughput and keep our test analogous to the KafkaConsumer-based baseline. You may have noticed that, because we are aiming to maximize throughput in these performance tests while ignoring the overhead of offsets handling, the baseline doesn’t even commit offsets!
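As a rough back-of-the-envelope check on what to expect: with a 20ms simulated workload, 10,000 records represent about 200 seconds of total processing time. Spread across one consumer per partition (assuming, for illustration, a topic with six partitions), the baseline can’t finish much faster than roughly 33 seconds plus polling overhead, whereas a concurrency of 256 puts the theoretical floor under one second. The six-partition figure is only an assumption for this arithmetic; your topic’s partition count may differ.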
Using the commands below, append the contents of configuration/ccloud.properties (with your Confluent Cloud configuration) to configuration/perftest-kafka-consumer.properties and configuration/perftest-parallel-consumer.properties:
cat configuration/ccloud.properties >> configuration/perftest-kafka-consumer.properties
cat configuration/ccloud.properties >> configuration/perftest-parallel-consumer.properties
Here you’ll build a performance test application and supporting classes that implement multi-threaded consuming (one KafkaConsumer per partition to maximize parallelism).
First, you’ll create the main performance test application, src/main/java/io/confluent/developer/MultithreadedKafkaConsumerPerfTest.java:
package io.confluent.developer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.io.IOException;
import java.util.Properties;
import static org.apache.commons.lang3.RandomUtils.nextInt;
/**
* Consumption throughput test that runs a KafkaConsumer per thread, synchronously sleeping `record.handler.sleep.ms`
* per event. This simulates the performance characteristics of applications that do something of nontrivial latency
* per event, e.g., call out to a DB or REST API per event. It is the KafkaConsumer analogue to
* ParallelConsumerPerfTest, which is based on the Confluent Parallel Consumer.
*/
public class MultithreadedKafkaConsumerPerfTest {
public static void main(String[] args) {
if (args.length < 1) {
throw new IllegalArgumentException(
"This program takes one argument: the path to an environment configuration file.");
}
// load app and consumer specific properties from command line arg
final Properties appProperties;
try {
appProperties = PropertiesUtil.loadProperties(args[0]);
} catch (IOException e) {
throw new RuntimeException("Unable to load application properties", e);
}
// random group ID for rerun convenience
String groupId = "mt-kafka-consumer-perf-test-group-" + nextInt();
appProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// create handler that sleeps configured number of ms per record
final int recordHandlerSleepMs = Integer.parseInt(appProperties.getProperty("record.handler.sleep.ms"));
final SleepingRecordHandler recordHandler = new SleepingRecordHandler(recordHandlerSleepMs);
// create and run MultithreadedKafkaConsumer, which runs an ExecutorService with a consumer thread per partition
// and terminates when the total expected number of records have been consumed across the threads
try (MultithreadedKafkaConsumer mtConsumer = new MultithreadedKafkaConsumer(appProperties, recordHandler)) {
mtConsumer.runConsume();
}
}
}
Second, create the class that implements multi-threaded consuming, src/main/java/io/confluent/developer/MultithreadedKafkaConsumer.java:
package io.confluent.developer;
import me.tongfei.progressbar.ProgressBar;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.text.DecimalFormat;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
/**
* This class consumes a specified number of records from a Kafka topic with a consumer thread per explicitly
* assigned partition.
*/
public class MultithreadedKafkaConsumer implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(MultithreadedKafkaConsumer.class.getName());
private final Properties appProperties;
private final SleepingRecordHandler recordHandler;
private final AtomicInteger recordsConsumed; // total records consumed across all threads
private ExecutorService executor;
/**
* Multithreaded Kafka consumer that runs a KafkaConsumer per partition.
*
* @param appProperties application and consumer properties
* @param recordHandler records handler to run per record
*/
public MultithreadedKafkaConsumer(Properties appProperties, SleepingRecordHandler recordHandler) {
this.appProperties = appProperties;
this.recordHandler = recordHandler;
this.recordsConsumed = new AtomicInteger(0);
}
/**
* Main consumption method that instantiates consumers per partition, runs the consumers in a thread pool, and outputs
* progress to the user as it consumes.
*/
public void runConsume() {
int recordsToConsume = Integer.parseInt(appProperties.getProperty("records.to.consume"));
// instantiate all consumers before marking start time to keep init time out of the reported duration
List<KafkaConsumer> consumers = getConsumersPerPartition(appProperties);
// kick off consumers to process all records, updating a progress bar as we go
ProgressBar progressBar = new ProgressBar("Progress", recordsToConsume);
// create thread pool and shutdown hook to clean up in case of exception
executor = Executors.newFixedThreadPool(consumers.size());
Thread shutdownExecutor = new Thread(() -> this.close());
Runtime.getRuntime().addShutdownHook(shutdownExecutor);
// mark the consumption start time and kick off all consumer threads
long startTimeMs = System.currentTimeMillis();
List<Future> futures = new ArrayList<>();
for (KafkaConsumer consumer: consumers) {
futures.add(executor.submit(new ConsumerTask(consumer, recordsToConsume, recordsConsumed, progressBar)));
}
// wait for all tasks to complete, which happens when all records have been consumed
LOGGER.info("Waiting for consumers to consume all {} records", recordsToConsume);
for (Future future : futures) {
try {
future.get();
} catch (InterruptedException|ExecutionException e) {
throw new RuntimeException(e);
}
}
// done! report total time to consume all records
double durationSeconds = (System.currentTimeMillis() - startTimeMs) / 1_000.0;
DecimalFormat df = new DecimalFormat("0.00");
LOGGER.info("Total time to consume {} records: {} seconds", recordsToConsume, df.format(durationSeconds));
}
/**
* Create KafkaConsumer instances per partition, and explicitly assign a partition to each.
*
* @param appProperties consumer and application properties
* @return list of consumers
*/
private List<KafkaConsumer> getConsumersPerPartition(Properties appProperties) {
String topic = appProperties.getProperty("input.topic");
// use temp consumer to inspect the number of partitions
int numPartitions;
try (KafkaConsumer tempConsumer = new KafkaConsumer<>(appProperties)) {
numPartitions = tempConsumer.partitionsFor(topic).size();
LOGGER.info("{} partitions detected for {} topic", numPartitions, topic);
}
List<KafkaConsumer> consumers = new ArrayList<>();
for (int i = 0; i < numPartitions; i++) {
KafkaConsumer consumer = new KafkaConsumer<String, String>(appProperties);
TopicPartition tp = new TopicPartition(topic, i);
consumer.assign(Collections.singletonList(tp));
consumers.add(consumer);
}
return consumers;
}
/**
* Close the multithreaded consumer, which entails shutting down the executor thread pool.
*/
public void close() {
if (executor != null) {
executor.shutdown();
}
}
/**
* Inner class that runs each consumer.
*/
class ConsumerTask implements Runnable {
private final KafkaConsumer consumer;
private final int recordsToConsume; // records expected to be consumed across all threads
private final AtomicInteger recordsConsumed; // records consumed across all threads
private final ProgressBar progressBar;
/**
* Runnable task to consume a partition.
*
* @param consumer the consumer already assigned a specific partition
* @param recordsToConsume total number of records to consume across all threads
* @param recordsConsumed running tally of records consumed across all threads
* @param progressBar progress bar that we update to recordsConsumed / recordsToConsume during consumption
*/
public ConsumerTask(KafkaConsumer consumer, int recordsToConsume, AtomicInteger recordsConsumed, ProgressBar progressBar) {
this.consumer = consumer;
this.recordsToConsume = recordsToConsume;
this.recordsConsumed = recordsConsumed;
this.progressBar = progressBar;
}
/**
* Each task polls until the total number of records consumed across all threads is what we expect. Simply calls
* the record handler for each record.
*/
@Override
public void run() {
int numConsumed;
do {
// Use a poll timeout high enough to not saturate CPU, but fine enough to get interesting comparison numbers.
// Since the perf tests typically take many seconds to run, use 0.5 second poll timeout to strike this balance.
ConsumerRecords<String, String> records = consumer.poll(Duration.of(500, ChronoUnit.MILLIS));
for (ConsumerRecord record : records) {
recordHandler.processRecord(record);
numConsumed = recordsConsumed.incrementAndGet();
progressBar.stepTo(numConsumed);
}
} while (recordsConsumed.get() < recordsToConsume); // tasks block until all records consumed
}
}
}
Finally, create the record handler that sleeps 20ms per record consumed, src/main/java/io/confluent/developer/SleepingRecordHandler.java:
package io.confluent.developer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
* Record handler that sleeps a given number of milliseconds.
*/
public class SleepingRecordHandler extends ConsumerRecordHandler<String, String> {
private final int numMilliseconds;
public SleepingRecordHandler(final int numMilliseconds) {
this.numMilliseconds = numMilliseconds;
}
@Override
protected void processRecordImpl(final ConsumerRecord consumerRecord) {
try {
Thread.sleep(numMilliseconds);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Let’s rebuild the uberjar to include this performance test. In your terminal, run:
./gradlew shadowJar
Now that you have an uberjar containing MultithreadedKafkaConsumerPerfTest, you can launch it locally. This will run until the expected 10,000 records have been consumed. Ensure that the seq command that you ran previously to produce 10,000 records has completed before running this, so that we can accurately test consumption throughput.
java -cp build/libs/confluent-parallel-consumer-application-standalone-0.0.1-all.jar io.confluent.developer.MultithreadedKafkaConsumerPerfTest configuration/perftest-kafka-consumer.properties
While the performance test runs, take a few sips of the beverage that you previously poured. It will take a minute or two to complete, and the final line output will show you the latency for consuming all 10,000 records, e.g.:
[main] INFO io.confluent.developer.MultithreadedKafkaConsumer - Total time to consume 10000 records: 40.46 seconds
Before we build and run a Confluent Parallel Consumer analogue to this KafkaConsumer baseline, let’s summarize what we’ve seen so far:
We populated a topic with default properties and produced 10,000 small records to it
We maxed out the size of our consumer group by running a KafkaConsumer per partition, with each instance explicitly assigned to one partition
We optimized each KafkaConsumer for throughput by setting high values for max.poll.records and fetch.min.bytes
We struck a balance between latency accuracy and the instrumentation overhead needed to track progress and end when expected by using a 0.5 second poll timeout. (We want to report consumption latency shortly after consumption finishes, but we also want to minimize busy waiting of the KafkaConsumer instances that finish first.)
We scratched our heads writing some tricky multi-threaded code. By the way, is any multi-threaded code not tricky?
The reported performance test latency was 40.46 seconds in our case (your number is surely different).
Here you’ll build a performance test application based on the Confluent Parallel Consumer. This test reuses a couple of classes that we created previously: PropertiesUtil for loading consumer and application-specific properties, and SleepingRecordHandler for simulating a nontrivial workload per record, just as we did in MultithreadedKafkaConsumerPerfTest. Please rewind and create these if you skipped the parts of the tutorial that create these two classes.
Because the Confluent Parallel Consumer API is much lighter weight than the lift required to multi-thread KafkaConsumer instances per partition, let’s knock out the entire thing in one class. Create the file src/main/java/io/confluent/developer/ParallelConsumerPerfTest.java:
package io.confluent.developer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import me.tongfei.progressbar.ProgressBar;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.Collections;
import java.util.Properties;
import static java.time.Duration.ofSeconds;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.commons.lang3.RandomUtils.nextInt;
import static org.awaitility.Awaitility.await;
/**
* Consumption throughput test that runs the Confluent Parallel Consumer, synchronously sleeping
* `record.handler.sleep.ms` per event. This simulates the performance characteristics of applications that do
* something of nontrivial latency per event, e.g., call out to a DB or REST API per event. It is the Confluent
* Parallel Consumer analogue to MultithreadedKafkaConsumerPerfTest, which is based on KafkaConsumer.
*/
public class ParallelConsumerPerfTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ParallelConsumerPerfTest.class.getName());
private final ParallelStreamProcessor<String, String> parallelConsumer;
private final SleepingRecordHandler recordHandler;
/**
* Throughput performance test for the Confluent Parallel Consumer.
*
* @param parallelConsumer the parallel consumer
* @param recordHandler a handler to call per record
*/
public ParallelConsumerPerfTest(final ParallelStreamProcessor<String, String> parallelConsumer,
final SleepingRecordHandler recordHandler) {
this.parallelConsumer = parallelConsumer;
this.recordHandler = recordHandler;
}
/**
* Close the parallel consumer on application shutdown
*/
public void shutdown() {
LOGGER.info("shutting down");
if (parallelConsumer != null) {
parallelConsumer.close();
}
}
/**
* Subscribes to the configured input topic and calls (blocking) `poll` method.
*
* @param appProperties application and consumer properties
*/
private void runConsume(final Properties appProperties) {
parallelConsumer.subscribe(Collections.singletonList(appProperties.getProperty("input.topic")));
parallelConsumer.poll(context -> {
recordHandler.processRecord(context.getSingleConsumerRecord());
});
}
public static void main(String[] args) {
if (args.length < 1) {
throw new IllegalArgumentException(
"This program takes one argument: the path to an environment configuration file.");
}
// load app and consumer specific properties from command line arg
final Properties appProperties;
try {
appProperties = PropertiesUtil.loadProperties(args[0]);
} catch (IOException e) {
throw new RuntimeException("Unable to load application properties", e);
}
// random consumer group ID for rerun convenience
String groupId = "parallel-consumer-perf-test-group-" + nextInt();
appProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// create parallel consumer configured based on properties from app configuration
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(appProperties);
final int maxConcurrency = Integer.parseInt(appProperties.getProperty("parallel.consumer.max.concurrency"));
final ParallelConsumerOptions.ProcessingOrder processingOrder = ParallelConsumerOptions.ProcessingOrder.
valueOf(appProperties.getProperty("parallel.consumer.order"));
final ParallelConsumerOptions.CommitMode commitMode = ParallelConsumerOptions.CommitMode.
valueOf(appProperties.getProperty("parallel.consumer.commit.mode"));
final int secondsBetweenCommits = Integer.parseInt(appProperties.getProperty("parallel.consumer.seconds.between.commits"));
final ParallelEoSStreamProcessor parallelConsumer = new ParallelEoSStreamProcessor<>(ParallelConsumerOptions.<String, String>builder()
.ordering(processingOrder)
.maxConcurrency(maxConcurrency)
.consumer(consumer)
.commitMode(commitMode)
.build());
parallelConsumer.setTimeBetweenCommits(ofSeconds(secondsBetweenCommits));
// create handler that sleeps configured number of ms per record
final int recordHandlerSleepMs = Integer.parseInt(appProperties.getProperty("record.handler.sleep.ms"));
final SleepingRecordHandler recordHandler = new SleepingRecordHandler(recordHandlerSleepMs);
// kick off the consumer, marking the start time so that we can report total runtime to consume all records
final ParallelConsumerPerfTest consumerApplication = new ParallelConsumerPerfTest(parallelConsumer, recordHandler);
Runtime.getRuntime().addShutdownHook(new Thread(consumerApplication::shutdown));
long startTimeMs = System.currentTimeMillis();
consumerApplication.runConsume(appProperties);
// wait for the consumer to process all records, updating the progress bar as we go
int recordsToConsume = Integer.parseInt(appProperties.getProperty("records.to.consume"));
ProgressBar progressBar = new ProgressBar("Progress", recordsToConsume);
await()
.forever()
.pollInterval(250, MILLISECONDS)
.until(() -> {
int numRecordsProcessed = recordHandler.getNumRecordsProcessed();
progressBar.stepTo(numRecordsProcessed);
return numRecordsProcessed >= recordsToConsume;
});
// done! report total time to consume all records
progressBar.close();
double durationSeconds = (System.currentTimeMillis() - startTimeMs) / 1_000.0;
DecimalFormat df = new DecimalFormat("0.00");
LOGGER.info("Time to consume {} records: {} seconds", recordsToConsume, df.format(durationSeconds));
consumerApplication.shutdown();
}
}
Take a look at the code and note the simplicity. Most of the code is for properties file handling and tracking progress. The interesting part relevant to the Confluent Parallel Consumer is in the four-line runConsume() method:
private void runConsume(final Properties appProperties) {
parallelConsumer.subscribe(Collections.singletonList(appProperties.getProperty("input.topic")));
parallelConsumer.poll(context -> {
recordHandler.processRecord(context.getSingleConsumerRecord());
});
}
Bellissimo!
Let’s rebuild the uberjar to include this performance test. In your terminal, run:
./gradlew shadowJar
Now that you have an uberjar containing ParallelConsumerPerfTest, you can launch it locally. This will run until the expected 10,000 records have been consumed. Ensure that the seq command that you ran previously to produce 10,000 records has completed before running this, so that we can accurately test consumption throughput.
As you kick this off, bear in mind the latency that you recorded when you ran MultithreadedKafkaConsumerPerfTest (40.46 seconds in the run performed for this tutorial).
java -cp build/libs/confluent-parallel-consumer-application-standalone-0.0.1-all.jar io.confluent.developer.ParallelConsumerPerfTest configuration/perftest-parallel-consumer.properties
While the performance test runs, take a few sips of the beverage… actually never mind. It’s done:
[main] INFO io.confluent.developer.ParallelConsumerPerfTest - Time to consume 10000 records: 1.78 seconds
Your latency will surely be different from the 1.78 seconds shown here. But, assuming you are running the test on reasonable hardware and you aren’t running any extremely noisy neighbors on your machine, it should be just a few seconds.
In this section of the tutorial, we created a performance test for the Confluent Parallel Consumer, and a KafkaConsumer baseline to compare it against. This gave us a couple of data points, but only for one specific test context: each test aimed to consume records as quickly as possible in a single JVM while simulating a 20ms workload per record.
We can turn a few knobs and pull some levers to gather more performance test results in other application contexts. Since we used helper classes and parameterized configuration in this tutorial, you can easily choose other performance test adventures. Some questions you might explore:
How does performance compare if we increase or decrease the simulated workload time?
What if we commit offsets more frequently or even synchronously or transactionally in each test?
In the case of the Confluent Parallel Consumer, this entails setting parallel.consumer.seconds.between.commits to a value lower than 60 seconds, and using a parallel.consumer.commit.mode of PERIODIC_CONSUMER_SYNC or PERIODIC_TRANSACTIONAL_PRODUCER, as sketched in the example after this list. These commit modes better simulate an application designed to more easily pick up where it left off when recovering from an error.
What if we change the properties of the KafkaConsumer instance(s) most relevant to throughput (fetch.min.bytes and max.poll.records)?
What if we use KEY or PARTITION ordering when configuring the Confluent Parallel Consumer (as opposed to UNORDERED)?
How does the throughput comparison change if we create perftest-parallel-consumer-input-topic with more (or fewer) partitions?
What if we use larger, more realistic records and not just integers from 1 to 10,000? What if we also play with different key spaces?
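For example, to explore the commit-frequency question above with the existing ParallelConsumerPerfTest, you could copy configuration/perftest-parallel-consumer.properties and change only the Parallel Consumer-specific lines, perhaps to something like this (the values shown are illustrative, not a recommendation):
parallel.consumer.max.concurrency=256
parallel.consumer.order=UNORDERED
parallel.consumer.commit.mode=PERIODIC_CONSUMER_SYNC
parallel.consumer.seconds.between.commits=5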
Have fun with it!
You may try another tutorial, but if you don’t plan on doing other tutorials, use the Confluent Cloud Console or CLI to destroy all of the resources you created. Verify they are destroyed to avoid unexpected charges.