Versioned KTables for temporal join accuracy

Question:

How can you ensure proper temporal semantics in stream-table joins?


Example use case:

Previously, when Kafka Streams executed a stream-table join, the stream-side event joined with the latest available record for the same key on the table side. But sometimes it's important for the stream event to match a table record by timestamp as well as by key: think of a stream of stock transactions and a table of stock prices - it's essential that the transaction joins with the stock price at the time of the transaction, not the latest price. A versioned state store tracks multiple record versions for the same key, rather than the single latest record per key, as is the case for standard non-versioned stores.

Hands-on code example:

Short Answer


In the Kafka Streams application, use a versioned key-value store (created via a VersionedBytesStoreSupplier) when creating your KTable. The history retention time shown is just a placeholder - you are free to use any duration that suits your event streaming requirements.

    
    final VersionedBytesStoreSupplier versionedStoreSupplier =
              Stores.persistentVersionedKeyValueStore("versioned-ktable-store",
                                                       Duration.ofMinutes(10));


    final KTable<String, String> tableInput = builder.table(tableInputTopic,
                Materialized.<String, String>as(versionedStoreSupplier)
                        .withKeySerde(stringSerde)
                        .withValueSerde(stringSerde));
    

Run it

Prerequisites


This tutorial installs Confluent Platform using Docker. Before proceeding:

  • Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it

  • Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop since it includes Docker Compose.

  • Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl

  • Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line

Initialize the project


To get started, make a new directory anywhere you’d like for this project:

mkdir versioned-ktables && cd versioned-ktables

Get Confluent Platform


Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud):

---
version: '3'

services:

  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # This is done for running a single broker in combined mode for local development only
      # For multi-node deployments you should generate using the following
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

And launch it by running:

docker compose up -d

Configure the project


Create the following Gradle build file, named build.gradle for the project:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0"
    }
}

plugins {
    id "java"
    id "idea"
    id "eclipse"
}

sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
version = "0.0.1"

repositories {
    mavenCentral()

    maven {
        url "https://packages.confluent.io/maven"
    }
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.apache.avro:avro:1.11.1"
    implementation "org.slf4j:slf4j-simple:2.0.7"
    implementation 'org.apache.kafka:kafka-streams:3.5.1'
    implementation ('org.apache.kafka:kafka-clients') {
       version {
           strictly '3.5.1'
        }
      }
    implementation "io.confluent:kafka-streams-avro-serde:7.4.0"

    testImplementation "org.apache.kafka:kafka-streams-test-utils:3.5.1"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
  manifest {
    attributes(
      "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
      "Main-Class": "io.confluent.developer.VersionedKTableExample"
    )
  }
}

shadowJar {
    archiveBaseName = "versioned-katable-standalone"
    archiveClassifier = ''
}

Note that the sourceCompatibility and targetCompatibility are set to Java 17, so make sure that java -version displays 17 before proceeding.

And be sure to run the following command to obtain the Gradle wrapper:

gradle wrapper

Next, create a directory for configuration data:

mkdir configuration

Then create a development file at configuration/dev.properties:

application.id=versioned-ktables
bootstrap.servers=localhost:9092

stream.topic.name=stream-input-topic
stream.topic.partitions=1
stream.topic.replication.factor=1

table.topic.name=table-input-topic
table.topic.partitions=1
table.topic.replication.factor=1

output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1

Create the Kafka Streams topology


Create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer

Before you create the Java class to run the VersionedKTable example, let’s dive into the main point of this tutorial, creating a versioned KTable:

final VersionedBytesStoreSupplier versionedStoreSupplier = (1)
                            Stores.persistentVersionedKeyValueStore("versioned-ktable-store",
                                                                    Duration.ofMinutes(10)); (2)


final KTable<String, String> tableInput = builder.table(tableInputTopic,
                Materialized.<String, String>as(versionedStoreSupplier) (3)
                        .withKeySerde(stringSerde)
                        .withValueSerde(stringSerde));



streamInput.join(tableInput, valueJoiner) (4)
                .to(totalResultOutputTopic,
                        Produced.with(stringSerde, stringSerde));

1. Creating the versioned state store
2. Specifying the length of time the table keeps previous versions of a record for querying
3. Creating the source KTable backed by a versioned state store
4. Using the versioned table in a stream-table join

So to use a versioned KTable, you first create a VersionedBytesStoreSupplier with the Stores.persistentVersionedKeyValueStore factory method, providing parameters for the name of the store and the amount of time the store retains previous versions.
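
If you also want to control how the store buckets older versions on disk, Stores.persistentVersionedKeyValueStore has an overload that takes a segment interval in addition to the history retention. Here is a minimal sketch, assuming the same imports as the example above; the durations are placeholders, not recommendations:

final VersionedBytesStoreSupplier versionedStoreSupplierWithSegments =
        Stores.persistentVersionedKeyValueStore("versioned-ktable-store",
                                                Duration.ofMinutes(10),  // history retention
                                                Duration.ofMinutes(5));  // segment interval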

Then you’ll use the newly created supplier when creating your source KTable with the StreamsBuilder.table(String topic, Materialized) method, as shown above at annotation three.

Now if your KStream contains out-of-order records and joins with a KTable backed by a versioned store, the join should produce a temporally correct result, because each stream record is joined with the table record whose timestamp aligns with it instead of simply the latest record for the key.

For more background on versioned state stores read KIP-889.
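
The versioned store also supports timestamped lookups directly. You won’t need this for the stream-table join in this tutorial, but if you ever attach the store to a custom processor, an as-of query looks roughly like the sketch below. This is a hypothetical processor for illustration only, assuming Kafka Streams 3.5+ and that the store named "versioned-ktable-store" has been added to the topology and connected to the processor:

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;

// Hypothetical processor, for illustration only; it is not part of this tutorial's topology.
public class AsOfLookupProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;
    private VersionedKeyValueStore<String, String> store;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        // Assumes the versioned store was registered with the topology and connected to this processor
        this.store = context.getStateStore("versioned-ktable-store");
    }

    @Override
    public void process(final Record<String, String> record) {
        // Fetch the table value that was current at the stream record's timestamp,
        // not the latest value for the key
        final VersionedRecord<String> asOf = store.get(record.key(), record.timestamp());
        if (asOf != null) {
            context.forward(record.withValue(record.value() + " " + asOf.value()));
        }
    }
}

The key call is store.get(key, asOfTimestamp), which returns the version of the value that was current at that timestamp rather than the latest one.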

Now go ahead and create the Java file at src/main/java/io/confluent/developer/VersionedKTableExample.java.

package io.confluent.developer;


import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;

import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.CountDownLatch;

public class VersionedKTableExample {


    public Topology buildTopology(Properties allProps) {
        final StreamsBuilder builder = new StreamsBuilder();
        final String streamInputTopic = allProps.getProperty("stream.topic.name");
        final String tableInputTopic = allProps.getProperty("table.topic.name");
        final String totalResultOutputTopic = allProps.getProperty("output.topic.name");

        final Serde<String> stringSerde = Serdes.String();

        final VersionedBytesStoreSupplier versionedStoreSupplier = Stores.persistentVersionedKeyValueStore("versioned-ktable-store", Duration.ofMinutes(10));
        // Non-versioned store supplier, declared for comparison only; it is not used in this topology
        final KeyValueBytesStoreSupplier persistentStoreSupplier = Stores.persistentKeyValueStore("non-versioned-table");

        final KStream<String, String> streamInput = builder.stream(streamInputTopic, Consumed.with(stringSerde, stringSerde));

        final KTable<String, String> tableInput = builder.table(tableInputTopic,
                Materialized.<String, String>as(versionedStoreSupplier)
                        .withKeySerde(stringSerde)
                        .withValueSerde(stringSerde));
        final ValueJoiner<String, String, String> valueJoiner = (val1, val2) -> val1 + " " + val2;

        streamInput.join(tableInput, valueJoiner)
                .peek((key, value) -> System.out.println("Joined value: " + value))
                .to(totalResultOutputTopic,
                        Produced.with(stringSerde, stringSerde));

        return builder.build();
    }

    public void createTopics(final Properties allProps) {
        try (final AdminClient client = AdminClient.create(allProps)) {

            final List<NewTopic> topics = new ArrayList<>();

            topics.add(new NewTopic(
                    allProps.getProperty("stream.topic.name"),
                    Integer.parseInt(allProps.getProperty("stream.topic.partitions")),
                    Short.parseShort(allProps.getProperty("stream.topic.replication.factor"))));

            topics.add(new NewTopic(
                    allProps.getProperty("table.topic.name"),
                    Integer.parseInt(allProps.getProperty("table.topic.partitions")),
                    Short.parseShort(allProps.getProperty("table.topic.replication.factor"))));

            topics.add(new NewTopic(
                    allProps.getProperty("output.topic.name"),
                    Integer.parseInt(allProps.getProperty("output.topic.partitions")),
                    Short.parseShort(allProps.getProperty("output.topic.replication.factor"))));

            client.createTopics(topics);
        }
    }

    public Properties loadEnvProperties(String fileName) throws IOException {
        final Properties allProps = new Properties();
        try (final FileInputStream input = new FileInputStream(fileName)) {
            allProps.load(input);
        }
        return allProps;
    }

    public static void main(String[] args) throws Exception {

        if (args.length < 1) {
            throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
        }

        final VersionedKTableExample instance = new VersionedKTableExample();
        final Properties allProps = instance.loadEnvProperties(args[0]);
        final Topology topology = instance.buildTopology(allProps);

        instance.createTopics(allProps);

        TutorialDataGenerator dataGenerator = new TutorialDataGenerator(allProps);
        dataGenerator.generate();

        final KafkaStreams streams = new KafkaStreams(topology, allProps);
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach shutdown handler to catch Control-C.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close(Duration.ofSeconds(5));
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }

    record TutorialDataGenerator(Properties properties) {

        public void generate() {
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
                HashMap<String, List<KeyValue<String, String>>> entryData = new HashMap<>();
                HashMap<String, List<Long>> dataTimestamps = new HashMap<>();
                Instant now = Instant.now();

                List<KeyValue<String, String>> streamMessagesOutOfOrder = Arrays.asList(
                        KeyValue.pair("one", "peanut butter and"),
                        KeyValue.pair("two", "ham and"),
                        KeyValue.pair("three", "cheese and"),
                        KeyValue.pair("four", "tea and"),
                        KeyValue.pair("five", "coffee with")
                );
                final String topic1 = properties.getProperty("stream.topic.name");
                entryData.put(topic1, streamMessagesOutOfOrder);

                List<Long> timestamps = Arrays.asList(
                        now.minus(50, ChronoUnit.SECONDS).toEpochMilli(),
                        now.minus(40, ChronoUnit.SECONDS).toEpochMilli(),
                        now.minus(30, ChronoUnit.SECONDS).toEpochMilli(),
                        now.minus(20, ChronoUnit.SECONDS).toEpochMilli(),
                        now.minus(10, ChronoUnit.SECONDS).toEpochMilli()
                );
                dataTimestamps.put(topic1, timestamps);

                List<KeyValue<String, String>> tableMessagesOriginal = Arrays.asList(
                        KeyValue.pair("one", "jelly"),
                        KeyValue.pair("two", "eggs"),
                        KeyValue.pair("three", "crackers"),
                        KeyValue.pair("four", "crumpets"),
                        KeyValue.pair("five", "cream"));
                final String topic2 = properties.getProperty("table.topic.name");
                entryData.put(topic2, tableMessagesOriginal);
                dataTimestamps.put(topic2, timestamps);


                produceRecords(entryData, producer, dataTimestamps);
                entryData.clear();
                dataTimestamps.clear();

                List<KeyValue<String, String>> tableMessagesLater = Arrays.asList(
                        KeyValue.pair("one", "sardines"),
                        KeyValue.pair("two", "an old tire"),
                        KeyValue.pair("three", "fish eyes"),
                        KeyValue.pair("four", "moldy bread"),
                        KeyValue.pair("five", "lots of salt"));
                entryData.put(topic2, tableMessagesLater);

                List<Long> forwardTimestamps = Arrays.asList(
                        now.plus(50, ChronoUnit.SECONDS).toEpochMilli(),
                        now.plus(40, ChronoUnit.SECONDS).toEpochMilli(),
                        now.plus(30, ChronoUnit.SECONDS).toEpochMilli(),
                        now.plus(30, ChronoUnit.SECONDS).toEpochMilli(),
                        now.plus(30, ChronoUnit.SECONDS).toEpochMilli()
                );
                dataTimestamps.put(topic2, forwardTimestamps);

                produceRecords(entryData, producer, dataTimestamps);

            }
        }

        private static void produceRecords(HashMap<String, List<KeyValue<String, String>>> entryData,
                                           Producer<String, String> producer,
                                           HashMap<String, List<Long>> timestampsMap) {
            entryData.forEach((topic, list) ->
                    {
                        List<Long> timestamps = timestampsMap.get(topic);
                        for (int i = 0; i < list.size(); i++) {
                            long timestamp = timestamps.get(i);
                            String key = list.get(i).key;
                            String value = list.get(i).value;
                            producer.send(new ProducerRecord<>(topic, 0, timestamp, key, value), (metadata, exception) -> {
                                if (exception != null) {
                                    exception.printStackTrace(System.out);
                                } else {
                                    System.out.printf("Produced record at offset %d to topic %s %n", metadata.offset(), metadata.topic());
                                }
                            });
                        }
                    }
            );
        }
    }

}

Compile and run the Kafka Streams program


In your terminal, run:

./gradlew shadowJar

The application for this tutorial includes a record generator to populate the topics for the stream and table. To demonstrate how the versioned KTable works, the application performs a simple KStream-KTable join. The stream contains some classic food combinations that aren’t complete - the join with the table fills in the details.

Here are the records produced to the source topic for the KStream:

KeyValue.pair("one", "peanut butter and"),
KeyValue.pair("two", "ham and"),
KeyValue.pair("three", "cheese and"),
KeyValue.pair("four", "tea and"),
KeyValue.pair("five", "coffee with")

The application will produce two sets of records to the topic for the KTable. The first set contains the correct pairings:

KeyValue.pair("one", "jelly"),
KeyValue.pair("two", "eggs"),
KeyValue.pair("three", "crackers"),
KeyValue.pair("four", "crumpets"),
KeyValue.pair("five", "cream")

Then, after the initial batch, a second set of pairings that don’t quite match is produced to the KTable topic:

KeyValue.pair("one", "sardines"),
KeyValue.pair("two", "an old tire"),
KeyValue.pair("three", "fish eyes"),
KeyValue.pair("four", "moldy bread"),
KeyValue.pair("five", "lots of salt")

Even though there’s a second round of records sent to the KTable, you’ll still get the expected results from the join since your KTable is using a versioned store and the timestamps of the stream records and the first batch of table records align.

Now that you have an uberjar for the Kafka Streams application, you can launch it locally. When you run the following, the prompt won’t return, because the application will run until you exit it. There is always another message to process, so streaming applications don’t exit until you force them.

java -jar build/libs/versioned-katable-standalone-0.0.1.jar configuration/dev.properties

Consume data from the output topic


Now that the application has generated and joined the records, let’s run a consumer to read the output from your streams application:

docker exec -it broker /usr/bin/kafka-console-consumer \
 --topic output-topic \
 --bootstrap-server broker:9092 \
 --from-beginning \
 --property print.key=true \
 --property key.separator=" : " \
 --property print.timestamp=true \
 --max-messages 5

You should see something like this:

CreateTime:1691166221891 : one : peanut butter and jelly
CreateTime:1691166231891 : two : ham and eggs
CreateTime:1691166241891 : three : cheese and crackers
CreateTime:1691166251891 : four : tea and crumpets
CreateTime:1691166261891 : five : coffee with cream
Processed a total of 5 messages

To prove that the topic backing the KTable does contain the invalid entries, run another console consumer command to inspect the contents:

docker exec -it broker /usr/bin/kafka-console-consumer \
 --topic table-input-topic \
 --bootstrap-server broker:9092 \
 --from-beginning \
 --property print.key=true \
 --property key.separator=" : " \
 --property print.timestamp=true \
 --max-messages 10

You should see something similar to the output below. Take note of the timestamps of the entries: they show that the invalid entries arrived after the correct ones. Since the application populates both topics before starting the Kafka Streams topology, this demonstrates how the versioned KTable ensures a semantically correct temporal join.

CreateTime:1691166221891 : one : jelly
CreateTime:1691166231891 : two : eggs
CreateTime:1691166241891 : three : crackers
CreateTime:1691166251891 : four : crumpets
CreateTime:1691166261891 : five : cream
CreateTime:1691166321891 : one : sardines
CreateTime:1691166311891 : two : an old tire
CreateTime:1691166301891 : three : fish eyes
CreateTime:1691166301891 : four : moldy bread
CreateTime:1691166301891 : five : lots of salt
Processed a total of 10 messages

Test it

Create a test configuration file


First, create a test file at configuration/test.properties:

application.id=versioned-ktables
state.dir=versioned-ktables-test-state

stream.topic.name=stream-input-topic
stream.topic.partitions=1
stream.topic.replication.factor=1

table.topic.name=table-input-topic
table.topic.partitions=1
table.topic.replication.factor=1

output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1

Test the versioned KTable topology


First, create a directory for the tests to live in:

mkdir -p src/test/java/io/confluent/developer

Then create the following file at src/test/java/io/confluent/developer/VersionedKTableExampleTest.java. Testing a Kafka Streams application requires a bit of test harness code, but happily the org.apache.kafka.streams.TopologyTestDriver class makes this much more pleasant than it otherwise would be.

There is only one method in VersionedKTableExampleTest annotated with @Test, and that is versionedKTableTest(). This method actually runs our Streams topology using the TopologyTestDriver and some mocked data that is set up inside the test method.

package io.confluent.developer;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.*;
import org.junit.Test;

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import static org.junit.Assert.assertEquals;


public class VersionedKTableExampleTest {

    private final static String TEST_CONFIG_FILE = "configuration/test.properties";

    @Test
    public void versionedKTableTest() throws IOException {
        final VersionedKTableExample instance = new VersionedKTableExample();
        final Properties allProps = instance.loadEnvProperties(TEST_CONFIG_FILE);

        final String streamInputTopicName = allProps.getProperty("stream.topic.name");
        final String tableInputTopicName = allProps.getProperty("table.topic.name");
        final String totalResultOutputTopicName = allProps.getProperty("output.topic.name");

        final Topology topology = instance.buildTopology(allProps);
        try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, allProps);
             final Serde<String> stringSerde = Serdes.String()) {
            final Serializer<String> stringSerializer = stringSerde.serializer();
            final Deserializer<String> keyDeserializer = stringSerde.deserializer();

            final TestInputTopic<String, String> streamInputTopic = testDriver.createInputTopic(streamInputTopicName, stringSerializer, stringSerializer);
            final TestInputTopic<String, String> tableInputTopic = testDriver.createInputTopic(tableInputTopicName, stringSerializer, stringSerializer);

            final TestOutputTopic<String, String> outputTopic = testDriver.createOutputTopic(totalResultOutputTopicName, keyDeserializer, stringSerde.deserializer());

            Instant now = Instant.now();

            List<KeyValue<String, String>> streamMessages = Arrays.asList(
                    KeyValue.pair("one", "peanut butter and"),
                    KeyValue.pair("two", "ham and"),
                    KeyValue.pair("three", "cheese and"),
                    KeyValue.pair("four", "tea and"),
                    KeyValue.pair("five", "coffee with")
            );

            List<Long> timestamps = Arrays.asList(
                    now.minus(50, ChronoUnit.SECONDS).toEpochMilli(),
                    now.minus(40, ChronoUnit.SECONDS).toEpochMilli(),
                    now.minus(30, ChronoUnit.SECONDS).toEpochMilli(),
                    now.minus(20, ChronoUnit.SECONDS).toEpochMilli(),
                    now.minus(10, ChronoUnit.SECONDS).toEpochMilli()
            );

            List<KeyValue<String, String>> tableMessagesOriginal = Arrays.asList(
                    KeyValue.pair("one", "jelly"),
                    KeyValue.pair("two", "cheese"),
                    KeyValue.pair("three", "crackers"),
                    KeyValue.pair("four", "biscuits"),
                    KeyValue.pair("five", "cream"));

            List<KeyValue<String, String>> tableMessagesLater = Arrays.asList(
                    KeyValue.pair("one", "sardines"),
                    KeyValue.pair("two", "an old tire"),
                    KeyValue.pair("three", "fish eyes"),
                    KeyValue.pair("four", "moldy bread"),
                    KeyValue.pair("five", "lots of salt"));

            List<Long> forwardTimestamps = Arrays.asList(
                    now.plus(50, ChronoUnit.SECONDS).toEpochMilli(),
                    now.plus(40, ChronoUnit.SECONDS).toEpochMilli(),
                    now.plus(30, ChronoUnit.SECONDS).toEpochMilli(),
                    now.plus(30, ChronoUnit.SECONDS).toEpochMilli(),
                    now.plus(30, ChronoUnit.SECONDS).toEpochMilli()
            );
            sendEvents(tableInputTopic, tableMessagesOriginal, timestamps);
            sendEvents(tableInputTopic, tableMessagesLater, forwardTimestamps);
            sendEvents(streamInputTopic, streamMessages, timestamps);

            final List<KeyValue<String, String>> actualEvents = outputTopic.readKeyValuesToList();
            final List<KeyValue<String, String>> expectedEvents = Arrays.asList(
                    KeyValue.pair("one", "peanut butter and jelly"),
                    KeyValue.pair("two", "ham and cheese"),
                    KeyValue.pair("three", "cheese and crackers"),
                    KeyValue.pair("four", "tea and biscuits"),
                    KeyValue.pair("five", "coffee with cream")
            );

            assertEquals(expectedEvents, actualEvents);
        }
    }

    private void sendEvents(final TestInputTopic<String, String> topic,
                            final List<KeyValue<String, String>> input,
                            final List<Long> timestamps) {
        for (int i = 0; i < input.size(); i++) {
            final long timestamp = timestamps.get(i);
            final String key = input.get(i).key;
            final String value = input.get(i).value;
            topic.pipeInput(key, value, timestamp);
        }
    }
}
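
If you’d also like to see the timestamps that the join results carry (each output record inherits the stream-side timestamp it was aligned against), you could read full test records instead of key-value pairs. A minimal sketch, assuming it runs inside the same try-with-resources block and replaces the readKeyValuesToList call above:

// requires: import org.apache.kafka.streams.test.TestRecord;
final List<TestRecord<String, String>> actualRecords = outputTopic.readRecordsToList();
actualRecords.forEach(rec ->
        System.out.println(rec.timestamp() + " : " + rec.key() + " : " + rec.value()));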

Invoke the tests


Now run the test, which is as simple as:

./gradlew test

Deploy on Confluent Cloud

Run your app with Confluent Cloud


Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully managed Apache Kafka service.

  1. Sign up for Confluent Cloud, a fully managed Apache Kafka service.

  2. After you log in to Confluent Cloud Console, click Environments in the lefthand navigation, click on Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  3. From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details). To avoid having to enter a credit card, add an additional promo code CONFLUENTDEV1. With this promo code, you will not have to enter a credit card for 30 days or until your credits run out.

  4. Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.


Next, from the Confluent Cloud Console, click on Clients to get the cluster-specific configurations, e.g., Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application. In the case of this tutorial, add the following properties to the client application’s input properties file, substituting all curly braces with your Confluent Cloud values.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BROKER_ENDPOINT }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=https://{{ SR_ENDPOINT }}
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.