Handling multiple event types in a topic

Question:

How can you have multiple event types in a topic and maintain topic-name subject constraints?

Example use case:

You have distinct but related event types and want to produce them to the same topic while still maintaining topic-name subject constraints. Why produce different events to the same topic? One reason is that you have low-traffic topics that you'd like to consolidate to reduce overhead on the brokers. Another is that you need the exact order of different events: records produced to the same topic with the same key land in the same partition, and ordering is guaranteed within a partition.
To produce multiple event types under the topic-name constraint, you'll need schema references. A schema reference lets one schema contain a field whose type is defined in another, separately registered schema. In this tutorial, you'll learn how to use schema references with both Protobuf and Avro.
For more information on multiple event topics, you can read Put several event types in a Kafka Topic by Martin Kleppmann and Putting Several Event Types in the Same Topic – Revisited by Robert Yokota.

Hands-on code example:

Run it

Prerequisites

1

This tutorial installs Confluent Platform using Docker. Before proceeding:

  • Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it

  • Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop since it includes Docker Compose.

  • Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl

  • Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line

Initialize the project

2

To get started, make a new directory anywhere you’d like for this project:

mkdir multiple-event-types && cd multiple-event-types

Next, create a directory for configuration data:

mkdir configuration

Get Confluent Platform

3

Next, create the following docker-compose.yml file to obtain Confluent Platform:

version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
    - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
    - broker
    ports:
    - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:9092
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: WARN

And launch it by running:

docker compose up -d

Create topics

4

In this step, we’re going to create the topics needed for this tutorial.

Since you are going to produce records using Protobuf and Avro serialization, you’ll need two topics.

But first, you’re going to open a shell on the broker docker container.

Open a new terminal window, then run this command:

docker exec -it broker bash

Now use the following commands to create the topics:

kafka-topics --create --topic proto-events --bootstrap-server broker:9092 --replication-factor 1 --partitions 1
kafka-topics --create --topic avro-events --bootstrap-server broker:9092 --replication-factor 1 --partitions 1

Keep this terminal window open as you’ll need to run a console-producer in a few steps.

Configure the project

5

Create the following Gradle build file, named build.gradle, for the project:

buildscript {
    repositories {
        mavenCentral()
        maven {
            url = uri("https://packages.confluent.io/maven/")
        }
        maven {
            url = uri("https://plugins.gradle.org/m2/")
        }
        maven {
            url = uri("https://jitpack.io")
        }
    }
    dependencies {
        classpath "gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0"
    }
}

plugins {
    id "java"
    id "idea"
    id "eclipse"
    id "com.github.imflog.kafka-schema-registry-gradle-plugin" version "1.9.1"
    id "com.google.protobuf" version "0.9.2"
    id "com.github.davidmc24.gradle.plugin.avro" version "1.7.0"
}


sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
version = "0.0.1"

repositories {
    mavenCentral()
    maven {
        url = uri("https://packages.confluent.io/maven/")
    }

    maven {
        url = uri("https://jitpack.io")
    }
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation 'com.google.protobuf:protobuf-java:3.22.2'
    implementation 'org.apache.avro:avro:1.11.1'
    implementation 'org.slf4j:slf4j-simple:2.0.7'
    implementation 'org.apache.kafka:kafka-streams:3.4.0'
    implementation ('org.apache.kafka:kafka-clients') {
       version {
           strictly '3.4.0'
        }
      }
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
    implementation "io.confluent:kafka-avro-serializer:7.1.4"
    implementation "io.confluent:kafka-protobuf-serializer:7.2.2"
    implementation "io.confluent:kafka-protobuf-provider:7.3.0"
}

protobuf {
    generatedFilesBaseDir = "${project.buildDir}/generated-main-proto-java"

    protoc {
        artifact = 'com.google.protobuf:protoc:3.22.2'
    }
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
  manifest {
    attributes(
      "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
      "Main-Class": "io.confluent.developer.MultiEventProtobufProduceConsumeApp"
    )
  }
}

shadowJar {
    archiveBaseName = "multiple-event-types-standalone"
    archiveClassifier = ''
}

schemaRegistry {
    def props = new Properties()
    def configs = file("configuration/ccloud.properties")
    if (configs.exists()) {
        configs.withInputStream { props.load(it) }
        def srUrl = props.getProperty("schema.registry.url")
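        // Split on the first ':' only, so a password that itself contains ':' stays intact
        def auth = props.getProperty("basic.auth.user.info").split(":", 2)
        // Log the endpoint and username, but never print the password
        println "Using Confluent properties Schema Registry endpoint:${srUrl}, username:${auth[0]}"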

        url = srUrl

        credentials {
            // username is the characters up to the ':' in the basic.auth.user.info property
            username = auth[0]
            // password is everything after ':' in the basic.auth.user.info property
            password = auth[1]
        }
    } else if (file("configuration/dev.properties").exists()) {
        configs = file("configuration/dev.properties")
        configs.withInputStream { props.load(it) }
        def srUrl = props.getProperty("schema.registry.url")
        println "Using local dev properties Schema Registry endpoint:${srUrl}"
        // Point the plugin at the Schema Registry configured for local development
        url = srUrl
    } else {
        println "No configs to parse yet"
    }


    // Possible types are ["JSON", "PROTOBUF", "AVRO"]
    register {
        subject('pageview', 'src/main/avro/pageview.avsc', 'AVRO')
        subject('purchase', 'src/main/avro/purchase.avsc', 'AVRO')
        subject('avro-events-value', 'src/main/avro/all-events.avsc', 'AVRO')
                .addReference("io.confluent.developer.avro.Pageview", "pageview", 1)
                .addReference("io.confluent.developer.avro.Purchase", "purchase", 1)
    }

}

And be sure to run the following command to obtain the Gradle wrapper:

gradle wrapper

Then, create a development configuration file at configuration/dev.properties:

bootstrap.servers=localhost:29092
schema.registry.url=http://localhost:8081
max.poll.interval.ms=300000
enable.auto.commit=true
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
key.serializer=org.apache.kafka.common.serialization.StringSerializer

# Application specific properties
proto.topic.name=proto-events
proto.topic.partitions=1
proto.topic.replication.factor=1

avro.topic.name=avro-events
avro.topic.partitions=1
avro.topic.replication.factor=1
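
The application you'll create later reads this file with java.util.Properties and copies the entries into a Map for the Kafka clients. As a minimal, dependency-free sketch of that step (the class name DevPropertiesSketch and the helper intSetting are hypothetical names used only for illustration), it could look like this:

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper class, not part of the tutorial project.
final class DevPropertiesSketch {

    // Loads key=value lines and copies them into a Map<String, Object>.
    static Map<String, Object> toConfigMap(Reader source) {
        Properties properties = new Properties();
        try {
            properties.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, Object> configs = new HashMap<>();
        for (String name : properties.stringPropertyNames()) {
            configs.put(name, properties.getProperty(name));
        }
        return configs;
    }

    // Every value arrives as a String, so numeric settings must be parsed explicitly.
    static int intSetting(Map<String, Object> configs, String name) {
        return Integer.parseInt(String.valueOf(configs.get(name)));
    }

    public static void main(String[] args) {
        // A few lines from dev.properties, inlined so the sketch runs without the file.
        String sample = String.join("\n",
                "bootstrap.servers=localhost:29092",
                "# Application specific properties",
                "proto.topic.name=proto-events",
                "proto.topic.partitions=1");
        Map<String, Object> configs = toConfigMap(new StringReader(sample));
        System.out.println(configs.get("proto.topic.name")
                + " -> " + intSetting(configs, "proto.topic.partitions") + " partition(s)");
    }
}
```

Comment lines such as "# Application specific properties" are skipped by Properties.load, so only real settings end up in the map.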

Create the Protobuf Schemas

6

Let’s say you have a microservice for an e-commerce site, and you track both customer pageviews and purchases. Since the pageviews could be highly related to any purchase, you’d like to capture the exact order of both of these event types as they occur, so producing the events to the same topic makes sense.

Since you use the customer ID for the key, you are guaranteed to see the exact order of events, because both types will live in the same partition. Even though these two events are similar, you represent them as distinct domain objects because that fits well with other teams in your organization that need the same data.
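
To see why a shared key keeps both event types in one partition, here is a small sketch of key-based partition selection. It is an approximation under stated assumptions: Kafka's default partitioner hashes the serialized key with murmur2, whereas this sketch uses Arrays.hashCode as a stand-in to stay dependency-free, so the partition numbers it prints will not match a real cluster. The class name KeyPartitionSketch is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration class, not part of the tutorial project.
final class KeyPartitionSketch {

    // Hash the key bytes, force the result non-negative, then take it modulo
    // the partition count. Arrays.hashCode is a stand-in for Kafka's murmur2.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 6;
        // The event type plays no role: only the key decides the partition.
        System.out.println("pageview for wilecoyote -> partition " + partitionFor("wilecoyote", partitions));
        System.out.println("purchase for wilecoyote -> partition " + partitionFor("wilecoyote", partitions));
    }
}
```

Because the computation depends only on the key and the partition count, a pageview and a purchase for the same customer always map to the same partition, which is what preserves their relative order.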

For the Protobuf portion of the tutorial, you’ll need to create three protobuf schemas. Two of the schemas represent the domain objects in the example scenario, and the third schema contains references to the other two schemas.

To get started, create a directory to place the schemas:

mkdir -p src/main/proto

Then, create this schema file for the purchase domain object at src/main/proto/purchase.proto

syntax = "proto3";

package io.confluent.developer.proto;
option java_outer_classname = "PurchaseProto";

message Purchase {
  string item = 1;
  double amount = 2;
  string customer_id = 3;
}

For this tutorial, we won’t go into the specifics of Protocol Buffers, but you can read the Proto 3 language guide and the Protobuf Java tutorial for details beyond the scope of this tutorial.

Next, create the schema for the pageview object at src/main/proto/pageview.proto

syntax = "proto3";

package io.confluent.developer.proto;
option java_outer_classname = "PageviewProto";

message Pageview {
  string url = 1;
  bool is_special = 2;
  string customer_id = 3;
}

Now that you have the schemas in place for your two domain objects, you’ll create the schema that references the other two.

Go ahead and create the file src/main/proto/customer-event.proto, and then we’ll review the important parts of it.

syntax = "proto3";

package io.confluent.developer.proto;

import "purchase.proto";
import "pageview.proto";  (1)

option java_outer_classname = "CustomerEventProto";

message CustomerEvent {  (2)

  oneof action {   (3)
    Purchase purchase = 1;
    Pageview pageview = 2;
  }
  string id = 3;
}
1 Importing the other existing proto schema
2 The outer "container" event
3 A oneof field named action which will contain exactly one of the referenced types

This is where the "rubber hits the road" regarding schema references. Here, the CustomerEvent object contains either a Purchase or a Pageview object in the action field. Instead of nesting the schemas for these two objects, we reference existing ones. Besides providing an effective way to combine multiple event types in the same topic while keeping the TopicNameStrategy subject name strategy, references give you a single place to go when you need to make schema updates.

Note that, with Protobuf, a oneof can’t be a top-level element. It has to exist inside a "wrapper" message. This has implications when producing and consuming, which we will cover when creating the KafkaProducer and KafkaConsumer for this tutorial.

Create the Avro Schemas

7

Now you’ll take a similar step and create the Avro schemas; this is done for comparison and is not strictly required. As with Protobuf, you’ll have two schemas for the domain objects and a third schema that will contain the references. Avro has a distinct difference regarding the reference schema, and you’ll see it as we go through this section.

To get started, create a directory for the Avro schemas:

mkdir -p src/main/avro

Then, create this schema file for the purchase domain object at src/main/avro/purchase.avsc

{
  "type":"record",
  "namespace": "io.confluent.developer.avro",
  "name":"Purchase",
  "fields": [
    {"name": "item", "type":"string"},
    {"name": "amount", "type": "double"},
    {"name": "customer_id", "type": "string"}
  ]
}

Right away, you’ll notice one difference is that you write Avro schemas in JSON while Protobuf more closely resembles a programming language.

In this tutorial we won’t go into details about Avro. For more information you can read the Apache Avro documentation, Getting Started (Java) guide, and the Avro Specification.

Next, create the schema for the pageview object at src/main/avro/pageview.avsc

{
  "type":"record",
  "namespace": "io.confluent.developer.avro",
  "name":"Pageview",
  "fields": [
    {"name": "url", "type":"string"},
    {"name": "is_special", "type": "boolean"},
    {"name": "customer_id", "type":  "string"}
  ]
}

Now that you have the schemas in place for your two domain objects, you’ll create a third schema that references the other two.

Go ahead and create the file src/main/avro/all-events.avsc now:

[
  "io.confluent.developer.avro.Purchase",
  "io.confluent.developer.avro.Pageview"
]

The all-events.avsc file contains an Avro Union. The union type in Avro is analogous to the Protobuf oneof field in that it indicates that a field might have more than one datatype.

On the other hand, with Avro, a union can be a top-level element, so you don’t have to create a wrapper or container class; the Avro schema itself is a union and it can represent either of the types listed in the union. To be clear, you could create an Avro schema for a wrapper class and provide a union field within the schema, but we’re not going to cover that approach in this tutorial. The GitHub repo for the Multiple Events in Schema Registry Kafka Summit Europe 2021 presentation contains an example of using an outer Avro class containing a union field.

Now that you have created all of the necessary schema files, you need to compile them so that you can work with them in the application. The build.gradle file contains plugins for both Avro and Protobuf, so all you need to do is run the following command to generate the Java code files:

./gradlew build

Register the schemas

8

Next, you’ll need to register some schemas. When you have an Avro schema where the top-level element is a union, you need to register the individual schemas in the union first. Then, you’ll register the parent schema itself along with references to the schemas making up the union element.

Fortunately, the Gradle Schema Registry plugin makes this easy. Here’s the configuration that you already have in the build.gradle file:

Avro schemas configuration for using references
register {
        subject('pageview', 'src/main/avro/pageview.avsc', 'AVRO')  (1)
        subject('purchase', 'src/main/avro/purchase.avsc', 'AVRO')
        subject('avro-events-value', 'src/main/avro/all-events.avsc', 'AVRO')   (2)
                .addReference("io.confluent.developer.avro.Pageview", "pageview", 1) (3)
                .addReference("io.confluent.developer.avro.Purchase", "purchase", 1)
    }
1 Registering the schemas for the referenced objects
2 The parent schema containing the references
3 Adding the references which point to the schemas registered previously

To register these Avro schemas, run this in the command line:

./gradlew registerSchemasTask

This task runs quickly, and you should see some text followed by this result in the console:

BUILD SUCCESSFUL
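
For context, registering a schema with references corresponds to a POST against the Schema Registry REST endpoint /subjects/{subject}/versions, with a JSON body carrying the schema, its type, and a references array. The sketch below only builds that request for the union schema and does not send it. It is an illustration under assumptions, not the plugin's actual implementation: the class RegisterUnionSketch and its helpers are hypothetical, and the hand-rolled JSON escaping covers only quotes and backslashes, which is enough for this single-line schema.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical illustration class, not part of the tutorial project.
final class RegisterUnionSketch {

    // Minimal JSON string escaping: backslashes first, then double quotes.
    static String jsonString(String raw) {
        return "\"" + raw.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // One entry of the "references" array: referenced type name, subject, version.
    static String reference(String name, String subject, int version) {
        return "{\"name\":" + jsonString(name)
                + ",\"subject\":" + jsonString(subject)
                + ",\"version\":" + version + "}";
    }

    // Request body for the union schema, mirroring the register block in build.gradle.
    static String unionRequestBody() {
        String unionSchema = "[\"io.confluent.developer.avro.Purchase\","
                + "\"io.confluent.developer.avro.Pageview\"]";
        return "{\"schemaType\":\"AVRO\",\"schema\":" + jsonString(unionSchema)
                + ",\"references\":["
                + reference("io.confluent.developer.avro.Pageview", "pageview", 1) + ","
                + reference("io.confluent.developer.avro.Purchase", "purchase", 1) + "]}";
    }

    // Builds (but does not send) the POST request for the avro-events-value subject.
    static HttpRequest unionRequest(String registryUrl) {
        return HttpRequest.newBuilder(URI.create(registryUrl + "/subjects/avro-events-value/versions"))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(unionRequestBody()))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = unionRequest("http://localhost:8081");
        System.out.println(request.method() + " " + request.uri());
        System.out.println(unionRequestBody());
    }
}
```

The references point at subjects and versions that must already exist, which is why the pageview and purchase schemas have to be registered before the union schema.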

There is no corresponding command to register the Protobuf schemas. Instead, you'll rely on auto-registration, because the Protobuf serializer recursively registers any proto files imported by the main schema. Using the Confluent UI, you can view the uploaded schemas by clicking the Schema Registry tab and then clicking an individual schema to inspect it.

We’ll get into some of the details more in the next section.

Create the Protobuf Multi Event application

9

Create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer

To complete this tutorial, you’ll build an application that uses KafkaProducer and KafkaConsumer instances to produce and consume both Protobuf and Avro events. The approach you’ll take in this tutorial is not typical of applications you’ll build in a production setting. But, by using multiple clients, you can compare how to handle multiple event types for each serializer format.

To that end, the point of this sample application is this: you want to capture pageview and purchase events in the exact order that they occur, and you feel the best option is to have these events produced to the same topic. Since the customer ID will be the message key, you are guaranteed to get per-customer events in the order that they occur.

Let’s go over some of the key parts of the MultiEventProtobufProduceConsumeApp, starting with the producer for the Protobuf events.

Since this is an advanced topic, the tutorial doesn’t go into the basics of using a KafkaProducer. For more details, see the KafkaProducer tutorial.
KafkaProducer for Protobuf events
public void produceProtobufEvents(final Supplier<Producer<String, CustomerEventProto.CustomerEvent>> producerSupplier,
                                  final String topic,
                                  final List<CustomerEventProto.CustomerEvent> protoCustomerEvents) {

        try (Producer<String, CustomerEventProto.CustomerEvent> producer = producerSupplier.get()) { (1)
            protoCustomerEvents.stream()    (2)
                    .map((event -> new ProducerRecord<>(topic, event.getId(), event))) (3)
                    .forEach(producerRecord -> producer.send(producerRecord, ((metadata, exception)

      //Details left out for clarity

   // Relevant configurations

 protoProduceConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
}
1 Retrieving the producer instance from the Supplier
2 Using a java.util.stream to map each event into a ProducerRecord then send them to the broker
3 Creating the ProducerRecord instance.

There are two points to emphasize here. The first is the type of the producer: it’s using CustomerEventProto.CustomerEvent. Since you must use an outer class with Protobuf, the generics on the producer are a concrete type. As a result, to set the key to be the customer ID, you can call the CustomerEvent#getId method directly. Note the use of the Supplier to provide the producer. This delays the creation of the producer until the Supplier.get() method is executed. Using a supplier also makes testing easier by simplifying the process of providing a different implementation.

The second point is that you can use the auto-registration feature of Schema Registry with Protobuf, and the referenced schemas get registered recursively.
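
The Supplier idea from the first point can be sketched without any Kafka dependency. In the example below, Sender is a hypothetical stand-in for the producer interface and RecordingSender is a hypothetical test double; neither exists in the tutorial project. The sketch assumes only what the text states: creation is deferred until get() is called, and a different implementation can be supplied for tests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical illustration class, not part of the tutorial project.
final class SupplierSketch {

    // Hypothetical stand-in for a producer: it can send and be closed.
    interface Sender extends AutoCloseable {
        void send(String key, String value);

        @Override
        void close(); // narrowed so callers need not handle a checked exception
    }

    // Test double that records what was sent instead of talking to a broker.
    static final class RecordingSender implements Sender {
        final List<String> sent = new ArrayList<>();
        boolean closed;

        @Override
        public void send(String key, String value) {
            sent.add(key + "=" + value);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // The sender is only obtained when get() runs, and is closed when the block exits.
    static void produce(Supplier<Sender> senderSupplier, String key, List<String> values) {
        try (Sender sender = senderSupplier.get()) {
            values.forEach(value -> sender.send(key, value));
        }
    }

    public static void main(String[] args) {
        RecordingSender fake = new RecordingSender();
        produce(() -> fake, "wilecoyote", List.of("pageview", "purchase"));
        System.out.println(fake.sent + " closed=" + fake.closed);
    }
}
```

In a test for the real application, the same pattern would let you hand in a mock producer in place of a KafkaProducer.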

Next, let’s move on to the KafkaConsumer for the Protobuf application.

KafkaConsumer for multiple events in Protobuf
   consumerRecords.forEach(consumerRec -> {
    CustomerEventProto.CustomerEvent customerEvent = consumerRec.value();
    switch (customerEvent.getActionCase()) { (1)
        case PURCHASE:
            eventTracker.add(customerEvent.getPurchase().getItem()); (2)
            break;
        case PAGEVIEW:
            eventTracker.add(customerEvent.getPageview().getUrl()); (3)
            break;



// details left out for clarity
1 Using a switch statement for the different enum types
2 Adding the purchased item to the event tracker
3 Adding the pageview link to the event tracker

With Protobuf, a oneof field causes the compiler to generate an enum with one constant per field in the oneof, plus a constant for the case where none of them is set (ACTION_NOT_SET here). You can determine which type to work with by using a switch statement on that enum. To retrieve the enum value, you’ll use a get<field-name>Case method (in this case, getActionCase, since the oneof field is named action).
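
The shape of that generated enum can be illustrated with a hand-written stand-in. The classes below are hypothetical simplifications written for this sketch, not the code protoc generates; only the constant names PURCHASE, PAGEVIEW, and ACTION_NOT_SET and the getActionCase accessor follow the generated naming for a oneof called action.

```java
// Hypothetical illustration class, not part of the tutorial project.
final class OneofCaseSketch {

    // Stand-in for the enum generated for "oneof action".
    enum ActionCase { PURCHASE, PAGEVIEW, ACTION_NOT_SET }

    // Simplified stand-in for the generated CustomerEvent wrapper message.
    static final class CustomerEvent {
        private final ActionCase actionCase;
        private final String payload;

        private CustomerEvent(ActionCase actionCase, String payload) {
            this.actionCase = actionCase;
            this.payload = payload;
        }

        static CustomerEvent purchase(String item) {
            return new CustomerEvent(ActionCase.PURCHASE, item);
        }

        static CustomerEvent pageview(String url) {
            return new CustomerEvent(ActionCase.PAGEVIEW, url);
        }

        static CustomerEvent empty() {
            return new CustomerEvent(ActionCase.ACTION_NOT_SET, "");
        }

        ActionCase getActionCase() {
            return actionCase;
        }

        String getPayload() {
            return payload;
        }
    }

    // Dispatch on the case enum, including the branch where nothing was set.
    static String describe(CustomerEvent event) {
        switch (event.getActionCase()) {
            case PURCHASE:
                return "purchase: " + event.getPayload();
            case PAGEVIEW:
                return "pageview: " + event.getPayload();
            case ACTION_NOT_SET:
            default:
                return "no action set";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(CustomerEvent.pageview("http://acme/traps")));
        System.out.println(describe(CustomerEvent.purchase("road-runner-bait")));
        System.out.println(describe(CustomerEvent.empty()));
    }
}
```

Handling the not-set case explicitly, or through a default branch, keeps the consumer from silently ignoring a wrapper message that carries neither event.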

Before you go on to create the application, we should mention the deserialization configurations that you need to set:

Configurations needed by the Protobuf Consumer
 protoConsumeConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class); (1)
 protoConsumeConfigs.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, CustomerEventProto.CustomerEvent.class); (2)
1 Configurations for the Protobuf consumer to use the Protobuf deserializer
2 Setting the specific class type for the Protobuf deserializer

It should come as no surprise that you need to set the deserializer class to KafkaProtobufDeserializer for the Protobuf consumer. But, when working with multiple types, you still need to set the configuration for a specific type. For Protobuf, this is straightforward: setting the specific type to the outer class makes sense because the deserializer knows how to handle the embedded types from the schema.

Now go ahead and create the src/main/java/io/confluent/developer/MultiEventProtobufProduceConsumeApp.java file:

package io.confluent.developer;

import io.confluent.developer.proto.CustomerEventProto;
import io.confluent.developer.proto.PageviewProto;
import io.confluent.developer.proto.PurchaseProto;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class MultiEventProtobufProduceConsumeApp implements AutoCloseable {

    public static final String CUSTOMER_ID = "wilecoyote";
    private static final Logger LOG = LoggerFactory.getLogger(MultiEventProtobufProduceConsumeApp.class);
    private volatile boolean keepConsumingProto = true;
    final ExecutorService executorService = Executors.newFixedThreadPool(2);

    public void produceProtobufEvents(final Supplier<Producer<String, CustomerEventProto.CustomerEvent>> producerSupplier,
                                      final String topic,
                                      final List<CustomerEventProto.CustomerEvent> protoCustomerEvents) {

        try (Producer<String, CustomerEventProto.CustomerEvent> producer = producerSupplier.get()) {
            protoCustomerEvents.stream()
                    .map((event -> new ProducerRecord<>(topic, event.getId(), event)))
                    .forEach(producerRecord -> producer.send(producerRecord, ((metadata, exception) -> {
                        if (exception != null) {
                            LOG.error("Error Protobuf producing message", exception);
                        } else {
                            LOG.debug("Produced Protobuf record offset {} timestamp {}", metadata.offset(), metadata.timestamp());
                        }
                    })));
        }
    }

    public void consumeProtoEvents(final Supplier<Consumer<String, CustomerEventProto.CustomerEvent>> consumerSupplier,
                                   final String topic,
                                   final List<String> eventTracker) {

        try (Consumer<String, CustomerEventProto.CustomerEvent> eventConsumer = consumerSupplier.get()) {
            eventConsumer.subscribe(Collections.singletonList(topic));
            while (keepConsumingProto) {
                ConsumerRecords<String, CustomerEventProto.CustomerEvent> consumerRecords = eventConsumer.poll(Duration.ofSeconds(1));
                consumerRecords.forEach(consumerRec -> {
                    CustomerEventProto.CustomerEvent customerEvent = consumerRec.value();
                    switch (customerEvent.getActionCase()) {
                        case PURCHASE:
                            eventTracker.add(String.format("Protobuf Purchase event -> %s", customerEvent.getPurchase().getItem()));
                            break;
                        case PAGEVIEW:
                            eventTracker.add(String.format("Protobuf Pageview event -> %s", customerEvent.getPageview().getUrl()));
                            break;
                        default:
                            LOG.error("Unexpected type - this shouldn't happen");
                    }
                });
            }
        }
    }

    public List<CustomerEventProto.CustomerEvent> protobufEvents() {
        CustomerEventProto.CustomerEvent.Builder customerEventBuilder = CustomerEventProto.CustomerEvent.newBuilder();
        PageviewProto.Pageview.Builder pageViewBuilder = PageviewProto.Pageview.newBuilder();
        PurchaseProto.Purchase.Builder purchaseBuilder = PurchaseProto.Purchase.newBuilder();
        List<CustomerEventProto.CustomerEvent> events = new ArrayList<>();

        PageviewProto.Pageview pageView = pageViewBuilder.setCustomerId(CUSTOMER_ID).setUrl("http://acme/traps").setIsSpecial(false).build();
        PageviewProto.Pageview pageViewII = pageViewBuilder.setCustomerId(CUSTOMER_ID).setUrl("http://acme/bombs").setIsSpecial(false).build();
        PageviewProto.Pageview pageViewIII = pageViewBuilder.setCustomerId(CUSTOMER_ID).setUrl("http://acme/bait").setIsSpecial(true).build();
        PurchaseProto.Purchase purchase = purchaseBuilder.setCustomerId(CUSTOMER_ID).setItem("road-runner-bait").setAmount(99.99).build();

        events.add(customerEventBuilder.setId(CUSTOMER_ID).setPageview(pageView).build());
        events.add(customerEventBuilder.setId(CUSTOMER_ID).setPageview(pageViewII).build());
        events.add(customerEventBuilder.setId(CUSTOMER_ID).setPageview(pageViewIII).build());
        events.add((customerEventBuilder.setId(CUSTOMER_ID).setPurchase(purchase)).build());

        return events;
    }

    @Override
    public void close() {
        keepConsumingProto = false;
        executorService.shutdown();
    }


    public void createTopics(final Properties allProps) {
        try (final AdminClient client = AdminClient.create(allProps)) {

            final List<NewTopic> topics = new ArrayList<>();

            topics.add(new NewTopic(
                    allProps.getProperty("proto.topic.name"),
                    Integer.parseInt(allProps.getProperty("proto.topic.partitions")),
                    Short.parseShort(allProps.getProperty("proto.topic.replication.factor"))));

            client.createTopics(topics);
        }
    }


    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            LOG.error("Must provide the path to the properties");
            // Stop here; otherwise args[0] below throws ArrayIndexOutOfBoundsException
            System.exit(1);
        }
        Properties properties = new Properties();
        try (FileInputStream fis = new FileInputStream(args[0])) {
            properties.load(fis);
        }

        Map<String, Object> commonConfigs = new HashMap<>();
        properties.forEach((key, value) -> commonConfigs.put((String) key, value));


        try (MultiEventProtobufProduceConsumeApp multiEventApp = new MultiEventProtobufProduceConsumeApp()) {
            multiEventApp.createTopics(properties);
            String protobufTopic = (String) commonConfigs.get("proto.topic.name");

            LOG.info("Producing Protobuf events now");
            multiEventApp.produceProtobufEvents(() -> new KafkaProducer<>(protoProduceConfigs(commonConfigs)), protobufTopic, multiEventApp.protobufEvents());

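            // The consumer thread appends while main() polls size(), so use a synchronized list
            List<String> protoEvents = Collections.synchronizedList(new ArrayList<>());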
            multiEventApp.executorService.submit(() -> multiEventApp.consumeProtoEvents(() -> new KafkaConsumer<>(protoConsumeConfigs(commonConfigs)), protobufTopic, protoEvents));
            while (protoEvents.size() < 3) {
                Thread.sleep(100);
            }
            LOG.info("Consumed Proto Events {}", protoEvents);
        }
    }

    @NotNull
    static Map<String, Object> protoConsumeConfigs(Map<String, Object> commonConfigs) {
        Map<String, Object> protoConsumeConfigs = new HashMap<>(commonConfigs);
        protoConsumeConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-consumer-group");
        protoConsumeConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
        protoConsumeConfigs.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, CustomerEventProto.CustomerEvent.class);
        return protoConsumeConfigs;
    }

    @NotNull
    static Map<String, Object> protoProduceConfigs(Map<String, Object> commonConfigs) {
        Map<String, Object> protoProduceConfigs = new HashMap<>(commonConfigs);
        protoProduceConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
        return protoProduceConfigs;
    }
}

Create the Avro Multi Event application

10

Next, you’ll create the application that handles multiple Avro events.

KafkaProducer for Avro events
public void produceAvroEvents(final Supplier<Producer<String, SpecificRecordBase>> producerSupplier,
                              final String topic,
                              final List<SpecificRecordBase> avroEvents) {

        try (Producer<String, SpecificRecordBase> producer = producerSupplier.get()) {  (1)
           avroEvents.stream()  (2)
                    .map((event -> new ProducerRecord<>(topic, (String) event.get("customer_id"), event))) (3)
                    .forEach(producerRecord -> producer.send(producerRecord,
//Details left out for clarity

//Relevant configurations

 avroProduceConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
 avroProduceConfigs.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false); (4)
 avroProduceConfigs.put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true); (5)
1 Getting the producer from the supplier
2 Streaming over the provided collection of Avro events to send
3 Creating the ProducerRecord instance, note the use of map-like access to get the required field for the key
4 Specifying to disable automatic schema registration
5 Setting to use the latest schema

You have a very similar approach with the Avro producer as you did with the Protobuf version, but take a took at the type at annotation one - it’s an abstract class, SpecificRecordBase that every Avro generated class inherits from. Since the schema for the Avro multi-event topic uses a union at the top level, you don’t know the concrete type. Since you want to use the customer ID as the key you need to access the field in a map-like fashion by using the field name as it exists in the schema. This is possible because SpecificRecordBase implements the GenericRecord interface which provides the get method for retrieving a field value by name.

The biggest difference is in the configuration you provide for the Avro serializer. You disable automatic schema registration; otherwise the serializer would register the schema of each individual event, which would replace the union schema as the latest version. And because you’ve set use.latest.version to true, the serializer looks up the latest version, which is the union schema, and uses it for serialization. This blog post by Robert Yokota explains this mechanism in detail.
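
The constants at annotations 4 and 5 resolve to plain string property names, so the same settings could also live in a properties file. Here is a minimal dependency-free sketch, assuming the keys auto.register.schemas and use.latest.version and passing the serializer by class name. The AvroProducerSettings helper and the registry URL are hypothetical and not part of the tutorial code.

```java
import java.util.HashMap;
import java.util.Map;

final class AvroProducerSettings {
    // Copies the common configs and layers the union-schema settings on top.
    static Map<String, Object> withUnionSchemaSettings(Map<String, Object> commonConfigs) {
        Map<String, Object> configs = new HashMap<>(commonConfigs);
        configs.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Do not register per-event schemas from the producer.
        configs.put("auto.register.schemas", false);
        // Serialize against the latest registered version, i.e. the union schema.
        configs.put("use.latest.version", true);
        return configs;
    }

    public static void main(String[] args) {
        Map<String, Object> common = new HashMap<>();
        common.put("schema.registry.url", "http://localhost:8081"); // placeholder URL
        withUnionSchemaSettings(common).forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Copying into a new map keeps the shared configuration untouched, so consumer settings can be derived from the same starting point.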

Next we’ll move on to creating the Avro consumer.

KafkaConsumer for multiple events in Avro
consumerRecords.forEach(consumerRec -> {
  SpecificRecord avroRecord = consumerRec.value(); (1)
  if (avroRecord instanceof Purchase) {    (2)
      Purchase purchase = (Purchase) avroRecord;  (3)
      eventTracker.add(purchase.getItem());
  } else if (avroRecord instanceof Pageview) {
      Pageview pageView = (Pageview) avroRecord;
      eventTracker.add(pageView.getUrl());

// details left out for clarity
1 Getting the record
2 Doing an instanceof check to determine the type
3 Casting to the appropriate concrete type

With the Avro consumer, you’ll need to use the Java instanceof operator to determine the concrete type of the record. Notice that here you’re using the SpecificRecord interface, which every Avro-generated object implements. Once you find the correct concrete type, you cast the record to that type and extract the required information.
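
If the number of event types grows, the chain of instanceof checks could be replaced by a lookup keyed on the concrete class. The following is one possible sketch, not the tutorial’s approach. EventDispatcher and the two stand-in event classes are hypothetical and carry no Avro dependency, so the example runs on its own.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

final class EventDispatcher {
    private final Map<Class<?>, Function<Object, String>> handlers = new HashMap<>();

    // Registers a handler for one concrete event class.
    <T> EventDispatcher register(Class<T> type, Function<T, String> handler) {
        handlers.put(type, event -> handler.apply(type.cast(event)));
        return this;
    }

    // Looks up the handler by the event's exact runtime class; empty if none is registered.
    Optional<String> dispatch(Object event) {
        Function<Object, String> handler = handlers.get(event.getClass());
        return handler == null ? Optional.empty() : Optional.of(handler.apply(event));
    }

    public static void main(String[] args) {
        EventDispatcher dispatcher = new EventDispatcher()
                .register(PurchaseEvent.class, purchase -> "Purchase event -> " + purchase.item)
                .register(PageviewEvent.class, pageview -> "Pageview event -> " + pageview.url);

        System.out.println(dispatcher.dispatch(new PageviewEvent("http://acme/traps")).orElse("unexpected type"));
        System.out.println(dispatcher.dispatch(new PurchaseEvent("road-runner-bait")).orElse("unexpected type"));
    }
}

// Hypothetical stand-in for the generated Purchase class.
final class PurchaseEvent {
    final String item;

    PurchaseEvent(String item) {
        this.item = item;
    }
}

// Hypothetical stand-in for the generated Pageview class.
final class PageviewEvent {
    final String url;

    PageviewEvent(String url) {
        this.url = url;
    }
}
```

The lookup matches the exact runtime class only, which suits final generated classes but would miss subclasses. For the two event types in this tutorial, the instanceof chain remains the simpler choice.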

Before you create the application, here’s a quick look at the deserialization configurations you need to set:

Configurations needed by the Avro Consumer
 avroConsumeConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); (1)
 avroConsumeConfigs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); (2)
1 Specifying to use the Avro deserializer for the Avro consumer
2 Setting the Avro deserializer to use the specific reader

It should come as no surprise that you need to set the deserializer class to KafkaAvroDeserializer for the Avro consumer. But even with the union schema, you still need to set SPECIFIC_AVRO_READER_CONFIG to true. Without it, the deserializer returns generic records rather than the concrete generated types, and the instanceof checks in the consumer would never match.

Go ahead and create the src/main/java/io/confluent/developer/MultiEventAvroProduceConsumeApp.java file:

package io.confluent.developer;

import io.confluent.developer.avro.Pageview;
import io.confluent.developer.avro.Purchase;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class MultiEventAvroProduceConsumeApp implements AutoCloseable{

    public static final String CUSTOMER_ID = "wilecoyote";
    private static final Logger LOG = LoggerFactory.getLogger(MultiEventAvroProduceConsumeApp.class);
    private volatile boolean keepConsumingAvro = true;
    final ExecutorService executorService = Executors.newFixedThreadPool(1);

    public void produceAvroEvents(final Supplier<Producer<String, SpecificRecordBase>> producerSupplier,
                                  final String topic,
                                  final List<SpecificRecordBase> avroEvents) {

        try (Producer<String, SpecificRecordBase> producer = producerSupplier.get()) {
           avroEvents.stream()
                    .map((event -> new ProducerRecord<>(topic, (String) event.get("customer_id"), event)))
                    .forEach(producerRecord -> producer.send(producerRecord, ((metadata, exception) -> {
                        if (exception != null) {
                            LOG.error("Error Avro producing message", exception);
                        } else {
                            LOG.debug("Produced Avro record offset {} timestamp {}", metadata.offset(), metadata.timestamp());
                        }
                    })));
        }
    }

    public void consumeAvroEvents(final Supplier<Consumer<String, SpecificRecordBase>> consumerSupplier,
                                  final String topic,
                                  final List<String> eventTracker) {
        try (Consumer<String, SpecificRecordBase> eventConsumer = consumerSupplier.get()) {
            eventConsumer.subscribe(Collections.singletonList(topic));
            while (keepConsumingAvro) {
                ConsumerRecords<String, SpecificRecordBase> consumerRecords = eventConsumer.poll(Duration.ofSeconds(1));
                consumerRecords.forEach(consumerRec -> {
                    SpecificRecord avroRecord = consumerRec.value();
                    if (avroRecord instanceof Purchase) {
                        Purchase purchase = (Purchase) avroRecord;
                        eventTracker.add(String.format("Avro Purchase event -> %s",purchase.getItem()));
                    } else if (avroRecord instanceof Pageview) {
                        Pageview pageView = (Pageview) avroRecord;
                        eventTracker.add(String.format("Avro Pageview event -> %s",pageView.getUrl()));
                    } else {
                        LOG.error("Unexpected type - this shouldn't happen");
                    }
                });
            }
        }
    }


    public List<SpecificRecordBase> avroEvents() {
        Pageview.Builder pageViewBuilder = Pageview.newBuilder();
        Purchase.Builder purchaseBuilder = Purchase.newBuilder();
        List<SpecificRecordBase> events = new ArrayList<>();

        Pageview pageView = pageViewBuilder.setCustomerId(CUSTOMER_ID).setUrl("http://acme/traps").setIsSpecial(false).build();
        Pageview pageViewII = pageViewBuilder.setCustomerId(CUSTOMER_ID).setUrl("http://acme/bombs").setIsSpecial(false).build();
        Pageview pageViewIII = pageViewBuilder.setCustomerId(CUSTOMER_ID).setUrl("http://acme/bait").setIsSpecial(true).build();
        Purchase purchase = purchaseBuilder.setCustomerId(CUSTOMER_ID).setItem("road-runner-bait").setAmount(99.99).build();

        events.add(pageView);
        events.add(pageViewII);
        events.add(pageViewIII);
        events.add(purchase);

        return events;
    }

    @Override
    public void close() {
        keepConsumingAvro = false;
        executorService.shutdown();
    }


    public void createTopics(final Properties allProps) {
        try (final AdminClient client = AdminClient.create(allProps)) {

            final List<NewTopic> topics = new ArrayList<>();

            topics.add(new NewTopic(
                    allProps.getProperty("avro.topic.name"),
                    Integer.parseInt(allProps.getProperty("avro.topic.partitions")),
                    Short.parseShort(allProps.getProperty("avro.topic.replication.factor"))));

            client.createTopics(topics);
        }
    }


    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            LOG.error("Must provide the path to the properties");
            // Stop here; otherwise args[0] below would throw ArrayIndexOutOfBoundsException
            System.exit(1);
        }
        Properties properties = new Properties();
        try (FileInputStream fis = new FileInputStream(args[0])) {
            properties.load(fis);
        }

        Map<String, Object> commonConfigs = new HashMap<>();
        properties.forEach((key, value) -> commonConfigs.put((String) key, value));


        try (MultiEventAvroProduceConsumeApp multiEventApp = new MultiEventAvroProduceConsumeApp()) {
            multiEventApp.createTopics(properties);
            String avroTopic = (String) commonConfigs.get("avro.topic.name");

            LOG.info("Producing Avro events");
            multiEventApp.produceAvroEvents(() -> new KafkaProducer<>(avroProduceConfigs(commonConfigs)), avroTopic, multiEventApp.avroEvents());

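            // Synchronized because the consumer thread adds to this list while the main thread reads it
            List<String> avroEvents = Collections.synchronizedList(new ArrayList<>());

            multiEventApp.executorService.submit(() -> multiEventApp.consumeAvroEvents(() -> new KafkaConsumer<>(avroConsumeConfigs(commonConfigs)), avroTopic, avroEvents));
            // Wait until all four produced events have been consumed
            while (avroEvents.size() < 4) {
                Thread.sleep(100);
            }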

            LOG.info("Consumed Avro Events {}", avroEvents);
        }
    }


    @NotNull
    static Map<String, Object> avroConsumeConfigs(Map<String, Object> commonConfigs) {
        Map<String, Object> avroConsumeConfigs = new HashMap<>(commonConfigs);
        avroConsumeConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-consumer-group");
        avroConsumeConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        avroConsumeConfigs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        return avroConsumeConfigs;
    }

    @NotNull
    static Map<String, Object> avroProduceConfigs(Map<String, Object> commonConfigs) {
        Map<String, Object> avroProduceConfigs = new HashMap<>(commonConfigs);
        avroProduceConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        avroProduceConfigs.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
        avroProduceConfigs.put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true);
        return avroProduceConfigs;
    }
}

Compile and run both the Multi Event applications

11

In your terminal, run:

./gradlew shadowJar

Now that you have an uberjar, you can launch each application locally. When you run the following, you’ll see some output as the producer sends records to the Kafka broker, and you’ll also see the results of the multiple event consumer.

We’ll add a pipe and grep to the end of the command to filter out the logging from the clients, which would otherwise make the results harder to see.

First run the multi-event application for Protobuf:

java -cp build/libs/multiple-event-types-standalone-0.0.1.jar io.confluent.developer.MultiEventProtobufProduceConsumeApp configuration/dev.properties 2>&1 | grep 'io.confluent.developer'

The output should look something like this:

[main] INFO io.confluent.developer.MultiEventProtobufProduceConsumeApp - Producing Protobuf events now
	specific.protobuf.value.type = class io.confluent.developer.proto.CustomerEventProto$CustomerEvent
[main] INFO io.confluent.developer.MultiEventProtobufProduceConsumeApp - Consumed Proto Events [Protobuf Pageview event -> http://acme/traps, Protobuf Pageview event -> http://acme/bombs, Protobuf Pageview event -> http://acme/bait, Protobuf Purchase event -> road-runner-bait]

Then run the multi-event application for Avro:

java -cp build/libs/multiple-event-types-standalone-0.0.1.jar io.confluent.developer.MultiEventAvroProduceConsumeApp configuration/dev.properties 2>&1 | grep 'io.confluent.developer'

The output should look something like this:

[main] INFO io.confluent.developer.MultiEventAvroProduceConsumeApp - Producing Avro events
[main] INFO io.confluent.developer.MultiEventAvroProduceConsumeApp - Consumed Avro Events [Avro Pageview event -> http://acme/traps, Avro Pageview event -> http://acme/bombs, Avro Pageview event -> http://acme/bait, Avro Purchase event -> road-runner-bait]

Test it

Create a test configuration file

1

First, create a test file at configuration/test.properties:

proto.topic=proto-records
avro.topic=avro-records
schema.registry.url=mock://multi-event-produce-consume-test

Write tests for the Avro and Protobuf applications

2

Create a directory for the tests to live in:

mkdir -p src/test/java/io/confluent/developer

Testing a KafkaProducer and KafkaConsumer used in an application is fairly easy to accomplish thanks to the MockProducer and the MockConsumer. Since both the KafkaProducer and KafkaConsumer are well tested, we don’t need to test the clients themselves. Instead, we’ll use mocks to verify that our logic executes as expected.

There are two test classes, MultiEventAvroProduceConsumeAppTest and MultiEventProtobufProduceConsumeAppTest, one for the Avro application and one for the Protobuf application. Before you create the tests, let’s look at some of the key parts of using a mock producer and consumer.

Replaying the history of produced records
// Details left out for clarity

MockProducer<String, CustomerEventProto.CustomerEvent> mockProtoProducer
                = new MockProducer<>(true, stringSerializer, protobufSerializer); (1)
List<CustomerEventProto.CustomerEvent> events = produceConsumeApp.protobufEvents();
produceConsumeApp.produceProtobufEvents(() -> mockProtoProducer, (String) commonConfigs.get("proto.topic"), events);(2)

 actualKeyValues = mockProtoProducer.history().stream().map(this::toKeyValue).collect(Collectors.toList()); (3)
assertThat(actualKeyValues, equalTo(expectedKeyValues));
1 Creating the MockProducer
2 Executing the produce of Protobuf records with the mock producer
3 Replaying the history of the producer

At annotation 3 above, the test replays the mock producer’s history to validate that every record we expected to be produced was actually sent to the producer. The test for the Avro producer has identical logic so we won’t review it here, but you can view the full source code if you’d like to see it.

For testing the consumer, it’s a little tricky because the consumer polls for records and will continue polling until you close the application. The MockConsumer provides a method schedulePollTask where you provide the action you want to take at each poll call.

Driving the behavior of a consumer poll
  mockConsumer.schedulePollTask(() -> {  (1)
        addTopicPartitionsAssignment(topic, mockConsumer);
        addConsumerRecords(mockConsumer, produceConsumeApp.protobufEvents(), CustomerEventProto.CustomerEvent::getId, topic);
    });
  mockConsumer.schedulePollTask(() -> produceConsumeApp.close()); (2)
1 Assigning the topic-partitions and records in the first poll call
2 Shutting down the application in the next call

For the first poll call, we’ll assign the topic partitions and then provide the records to the consumer to process. In the next poll call, we simply shut the application down. Note that the methods in the first schedulePollTask are internal to the test. To fully understand what’s going on, you’ll need to look at the source code for the test. The test for the Avro multi-event application more or less uses the same logic, so we won’t review that test here.
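
The two-step choreography can be easier to follow in a stripped-down model. The sketch below is not Kafka’s MockConsumer. FakePollingConsumer is a hypothetical class that only imitates the idea of running one queued task per poll call, with the first task supplying records and the second flipping the flag that ends the consume loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class PollTaskSketch {
    private volatile boolean keepConsuming = true;

    void close() {
        keepConsuming = false;
    }

    // Polls until close() is called, collecting every record seen along the way.
    List<String> consume(FakePollingConsumer consumer) {
        List<String> seen = new ArrayList<>();
        while (keepConsuming) {
            seen.addAll(consumer.poll());
        }
        return seen;
    }

    public static void main(String[] args) {
        FakePollingConsumer consumer = new FakePollingConsumer();
        PollTaskSketch app = new PollTaskSketch();

        // First poll: make records available. Second poll: shut the loop down.
        consumer.schedulePollTask(() -> {
            consumer.addRecord("pageview");
            consumer.addRecord("purchase");
        });
        consumer.schedulePollTask(app::close);

        System.out.println(app.consume(consumer));
    }
}

// Hypothetical, heavily simplified stand-in for a mock consumer.
final class FakePollingConsumer {
    private final Queue<Runnable> pollTasks = new ArrayDeque<>();
    private final List<String> pending = new ArrayList<>();

    void schedulePollTask(Runnable task) {
        pollTasks.add(task);
    }

    void addRecord(String record) {
        pending.add(record);
    }

    // Runs at most one scheduled task, then hands back whatever records are pending.
    List<String> poll() {
        Runnable task = pollTasks.poll();
        if (task != null) {
            task.run();
        }
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }
}
```

Because the shutdown task only runs on the second poll, the records from the first poll are fully processed before the loop exits. That ordering is what lets the test assert on the results without any sleeps or timeouts.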

Go ahead and create the following file for the Protobuf application test at src/test/java/io/confluent/developer/MultiEventProtobufProduceConsumeAppTest.java.

package io.confluent.developer;

import io.confluent.developer.proto.CustomerEventProto;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

public class MultiEventProtobufProduceConsumeAppTest {
    private static final Map<String, Object> commonConfigs = new HashMap<>();
    private static final Properties properties = new Properties();
    private final Serializer<String> stringSerializer = new StringSerializer();
    private MultiEventProtobufProduceConsumeApp produceConsumeApp;

    @BeforeClass
    public static void beforeAllTests() throws IOException {
        try (FileInputStream fis = new FileInputStream("configuration/test.properties")) {
            properties.load(fis);
            properties.forEach((key, value) -> commonConfigs.put((String) key, value));
        }
    }


    @Before
    public void setup() {
        produceConsumeApp = new MultiEventProtobufProduceConsumeApp();
    }

    @Test
    public void testProduceProtobufMultipleEvents() {
        KafkaProtobufSerializer<CustomerEventProto.CustomerEvent> protobufSerializer
                = new KafkaProtobufSerializer<>();
        protobufSerializer.configure(commonConfigs, false);
        MockProducer<String, CustomerEventProto.CustomerEvent> mockProtoProducer
                = new MockProducer<>(true, stringSerializer, protobufSerializer);
        List<CustomerEventProto.CustomerEvent> events = produceConsumeApp.protobufEvents();
        produceConsumeApp.produceProtobufEvents(() -> mockProtoProducer, (String) commonConfigs.get("proto.topic"), events);
        List<KeyValue<String, CustomerEventProto.CustomerEvent>> expectedKeyValues =
                produceConsumeApp.protobufEvents().stream().map((e -> KeyValue.pair(e.getId(), e))).collect(Collectors.toList());

        List<KeyValue<String, CustomerEventProto.CustomerEvent>> actualKeyValues =
                mockProtoProducer.history().stream().map(this::toKeyValue).collect(Collectors.toList());
        assertThat(actualKeyValues, equalTo(expectedKeyValues));
    }

    @Test
    public void testConsumeProtobufEvents() {
        MockConsumer<String, CustomerEventProto.CustomerEvent> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        String topic = (String) commonConfigs.get("proto.topic");
        List<String> expectedProtoResults = Arrays.asList("Protobuf Pageview event -> http://acme/traps", "Protobuf Pageview event -> http://acme/bombs", "Protobuf Pageview event -> http://acme/bait", "Protobuf Purchase event -> road-runner-bait");
        List<String> actualProtoResults = new ArrayList<>();
        mockConsumer.schedulePollTask(()-> {
            addTopicPartitionsAssignment(topic, mockConsumer);
            addConsumerRecords(mockConsumer, produceConsumeApp.protobufEvents(), CustomerEventProto.CustomerEvent::getId, topic);
        });
        mockConsumer.schedulePollTask(() -> produceConsumeApp.close());
        produceConsumeApp.consumeProtoEvents(() -> mockConsumer, topic, actualProtoResults);
        assertThat(actualProtoResults, equalTo(expectedProtoResults));
    }

    private <K, V> KeyValue<K, V> toKeyValue(final ProducerRecord<K, V> producerRecord) {
        return KeyValue.pair(producerRecord.key(), producerRecord.value());
    }

    private <V> void addTopicPartitionsAssignment(final String topic,
                                                  final MockConsumer<String, V> mockConsumer) {
        final TopicPartition topicPartition = new TopicPartition(topic, 0);
        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put(topicPartition, 0L);
        mockConsumer.rebalance(Collections.singletonList(topicPartition));
        mockConsumer.updateBeginningOffsets(beginningOffsets);
    }

    private <V> void addConsumerRecords(final MockConsumer<String, V> mockConsumer,
                                        final List<V> records,
                                        final Function<V, String> keyFunction,
                                        final String topic) {
        AtomicInteger offset = new AtomicInteger(0);
        records.stream()
                .map(r -> new ConsumerRecord<>(topic, 0, offset.getAndIncrement(), keyFunction.apply(r), r))
                .forEach(mockConsumer::addRecord);
    }
}

Then, create the file for the Avro application test at src/test/java/io/confluent/developer/MultiEventAvroProduceConsumeAppTest.java.

package io.confluent.developer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

public class MultiEventAvroProduceConsumeAppTest {
    private static final Map<String, Object> commonConfigs = new HashMap<>();
    private static final Properties properties = new Properties();
    private final Serializer<String> stringSerializer = new StringSerializer();
    private MultiEventAvroProduceConsumeApp produceConsumeApp;

    @BeforeClass
    public static void beforeAllTests() throws IOException {
        try (FileInputStream fis = new FileInputStream("configuration/test.properties")) {
            properties.load(fis);
            properties.forEach((key, value) -> commonConfigs.put((String) key, value));
        }
    }


    @Before
    public void setup() {
        produceConsumeApp = new MultiEventAvroProduceConsumeApp();
    }

    @Test
    @SuppressWarnings("unchecked")
    public void testProduceAvroMultipleEvents() {
        KafkaAvroSerializer avroSerializer
                = new KafkaAvroSerializer();
        avroSerializer.configure(commonConfigs, false);
        MockProducer<String, SpecificRecordBase> mockAvroProducer
                = new MockProducer<String, SpecificRecordBase>(true, stringSerializer, (Serializer) avroSerializer);
        produceConsumeApp.produceAvroEvents(() -> mockAvroProducer, (String) commonConfigs.get("avro.topic"), produceConsumeApp.avroEvents());
        List<KeyValue<String, SpecificRecordBase>> expectedKeyValues =
                produceConsumeApp.avroEvents().stream().map((e -> KeyValue.pair((String) e.get("customer_id"), e))).collect(Collectors.toList());

        List<KeyValue<String, SpecificRecordBase>> actualKeyValues =
                mockAvroProducer.history().stream().map(this::toKeyValue).collect(Collectors.toList());
        assertThat(actualKeyValues, equalTo(expectedKeyValues));
    }

    @Test
    public void testConsumeAvroEvents() {
        MockConsumer<String, SpecificRecordBase> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        String topic = (String) commonConfigs.get("avro.topic");
        List<String> expectedAvroResults = Arrays.asList("Avro Pageview event -> http://acme/traps", "Avro Pageview event -> http://acme/bombs", "Avro Pageview event -> http://acme/bait", "Avro Purchase event -> road-runner-bait");
        List<String> actualAvroResults = new ArrayList<>();
        mockConsumer.schedulePollTask(() -> {
            addTopicPartitionsAssignment(topic, mockConsumer);
            addConsumerRecords(mockConsumer, produceConsumeApp.avroEvents(), (SpecificRecordBase r) -> (String) r.get("customer_id"), topic);
        });
        mockConsumer.schedulePollTask(() -> produceConsumeApp.close());
        produceConsumeApp.consumeAvroEvents(() -> mockConsumer, topic, actualAvroResults);
        assertThat(actualAvroResults, equalTo(expectedAvroResults));
    }

    private <K, V> KeyValue<K, V> toKeyValue(final ProducerRecord<K, V> producerRecord) {
        return KeyValue.pair(producerRecord.key(), producerRecord.value());
    }

    private <V> void addTopicPartitionsAssignment(final String topic,
                                                  final MockConsumer<String, V> mockConsumer) {
        final TopicPartition topicPartition = new TopicPartition(topic, 0);
        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put(topicPartition, 0L);
        mockConsumer.rebalance(Collections.singletonList(topicPartition));
        mockConsumer.updateBeginningOffsets(beginningOffsets);
    }

    private <V> void addConsumerRecords(final MockConsumer<String, V> mockConsumer,
                                        final List<V> records,
                                        final Function<V, String> keyFunction,
                                        final String topic) {
        AtomicInteger offset = new AtomicInteger(0);
        records.stream()
                .map(r -> new ConsumerRecord<>(topic, 0, offset.getAndIncrement(), keyFunction.apply(r), r))
                .forEach(mockConsumer::addRecord);
    }
}

Invoke the tests

3

Now run the tests, which is as simple as:

./gradlew test

Deploy on Confluent Cloud

Run your app with Confluent Cloud

1

Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully managed Apache Kafka service.

  1. Sign up for Confluent Cloud.

  2. After you log in to Confluent Cloud Console, click Environments in the left-hand navigation, click Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  3. From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details). To avoid having to enter a credit card, add an additional promo code CONFLUENTDEV1. With this promo code, you will not have to enter a credit card for 30 days or until your credits run out.

  4. Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.

Next, from the Confluent Cloud Console, click on Clients to get the cluster-specific configurations, e.g., Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application. In the case of this tutorial, add the following properties to the client application’s input properties file, replacing each placeholder in double curly braces with your Confluent Cloud values.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BROKER_ENDPOINT }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=https://{{ SR_ENDPOINT }}
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
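
A forgotten placeholder is easy to miss, so it can help to check the file before starting the client. Here is a minimal sketch, assuming placeholders keep the double-curly-brace form used in the properties above. PlaceholderCheck is a hypothetical helper, not part of the tutorial code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

final class PlaceholderCheck {
    // Returns the sorted property names whose values still contain "{{".
    static List<String> unresolvedKeys(Properties properties) {
        return properties.stringPropertyNames().stream()
                .filter(name -> properties.getProperty(name).contains("{{"))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        // Inline sample; a real check would load the properties file instead.
        properties.load(new StringReader(
                "bootstrap.servers={{ BROKER_ENDPOINT }}\n"
                        + "security.protocol=SASL_SSL\n"));
        System.out.println("Still to fill in: " + unresolvedKeys(properties));
    }
}
```

An empty result means every placeholder has been replaced. It says nothing about whether the values themselves are correct.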

Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.