How to use the Confluent Parallel Consumer

Question:

How can I consume Kafka topics with a higher degree of parallelism than the partition count?

Edit this page

Example use case:

The Confluent Parallel Consumer is an open source Apache 2.0-licensed Java library that enables you to consume from a Kafka topic with a higher degree of parallelism than the number of partitions for the input data (the effective parallelism limit achievable via an Apache Kafka consumer group). This is desirable in many situations, e.g., when partition counts are fixed for a reason beyond your control, or if you need to make a high-latency call out to a database or microservice while consuming and want to increase throughput.

In this tutorial, you'll first build a small "hello world" application that uses the Confluent Parallel Consumer library to read a handful of records from Kafka. Then you'll write and execute performance tests at a larger scale in order to compare the Confluent Parallel Consumer with a baseline built using a vanilla Apache Kafka consumer group.

Prepare to meet the Confluent Parallel Consumer!

Hands-on code example:

Run it

Prerequisites

1

This tutorial installs Confluent Platform using Docker. Before proceeding:

  • • Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it

  • • Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop since it includes Docker Compose.

  • • Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl

  • • Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line

Initialize the project

2

Make a local directory anywhere you’d like for this project:

mkdir confluent-parallel-consumer-application && cd confluent-parallel-consumer-application

Get Confluent Platform

3

Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud):

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TOOLS_LOG4J_LOGLEVEL: ERROR

And launch it by running:

docker compose up -d

Create a topic

4

In this step we’re going to create a topic for use during this tutorial.

But first, you’re going to open a shell on the broker docker container.

Open a new terminal and window then run this command:

docker exec -it broker bash

Now use the following command to create the topic:

kafka-topics --create --topic parallel-consumer-input-topic --bootstrap-server broker:9092 --replication-factor 1 --partitions 1

Keep this terminal window open as you’ll need to run a console-producer in a few steps.

Configure the project

5

In order to build the project, first install Gradle 7.5 or later if you don’t already have it. Create the following Gradle build file, named build.gradle for the project. Note the parallel-consumer-core dependency, which is available in Maven Central. This artifact includes the Confluent Parallel Consumer’s core API. There are also separate modules for using the Confluent Parallel Consumer with reactive API frameworks like Vert.x (parallel-consumer-vertx) and Reactor (parallel-consumer-reactor). These modules are out of scope for this introductory tutorial.

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "com.github.jengelman.gradle.plugins:shadow:6.1.0"
    }
}

plugins {
    id "java"
}

sourceCompatibility = "1.8"
targetCompatibility = "1.8"
version = "0.0.1"

repositories {
    mavenCentral()
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.2.2"
    implementation "org.apache.commons:commons-lang3:3.12.0"
    implementation "org.slf4j:slf4j-simple:2.0.0"
    implementation "me.tongfei:progressbar:0.9.3"
    implementation 'org.awaitility:awaitility:4.2.0'

    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.awaitility:awaitility:4.2.0'
    testImplementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.2.2:tests" // for LongPollingMockConsumer
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
  manifest {
    attributes(
      "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
    )
  }
}

shadowJar {
    archiveBaseName = "confluent-parallel-consumer-application-standalone"
}

And be sure to run the following command to obtain the Gradle wrapper, which we will use to execute the build. The Gradle wrapper is a best practice ancillary build script that enables developers to more easily collaborate on Gradle projects by ensuring that developers all use the same correct Gradle version for the project (downloading Gradle at build time if necessary).

gradle wrapper

Next, create a directory for configuration data:

mkdir configuration

Then create a development configuration file at configuration/dev.properties:

# Consumer properties
bootstrap.servers=localhost:29092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
max.poll.interval.ms=300000
enable.auto.commit=false
auto.offset.reset=earliest

# Application-specific properties
input.topic.name=parallel-consumer-input-topic
file.path=topic-output.txt

Let’s do a quick overview of some of the more important properties here:

The key.deserializer and value.deserializer properties provide a class implementing the Deserializer interface for converting byte arrays into the expected object type of the key and value respectively.

The max.poll.interval.ms is the maximum amount of time a consumer may take between calls to Consumer.poll(). If a consumer instance takes longer than the specified time, it’s considered non-responsive and removed from the consumer-group triggering a rebalance.

Setting enable.auto.commit configuration to false is required because the Confluent Parallel Consumer handles committing offsets in order to achieve fault tolerance.

auto.offset.reset - If a consumer instance can’t locate any offsets for its topic-partition assignment(s), it will resume processing from the earliest available offset.

Create the Confluent Parallel Consumer Application

6

Create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer

To complete this introductory application, you’ll build a main application class and a couple of supporting classes.

First, you’ll create the main application,ParallelConsumerApplication, which is the focal point of this tutorial; consuming records from a Kafka topic using the Confluent Parallel Consumer.

Go ahead and copy the following into a file src/main/java/io/confluent/developer/ParallelConsumerApplication.java:

package io.confluent.developer;


import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Paths;
import java.util.Collections;
import java.util.Properties;

import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
import static io.confluent.parallelconsumer.ParallelStreamProcessor.createEosStreamProcessor;
import static org.apache.commons.lang3.RandomUtils.nextInt;



/**
 * Simple "hello world" Confluent Parallel Consumer application that simply consumes records from Kafka and writes the
 * message values to a file.
 */
public class ParallelConsumerApplication {

  private static final Logger LOGGER = LoggerFactory.getLogger(ParallelConsumerApplication.class.getName());
  private final ParallelStreamProcessor<String, String> parallelConsumer;
  private final ConsumerRecordHandler<String, String> recordHandler;

  /**
   * Application that runs a given Confluent Parallel Consumer, calling the given handler method per record.
   *
   * @param parallelConsumer the Confluent Parallel Consumer instance
   * @param recordHandler    record handler that implements method to run per record
   */
  public ParallelConsumerApplication(final ParallelStreamProcessor<String, String> parallelConsumer,
                                     final ConsumerRecordHandler<String, String> recordHandler) {
    this.parallelConsumer = parallelConsumer;
    this.recordHandler = recordHandler;
  }

  /**
   * Close the parallel consumer on application shutdown
   */
  public void shutdown() {
    LOGGER.info("shutting down");
    if (parallelConsumer != null) {
      parallelConsumer.close();
    }
  }

  /**
   * Subscribes to the configured input topic and calls (blocking) `poll` method.
   *
   * @param appProperties application and consumer properties
   */
  public void runConsume(final Properties appProperties) {
    String topic = appProperties.getProperty("input.topic.name");

    LOGGER.info("Subscribing Parallel Consumer to consume from {} topic", topic);
    parallelConsumer.subscribe(Collections.singletonList(topic));

    LOGGER.info("Polling for records. This method blocks", topic);
    parallelConsumer.poll(context -> recordHandler.processRecord(context.getSingleConsumerRecord()));
  }

  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      throw new IllegalArgumentException(
          "This program takes one argument: the path to an environment configuration file.");
    }

    final Properties appProperties = PropertiesUtil.loadProperties(args[0]);

    // random consumer group ID for rerun convenience
    String groupId = "parallel-consumer-app-group-" + nextInt();
    appProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    // construct parallel consumer
    final Consumer<String, String> consumer = new KafkaConsumer<>(appProperties);
    final ParallelConsumerOptions options = ParallelConsumerOptions.<String, String>builder()
        .ordering(KEY)
        .maxConcurrency(16)
        .consumer(consumer)
        .commitMode(PERIODIC_CONSUMER_SYNC)
        .build();
    ParallelStreamProcessor<String, String> eosStreamProcessor = createEosStreamProcessor(options);

    // create record handler that writes records to configured file
    final String filePath = appProperties.getProperty("file.path");
    final ConsumerRecordHandler<String, String> recordHandler = new FileWritingRecordHandler(Paths.get(filePath));

    // run the consumer!
    final ParallelConsumerApplication consumerApplication = new ParallelConsumerApplication(eosStreamProcessor, recordHandler);
    Runtime.getRuntime().addShutdownHook(new Thread(consumerApplication::shutdown));
    consumerApplication.runConsume(appProperties);
  }

}

Let’s go over some of the key parts of the ParallelConsumerApplication starting with the constructor:

ParallelConsumerApplication constructor
  public ParallelConsumerApplication(final ParallelStreamProcessor<String, String> parallelConsumer,
                                     final ConsumerRecordHandler<String, String> recordHandler) {
    this.parallelConsumer = parallelConsumer;
    this.recordHandler = recordHandler;
  }

Her we supply instances of the Confluent Parallel Consumer’s ParallelStreamProcessor and the application’s ConsumerRecordHandler via constructor parameters.

The abstract ConsumerRecordHandler class makes it simple to change ConsumerRecord handling without having to change much code.

In this tutorial you’ll inject the dependencies in the ParallelConsumerApplication.main() method, but in practice you may want to use a dependency injection framework library, such as the Spring Framework.

Next, let’s review the ParallelConsumerApplication.runConsume() method, which provides the core functionality of this tutorial.

ParallelConsumerApplication.runConsume
  public void runConsume(final Properties appProperties) {
    String topic = appProperties.getProperty("input.topic.name");

    LOGGER.info("Subscribing Parallel Consumer to consume from {} topic", topic);
    parallelConsumer.subscribe(Collections.singletonList(topic)); (1)

    LOGGER.info("Polling for records. This method blocks", topic);
    parallelConsumer.poll(context -> recordHandler.processRecord(context.getSingleConsumerRecord())); (2)
  }
1 Subscribing to the Kafka topic.
2 Simply poll once. With the Confluent Parallel Consumer, you call poll only once and it will poll indefinitely, calling the lambda that you supply for each message. The library handles everything for you subject to how you configure the Parallel Consumer.

Speaking of configuration, this snippet instantiates the ParallelStreamProcessor that our application’s constructor expects:

    final Consumer<String, String> consumer = new KafkaConsumer<>(appProperties);  (1)
    final ParallelConsumerOptions options = ParallelConsumerOptions.<String, String>builder()  (2)
        .ordering(KEY)  (3)
        .maxConcurrency(16)  (4)
        .consumer(consumer)  (5)
        .commitMode(PERIODIC_CONSUMER_SYNC)  (6)
        .build();
    ParallelStreamProcessor<String, String> eosStreamProcessor = createEosStreamProcessor(options); (7)
1 Create the Apache Kafka Consumer that the Confluent Parallel Consumer wraps.
2 Create the Parallel Consumer configuration via builder pattern.
3 Specify consumption ordering by key.
4 Specify the degree of parallelism. Here we specify 16 threads for illustrative purposes only (the application only consumes 3 records).
5 The Apache Kafka Consumer that we are wrapping.
6 Here we specify how to commit offsets. PERIODIC_CONSUMER_SYNC will block the Parallel Consumer’s processing loop until a successful commit response is received. Asynchronous is also supported, which optimizes for consumption throughput (the downside being higher risk of needing to process duplicate messages in error recovery scenarios).
7 Create a ParallelStreamProcessor with the previously created configuration. This is the object we use to consume in lieu of a KafkaConsumer.

Note that, by ordering by key, we can consume with a much higher degree of parallelism than we can with a vanilla consumer group (i.e., the number of topic partitions). While a given input topic may not have many partitions, it may have a large number of unique keys. Each of these key → message sets can actually be processed concurrently. In other words, regardless of the number of input partitions, the effective concurrency limit achievable with the Confluent Parallel Consumer is the number of unique keys across all messages in a topic.

Create supporting classes

7

To complete this tutorial, you’ll need to also create an abstract class that we will extend to process messages as we consume them. This abstract class, ConsumerRecordHandler, encapsulates tracking the number of records processed, which will be useful later on when we run performance tests and want to terminate the test application after consuming an expected number of records.

First create the abstract class at src/main/java/io/confluent/developer/ConsumerRecordHandler.java

package io.confluent.developer;


import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.concurrent.atomic.AtomicInteger;


/**
 * Abstract class for processing a record. This allows for easier experimentation with the application and perf tests
 * in this package. It handles tracking the number of events so that apps can wait on consuming an expected number
 * of events.
 */
public abstract class ConsumerRecordHandler<K, V> {

  private final AtomicInteger numRecordsProcessed;

  public ConsumerRecordHandler() {
    numRecordsProcessed = new AtomicInteger(0);
  }

  protected abstract void processRecordImpl(ConsumerRecord<K, V> consumerRecord);

  final void processRecord(final ConsumerRecord<K, V> consumerRecord) {
    processRecordImpl(consumerRecord);
    numRecordsProcessed.incrementAndGet();
  }

  final int getNumRecordsProcessed() {
    return numRecordsProcessed.get();
  }

}

Using this abstract class will make it easier to change how you want to work with a ConsumerRecord without having to modify all of your existing code.

Next you’ll extend the ConsumerRecordHandler abstract class with a concrete class named FileWritingRecordHandler. Copy the following into file src/main/java/io/confluent/developer/FileWritingRecordHandler.java:

package io.confluent.developer;


import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.WRITE;
import static java.util.Collections.singletonList;


/**
 * Record handler that writes the Kafka message value to a file.
 */
public class FileWritingRecordHandler extends ConsumerRecordHandler<String, String> {

  private final Path path;

  public FileWritingRecordHandler(final Path path) {
    this.path = path;
  }

  @Override
  protected void processRecordImpl(final ConsumerRecord<String, String> consumerRecord) {
    try {
      Files.write(path, singletonList(consumerRecord.value()), CREATE, WRITE, APPEND);
    } catch (IOException e) {
      throw new RuntimeException("unable to write record to file", e);
    }
  }

}

Let’s take a peek under the hood at this class’s processRecordImpl method, which gets calls for each record consumed:

FileWritingRecordHandler.processRecordImpl
  @Override
  protected void processRecordImpl(final ConsumerRecord<String, String> consumerRecord) {
    try {
      Files.write(path, singletonList(consumerRecord.value()), CREATE, WRITE, APPEND); (1)
    } catch (IOException e) {
      throw new RuntimeException("unable to write record to file", e);
    }
  }
1 Simply write the record value to a file.

In practice you’re certain to perform a more realistic task for each record.

Finally, create a utility class PropertiesUtil that we use in our consumer application to load Kafka Consumer and application-specific properties. We’ll also use this class in the two performance testing applications that we will create later in this tutorial.

Go ahead and create the src/main/java/io/confluent/developer/PropertiesUtil.java file:

package io.confluent.developer;


import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;


/**
 * Utility class for loading Properties from a file.
 */
public class PropertiesUtil {

  public static Properties loadProperties(String fileName) throws IOException {
    try (FileInputStream input = new FileInputStream(fileName)) {
      final Properties props = new Properties();
      props.load(input);
      return props;
    }
  }

}

Compile and run the Confluent Parallel Consumer program

8

In your terminal, run:

./gradlew shadowJar

Now that you have an uberjar for the ParallelConsumerApplication, you can launch it locally. When you run the following, the prompt won’t return, because the application will run until you exit it. There is always another message to process, so streaming applications don’t exit until you force them.

java -cp build/libs/confluent-parallel-consumer-application-standalone-0.0.1-all.jar io.confluent.developer.ParallelConsumerApplication configuration/dev.properties

Produce sample data to the input topic

9

Using the terminal window you opened in step three, run the following command to start a console-producer:

kafka-console-producer --topic parallel-consumer-input-topic --bootstrap-server broker:9092 --property "parse.key=true" --property "key.separator=:"

Each line represents input data for the Confluent Parallel Consumer application. To send all of the events below, paste the following into the prompt and press enter:

fun-line:All streams lead to Kafka
event-promo:Go to Current
event-promo:Go to Kafka Summit
fun-line:Consume gently down the stream

Inspect the consumed records

10

Your Confluent Parallel Consumer application should have consumed all the records sent and written them out to a file.

In a new terminal, run this command to print the results to the console:

cat topic-output.txt

You should see something like this:

All streams lead to Kafka
Go to Current
Go to Kafka Summit
Consume gently down the stream

Note that because we configured the Confluent Parallel Consumer to use KEY ordering, Go to Current appears before Go to Kafka Summit because these values have the same event-promo key. Similarly, All streams lead to Kafka appears before Consume gently down the stream because these values have the same fun-line key.

At this point you can stop the Confluent Parallel Consumer application by entering Ctrl+C in the terminal window where it’s running.

Test it

Create a test configuration file

1

First, create a test file at configuration/test.properties:

input.topic.name=parallel-consumer-input-topic

Write a test for the consumer application

2

Create a directory for the tests to live in:

mkdir -p src/test/java/io/confluent/developer

Testing a Confluent Parallel Consumer application is not too complicated thanks to the LongPollingMockConsumer that is based on Apache Kafka’s MockConsumer. Since the Confluent Parallel Consumer’s codebase is well tested, we don’t need to use a live consumer and Kafka broker to test our application. We can simply use a mock consumer to process some data you’ll feed into it.

There is only one method in KafkaConsumerApplicationTest annotated with @Test, and that is consumerTest(). This method actually runs your ParallelConsumerApplication with the mock consumer.

Now create the following file at src/test/java/io/confluent/developer/ParallelConsumerApplicationTest.java:

package io.confluent.developer;

import io.confluent.csid.utils.LongPollingMockConsumer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.awaitility.Awaitility;
import org.junit.Test;

import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;


/**
 * Tests for the ParallelConsumerApplication.
 */
public class ParallelConsumerApplicationTest {

  private static final String TEST_CONFIG_FILE = "configuration/test.properties";

  /**
   * Test the app end to end with a few records consumable via a mock consumer. The app
   * just writes records to a file, so we validate that the expected (mocked) events wind
   * up written to a file.
   *
   * @throws Exception if unable to read properties or create the temp output file
   */
  @Test
  public void consumerTest() throws Exception {
    final Path tempFilePath = Files.createTempFile("test-consumer-output", ".out");
    final ConsumerRecordHandler<String, String> recordHandler = new FileWritingRecordHandler(tempFilePath);
    final Properties appProperties = PropertiesUtil.loadProperties(TEST_CONFIG_FILE);
    final String topic = appProperties.getProperty("input.topic.name");

    final LongPollingMockConsumer<String, String> mockConsumer = new LongPollingMockConsumer<>(OffsetResetStrategy.EARLIEST);
    final ParallelStreamProcessor<String, String> eosStreamProcessor = setupParallelConsumer(mockConsumer, topic);
    final ParallelConsumerApplication consumerApplication = new ParallelConsumerApplication(eosStreamProcessor, recordHandler);
    Runtime.getRuntime().addShutdownHook(new Thread(consumerApplication::shutdown));

    mockConsumer.subscribeWithRebalanceAndAssignment(Collections.singletonList(topic), 1);
    mockConsumer.addRecord(new ConsumerRecord<>(topic, 0, 0, null, "foo"));
    mockConsumer.addRecord(new ConsumerRecord<>(topic, 0, 1, null, "bar"));
    mockConsumer.addRecord(new ConsumerRecord<>(topic, 0, 2, null, "baz"));

    consumerApplication.runConsume(appProperties);

    final Set<String> expectedRecords = new HashSet<>();
    expectedRecords.add("foo");
    expectedRecords.add("bar");
    expectedRecords.add("baz");
    final Set<String> actualRecords = new HashSet<>();
    Awaitility.await()
        .atMost(5, SECONDS)
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> {
          actualRecords.clear();
          actualRecords.addAll(Files.readAllLines(tempFilePath));
          return actualRecords.size() == 3;
        });
    assertEquals(actualRecords, expectedRecords);
  }

  private ParallelStreamProcessor<String, String> setupParallelConsumer(MockConsumer<String, String> mockConsumer, final String topic) {
    ParallelConsumerOptions options = ParallelConsumerOptions.<String, String>builder()
        .ordering(KEY) // <2>
        .maxConcurrency(1000) // <3>
        .consumer(mockConsumer)
        .build();

    ParallelStreamProcessor<String, String> parallelConsumer = ParallelStreamProcessor.createEosStreamProcessor(options);
    parallelConsumer.subscribe(Collections.singletonList(topic));

    return parallelConsumer;
  }

}

Write a test for the record handler

3

Now let’s build a test for the ConsumerRecordHandler implementation used in your application. Even though we have a test for the ParallelConsumerApplication, it’s important that you can test this helper class in isolation.

Create the following file at src/test/java/io/confluent/developer/FileWritingRecordHandlerTest.java:

package io.confluent.developer;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;


public class FileWritingRecordHandlerTest {

  @Test
  public void testProcess() throws IOException {
    final Path tempFilePath = Files.createTempFile("test-handler", ".out");
    try {
      final ConsumerRecordHandler<String, String> recordHandler = new FileWritingRecordHandler(tempFilePath);
      ConsumerRecord record = new ConsumerRecord<>("test", 0, 0, null, "my record");
      recordHandler.processRecord(record);
      final List<String> expectedWords = Arrays.asList("my record");
      List<String> actualRecords = Files.readAllLines(tempFilePath);
      assertThat(actualRecords, equalTo(expectedWords));
    } finally {
      Files.deleteIfExists(tempFilePath);
    }
  }

}

Invoke the tests

4

Now run the test, which is as simple as:

./gradlew test

Performance test it

Create a topic for performance testing

1

In this step we’re going to create a topic for use during this tutorial.

But first, you’re going to open a shell on the broker docker container.

Open a new terminal and window then run this command:

docker exec -it broker bash

Now use the following command to create the topic:

kafka-topics --create --topic perftest-parallel-consumer-input-topic --bootstrap-server broker:9092 --replication-factor 1 --partitions 6

Keep this terminal window open as you’ll need to run a console-producer in a few steps.

Produce 10,000 records to the topic

2

Using the terminal window you opened in step three, run the following command to write 10,000 small dummy records to the input topic:

seq 1 10000 | kafka-console-producer --topic perftest-parallel-consumer-input-topic --bootstrap-server broker:9092

Let’s kick off this command and let it run. It’ll take a few minutes to produce all 10,000 records. In the meantime, let’s continue with the tutorial.

Add performance test application and consumer properties

3

Then create two performance test configuration files. The first is for performance testing a multi-threaded KafkaConsumer-based performance test that we’ll use to set a baseline. Create this file at configuration/perftest-kafka-consumer.properties:


# KafkaConsumer properties
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
max.poll.interval.ms=300000
enable.auto.commit=false
auto.offset.reset=earliest
# increase max.poll.records from default of 500 to ensure we can download many records
max.poll.records=10000
# large fetch.min.bytes to optimize for throughput
fetch.min.bytes=100000

# Application-specific properties
input.topic=perftest-parallel-consumer-input-topic
records.to.consume=10000
record.handler.sleep.ms=20

Then create this file at configuration/perftest-parallel-consumer.properties:


# KafkaConsumer properties
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
max.poll.interval.ms=300000
enable.auto.commit=false
auto.offset.reset=earliest
# increase max.poll.records from default of 500 to ensure we can download many records
max.poll.records=10000
# large fetch.min.bytes to optimize for throughput
fetch.min.bytes=100000

# Confluent Parallel Consumer properties
parallel.consumer.max.concurrency=256
parallel.consumer.order=UNORDERED
parallel.consumer.commit.mode=PERIODIC_CONSUMER_ASYNCHRONOUS
parallel.consumer.seconds.between.commits=60

# Application-specific properties
input.topic=perftest-parallel-consumer-input-topic
records.to.consume=10000
record.handler.sleep.ms=20

Let’s look at some of the more important properties in these configuration files:

  1. We specify fetch.min.bytes to be 100000 in order to optimize for consumer throughput

  2. The application-specific property records.to.consume is set to 10000 to match the number of records that we produced in the previous step. This will cause the application to terminate upon consuming this many records.

  3. The application-specific property record.handler.sleep.ms is used to simulate a nontrivial amount of work to perform per record. In this case, we sleep for 20ms to simulate a low-but-nontrivial latency operation like a call to a database or REST API.

In the configuration file for the Confluent Parallel Consumer performance test, there are a few Confluent Parallel Consumer-specific properties.

  1. parallel.consumer.max.concurrency is set to 256, much higher than the number of partitions in our topic

  2. We use UNORDERED ordering, PERIODIC_CONSUMER_ASYNCHRONOUS offset commit mode, and a high parallel.consumer.seconds.between.commits value of 60 seconds. Together, these values optimize for throughput. This keeps our test analogous to the KafkaConsumer-based baseline. You may have noticed that, because we are aiming to maximize throughput in these performance tests while ignoring the overhead of offsets handling, the baseline doesn’t even commit offsets!

Update properties file with Kafka cluster information

4

Using the command below, append the contents of configuration/dev.properties to configuration/perftest-kafka-consumer.properties and configuration/perftest-parallel-consumer.properties:

cat configuration/dev.properties >> configuration/perftest-kafka-consumer.properties
cat configuration/dev.properties >> configuration/perftest-parallel-consumer.properties

Create the Multi-threaded KafkaConsumer-based performance test

5

Here you’ll build a performance test application and supporting classes that implement multi-threaded consuming (one KafkaConsumer per-partition to maximize parallelism).

First, you’ll create the main performance test application, src/main/java/io/confluent/developer/MultithreadedKafkaConsumerPerfTest.java:

package io.confluent.developer;


import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.io.IOException;
import java.util.Properties;

import static org.apache.commons.lang3.RandomUtils.nextInt;


/**
 * Consumption throughput test that runs a KafkaConsumer per thread, synchronously sleeping `record.handler.sleep.ms`
 * per event. This simulates the performance characteristics of applications that do something of nontrivial latency
 * per event, e.g., call out to a DB or REST API per event. It is the KafkaConsumer analogue to
 * ParallelConsumerPerfTest, which is based on the Confluent Parallel Consumer.
 */
public class MultithreadedKafkaConsumerPerfTest {

  public static void main(String[] args) {
    if (args.length < 1) {
      throw new IllegalArgumentException(
              "This program takes one argument: the path to an environment configuration file.");
    }

    // load app and consumer specific properties from command line arg
    final Properties appProperties;
    try {
      appProperties = PropertiesUtil.loadProperties(args[0]);
    } catch (IOException e) {
      throw new RuntimeException("Unable to load application properties", e);
    }

    // random group ID for rerun convenience
    String groupId = "mt-kafka-consumer-perf-test-group-" + nextInt();
    appProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    // create handler that sleeps configured number of ms per record
    final int recordHandlerSleepMs = Integer.parseInt(appProperties.getProperty("record.handler.sleep.ms"));
    final SleepingRecordHandler recordHandler = new SleepingRecordHandler(recordHandlerSleepMs);

    // create and run MultithreadedKafkaConsumer, which runs an ExecutorService with a consumer thread per partition
    // and terminates when the total expected number of records have been consumed across the threads
    try (MultithreadedKafkaConsumer mtConsumer = new MultithreadedKafkaConsumer(appProperties, recordHandler)) {
      mtConsumer.runConsume();
    }
  }


}

Second, create the class that implements multi-threaded consuming, src/main/java/io/confluent/developer/MultithreadedKafkaConsumer.java:

package io.confluent.developer;


import me.tongfei.progressbar.ProgressBar;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.text.DecimalFormat;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;


/**
 * This class consumes a specified number of records from a Kafka topic with a consumer thread per explicitly
 * assigned partition.
 */
public class MultithreadedKafkaConsumer implements Closeable {

  private static final Logger LOGGER = LoggerFactory.getLogger(MultithreadedKafkaConsumer.class.getName());

  private final Properties appProperties;
  private final SleepingRecordHandler recordHandler;
  private final AtomicInteger recordsConsumed;  // total records consumed across all threads
  private ExecutorService executor;

  /**
   * Multithreaded Kafka consumer that runs a KafkaConsumer per partition.
   *
   * @param appProperties application and consumer properties
   * @param recordHandler records handler to run per record
   */
  public MultithreadedKafkaConsumer(Properties appProperties, SleepingRecordHandler recordHandler) {
    this.appProperties = appProperties;
    this.recordHandler = recordHandler;
    this.recordsConsumed = new AtomicInteger(0);
  }

  /**
   * Main consumption method that instantiates consumers per partition, runs the consumers in a thread pool, and outputs
   * progress to the user as it consumes.
   */
  public void runConsume() {
    int recordsToConsume = Integer.parseInt(appProperties.getProperty("records.to.consume"));

    // instantiate all consumers before marking start time to keep init time out of the reported duration
    List<KafkaConsumer> consumers = getConsumersPerPartition(appProperties);

    // kick off consumers to process all records, updating a progress bar as we go
    ProgressBar progressBar = new ProgressBar("Progress", recordsToConsume);

    // create thread pool and shutdown hook to clean up in case of exception
    executor = Executors.newFixedThreadPool(consumers.size());
    Thread shutdownExecutor = new Thread(() -> this.close());
    Runtime.getRuntime().addShutdownHook(shutdownExecutor);

    // mark the consumption start time and kick off all consumer threads
    long startTimeMs = System.currentTimeMillis();
    List<Future> futures = new ArrayList<>();
    for (KafkaConsumer consumer: consumers) {
      futures.add(executor.submit(new ConsumerTask(consumer, recordsToConsume, recordsConsumed, progressBar)));
    }

    // wait for all tasks to complete, which happens when all records have been consumed
    LOGGER.info("Waiting for consumers to consume all {} records", recordsToConsume);
    for (Future future : futures) {
      try {
        future.get();
      } catch (InterruptedException|ExecutionException e) {
        throw new RuntimeException(e);
      }
    }

    // done! report total time to consume all records
    double durationSeconds = (System.currentTimeMillis() - startTimeMs) / 1_000.0;
    DecimalFormat df = new DecimalFormat("0.00");
    LOGGER.info("Total time to consume {} records: {} seconds", recordsToConsume, df.format(durationSeconds));
  }

  /**
   * Create KafkaConsumer instances per partition, and explicitly assign a partition to each.
   *
   * @param appProperties consumer and application properties
   * @return list of consumers
   */
  private List<KafkaConsumer> getConsumersPerPartition(Properties appProperties) {
    String topic = appProperties.getProperty("input.topic");

    // use temp consumer to inspect the number of partitions
    int numPartitions;
    try (KafkaConsumer tempConsumer = new KafkaConsumer<>(appProperties)) {
      numPartitions = tempConsumer.partitionsFor(topic).size();
      LOGGER.info("{} partitions detected for {} topic", numPartitions, topic);
    }

    List<KafkaConsumer> consumers = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      KafkaConsumer consumer = new KafkaConsumer<String, String>(appProperties);
      TopicPartition tp = new TopicPartition(topic, i);
      consumer.assign(Collections.singletonList(tp));
      consumers.add(consumer);
    }

    return consumers;
  }

  /**
   * Close the multithreaded consumer, which entails shutting down the executor thread pool.
   */
  public void close() {
    if (executor != null) {
      executor.shutdown();
    }
  }

  /**
   * Inner class that runs each consumer.
   */
  class ConsumerTask implements Runnable {
    private final KafkaConsumer consumer;
    private final int recordsToConsume;           // records expected to be consumed across all threads
    private final AtomicInteger recordsConsumed;  // records consumed across all threads
    private final ProgressBar progressBar;

    /**
     * Runnable task to consume a partition.
     *
     * @param consumer          the consumer already assigned a specific partition
     * @param recordsToConsume  total number of records to consume across all threads
     * @param recordsConsumed   running tally of records consumed across all threads
     * @param progressBar       progress bar that we update to recordsConsumed / recordsToConsume during consumption
     */
    public ConsumerTask(KafkaConsumer consumer, int recordsToConsume, AtomicInteger recordsConsumed, ProgressBar progressBar) {
      this.consumer = consumer;
      this.recordsToConsume = recordsToConsume;
      this.recordsConsumed = recordsConsumed;
      this.progressBar = progressBar;
    }

    /**
     * Each task polls until the total number of records consumed across all threads is what we expect. Simply calls
     * the record handler for each record.
     */
    @Override
    public void run() {
      int numConsumed;
      do {
        // Use a poll timeout high enough to not saturate CPU, but fine enough to get interesting comparison numbers.
        // Since the perf tests typically take many seconds to run, use 0.5 second poll timeout to strike this balance.
        ConsumerRecords<String, String> records = consumer.poll(Duration.of(500, ChronoUnit.MILLIS));
        for (ConsumerRecord record : records) {
          recordHandler.processRecord(record);
          numConsumed = recordsConsumed.incrementAndGet();
          progressBar.stepTo(numConsumed);
        }
      } while (recordsConsumed.get() < recordsToConsume); // tasks block until all records consumed
    }
  }

}

Finally, create the record handler that sleeps 20ms per record consumed, src/main/java/io/confluent/developer/SleepingRecordHandler.java:

package io.confluent.developer;


import org.apache.kafka.clients.consumer.ConsumerRecord;


/**
 * Record handler that sleeps a given number of milliseconds.
 */
public class SleepingRecordHandler extends ConsumerRecordHandler<String, String> {

  private final int numMilliseconds;

  public SleepingRecordHandler(final int numMilliseconds) {
    this.numMilliseconds = numMilliseconds;
  }

  @Override
  protected void processRecordImpl(final ConsumerRecord consumerRecord) {
    try {
      Thread.sleep(numMilliseconds);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

}

Compile and run the KafkaConsumer-based performance test

6

Let’s rebuild the uberjar to include this performance test. In your terminal, run:

./gradlew shadowJar

Now that you have an uberjar containing MultithreadedKafkaConsumerPerfTest, you can launch it locally. This will run until the expected 10,000 records have been consumed. Ensure that the seq command that you ran previously to produce 10,000 records has completed before running this so that we can accurately test consumption throughput.

java -cp build/libs/confluent-parallel-consumer-application-standalone-0.0.1-all.jar io.confluent.developer.MultithreadedKafkaConsumerPerfTest configuration/perftest-kafka-consumer.properties

While the performance test runs, take a few sips of the beverage that you previously poured. It will take a minute or two to complete, and the final line output will show you the latency for consuming all 10,000 records, e.g.:

[main] INFO io.confluent.developer.MultithreadedKafkaConsumer - Total time to consume 10000 records: 40.46 seconds

Before we build and run a Confluent Parallel Consumer analogue to this KafkaConsumer baseline, let’s summarize what we’ve seen so far:

  1. We populated a topic with default properties and produced 10,000 small records to it

  2. We maxed out the size of our consumer group by running a KafkaConsumer per partition, with each instance explicitly assigned to one partition

  3. We optimized each KafkaConsumer for throughput by setting high values for max.poll.records and fetch.min.bytes

  4. We struck a balance between latency accuracy and instrumentation overhead needed to track progress and end when expected by using a 0.5 second poll timeout. (We want to report consumption latency shortly after consumption finishes, but we also want to minimize busy waiting of the KafkaConsumer instances that finish first.)

  5. We scratched our head writing some tricky multi-threaded code. By the way, is any multi-threaded code not tricky?

  6. The reported performance test latency was 40.46 seconds in our case (your number is surely different).

Create the Confluent Parallel Consumer performance test

7

Here you’ll build a performance test application based on the Confluent Parallel Consumer. This test reuses a couple of classes that we created previously: PropertiesUtil for loading consumer and application-specific properties, and SleepingRecordHandler for simulating a nontrivial workload per-record just as we did in MultithreadedKafkaConsumerPerfTest. Please rewind and create these if you skipped the parts of the tutorial that create these two classes.

Because the Confluent Parallel Consumer API is much lighter weight than the lift required to multi-thread KafkaConsumer instances per partition, let’s knock out the entire thing in one class. Create the file src/main/java/io/confluent/developer/ParallelConsumerPerfTest.java:

package io.confluent.developer;


import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import me.tongfei.progressbar.ProgressBar;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.Collections;
import java.util.Properties;

import static java.time.Duration.ofSeconds;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.commons.lang3.RandomUtils.nextInt;
import static org.awaitility.Awaitility.await;


/**
 * Consumption throughput test that runs the Confluent Parallel Consumer, synchronously sleeping
 * `record.handler.sleep.ms` per event. This simulates the performance characteristics of applications that do
 * something of nontrivial latency per event, e.g., call out to a DB or REST API per event. It is the Confluent
 * Parallel Consumer analogue to MultithreadedKafkaConsumerPerfTest, which is based on KafkaConsumer.
 */
public class ParallelConsumerPerfTest {

  private static final Logger LOGGER = LoggerFactory.getLogger(ParallelConsumerPerfTest.class.getName());
  private final ParallelStreamProcessor<String, String> parallelConsumer;
  private final SleepingRecordHandler recordHandler;

  /**
   * Throughput performance test for the Confluent Parallel Consumer.
   *
   * @param parallelConsumer the parallel consumer
   * @param recordHandler    a handler to call per record
   */
  public ParallelConsumerPerfTest(final ParallelStreamProcessor<String, String> parallelConsumer,
                                  final SleepingRecordHandler recordHandler) {
    this.parallelConsumer = parallelConsumer;
    this.recordHandler = recordHandler;
  }

  /**
   * Close the parallel consumer on application shutdown
   */
  public void shutdown() {
    LOGGER.info("shutting down");
    if (parallelConsumer != null) {
      parallelConsumer.close();
    }
  }

  /**
   * Subscribes to the configured input topic and calls (blocking) `poll` method.
   *
   * @param appProperties application and consumer properties
   */
  private void runConsume(final Properties appProperties) {
    parallelConsumer.subscribe(Collections.singletonList(appProperties.getProperty("input.topic")));
    parallelConsumer.poll(context -> {
      recordHandler.processRecord(context.getSingleConsumerRecord());
    });
  }

  public static void main(String[] args) {
    if (args.length < 1) {
      throw new IllegalArgumentException(
              "This program takes one argument: the path to an environment configuration file.");
    }

    // load app and consumer specific properties from command line arg
    final Properties appProperties;
    try {
      appProperties = PropertiesUtil.loadProperties(args[0]);
    } catch (IOException e) {
      throw new RuntimeException("Unable to load application properties", e);
    }

    // random consumer group ID for rerun convenience
    String groupId = "parallel-consumer-perf-test-group-" + nextInt();
    appProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    // create parallel consumer configured based on properties from app configuration
    final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(appProperties);
    final int maxConcurrency = Integer.parseInt(appProperties.getProperty("parallel.consumer.max.concurrency"));
    final ParallelConsumerOptions.ProcessingOrder processingOrder = ParallelConsumerOptions.ProcessingOrder.
            valueOf(appProperties.getProperty("parallel.consumer.order"));
    final ParallelConsumerOptions.CommitMode commitMode = ParallelConsumerOptions.CommitMode.
            valueOf(appProperties.getProperty("parallel.consumer.commit.mode"));
    final int secondsBetweenCommits = Integer.parseInt(appProperties.getProperty("parallel.consumer.seconds.between.commits"));
    final ParallelEoSStreamProcessor parallelConsumer = new ParallelEoSStreamProcessor<>(ParallelConsumerOptions.<String, String>builder()
            .ordering(processingOrder)
            .maxConcurrency(maxConcurrency)
            .consumer(consumer)
            .commitMode(commitMode)
            .build());
    parallelConsumer.setTimeBetweenCommits(ofSeconds(secondsBetweenCommits));

    // create handler that sleeps configured number of ms per record
    final int recordHandlerSleepMs = Integer.parseInt(appProperties.getProperty("record.handler.sleep.ms"));
    final SleepingRecordHandler recordHandler = new SleepingRecordHandler(recordHandlerSleepMs);

    // kick off the consumer, marking the start time so that we can report total runtime to consume all records
    final ParallelConsumerPerfTest consumerApplication = new ParallelConsumerPerfTest(parallelConsumer, recordHandler);
    Runtime.getRuntime().addShutdownHook(new Thread(consumerApplication::shutdown));
    long startTimeMs = System.currentTimeMillis();
    consumerApplication.runConsume(appProperties);

    // wait for the consumer to process all records, updating the progress bar as we go
    int recordsToConsume = Integer.parseInt(appProperties.getProperty("records.to.consume"));
    ProgressBar progressBar = new ProgressBar("Progress", recordsToConsume);
    await()
            .forever()
            .pollInterval(250, MILLISECONDS)
            .until(() -> {
              int numRecordsProcessed = recordHandler.getNumRecordsProcessed();
              progressBar.stepTo(numRecordsProcessed);
              return numRecordsProcessed >= recordsToConsume;
            });

    // done! report total time to consume all records
    progressBar.close();
    double durationSeconds = (System.currentTimeMillis() - startTimeMs) / 1_000.0;
    DecimalFormat df = new DecimalFormat("0.00");
    LOGGER.info("Time to consume {} records: {} seconds", recordsToConsume, df.format(durationSeconds));

    consumerApplication.shutdown();
  }

}

Take a look at the code and note the simplicity. Most of the code is for properties file handling and tracking progress. The interesting part relevant to the Confluent Parallel Consumer is in the four-line runConsume() method:

ParallelConsumerPerfTest.runConsume
  private void runConsume(final Properties appProperties) {
    parallelConsumer.subscribe(Collections.singletonList(appProperties.getProperty("input.topic.name")));
    parallelConsumer.poll(context -> {
      recordHandler.processRecord(context.getSingleConsumerRecord());
    });
  }

Bellisimo!

Compile and run the Confluent Parallel Consumer performance test

8

Let’s rebuild the uberjar to include this performance test. In your terminal, run:

./gradlew shadowJar

Now that you have an uberjar containing ParallelConsumerPerfTest, you can launch it locally. This will run until the expected 10,000 records have been consumed. Ensure that the seq command that you ran previously to produce 10,000 records has completed before running this so that we can accurately test consumption throughput.

As you kick this off, bear in mind the latency that you recorded when you ran MultithreadedKafkaConsumerPerfTest (40.46 seconds in the run performed for the tutorial).

java -cp build/libs/confluent-parallel-consumer-application-standalone-0.0.1-all.jar io.confluent.developer.ParallelConsumerPerfTest configuration/perftest-parallel-consumer.properties

While the performance test runs, take a few sips of the beverage…​ actually never mind. It’s done:

[main] INFO io.confluent.developer.ParallelConsumerPerfTest - Time to consume 10000 records: 1.78 seconds

Your latency will surely be different from the 1.78 seconds shown here. But, assuming you are running the test on reasonable hardware and you aren’t running any extremely noisy neighbors on your machine, it should be just a few seconds.

Experiment with the performance test classes

9

In this section of the tutorial, we created a performance test for the Confluent Parallel Consumer, and a KafkaConsumer baseline to which to compare.

This gave us a couple of data points, but only for one specific test context: each test aimed to consume records as quickly as possible in a single JVM while simulating a 20ms workload per-record.

We can turn a few knobs and pull some levers to gather more performance test results in other application contexts. Since we used helper classes and parameterized configuration in this tutorial, you can easily choose other performance test adventures. Some questions you might explore:

  1. How does performance compare if we increase or decrease the simulated workload time?

  2. What if we commit offsets more frequently or even synchronously or transactionally in each test? In the case of the Confluent Parallel Consumer, this entails setting parallel.consumer.seconds.between.commits to a value lower than 60 seconds, and using a parallel.consumer.commit.mode of PERIODIC_CONSUMER_SYNC or PERIODIC_TRANSACTIONAL_PRODUCER. These commit modes better simulate an application designed to more easily pick up where it left off when recovering from an error.

  3. What if we change the properties of the KafkaConsumer instance(s) most relevant to throughput (fetch.min.bytes and max.poll.records)?

  4. What if we use KEY or PARTITION ordering when configuring the Confluent Parallel Consumer (as opposed to UNORDERED)?

  5. How does the throughput comparison change if we create perftest-parallel-consumer-input-topic with more (or fewer) partitions?

  6. What if we use larger, more realistic records and not just integers from 1 to 10,000? What if we also play with different key spaces?

Have fun with it!

Deploy on Confluent Cloud

Run your app to Confluent Cloud

1

Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully-managed Apache Kafka service.

  1. Sign up for Confluent Cloud, a fully-managed Apache Kafka service.

  2. After you log in to Confluent Cloud Console, click on Add cloud environment and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  3. From the Billing & payment section in the Menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details).

  4. Click on LEARN and follow the instructions to launch a Kafka cluster and to enable Schema Registry.

Confluent Cloud

Next, from the Confluent Cloud Console, click on Clients to get the cluster-specific configurations, e.g. Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application. In the case of this tutorial, add the following properties to the client application’s input properties file, substituting all curly braces with your Confluent Cloud values.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BROKER_ENDPOINT }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=https://{{ SR_ENDPOINT }}
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.