How can you have multiple event types in a topic and maintain topic-name subject constraints?
This tutorial requires access to an Apache Kafka cluster, and the quickest way to get started free is on Confluent Cloud, which provides Kafka as a fully managed service.
After you log in to Confluent Cloud, click Environments in the lefthand navigation, click on Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.
From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 of free usage on Confluent Cloud (details). To avoid having to enter a credit card, add an additional promo code, CONFLUENTDEV1. With this promo code, you will not have to enter a credit card for 30 days or until your credits run out.
Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.
To get started, make a new directory anywhere you’d like for this project:
mkdir multiple-event-types && cd multiple-event-types
Next, create a directory for configuration data:
mkdir configuration
From the Confluent Cloud Console, navigate to your Kafka cluster and then select Clients in the lefthand navigation. From the Clients view, create a new client and click Java to get the connection information customized to your cluster.
Create new credentials for your Kafka cluster and Schema Registry, writing in appropriate descriptions so that the keys are easy to find and delete later. The Confluent Cloud Console will show a configuration similar to the one below with your new credentials automatically populated (make sure Show API keys is checked). Copy and paste it into a configuration/ccloud.properties file on your machine.
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BOOTSTRAP_SERVERS }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
# Best practice for Kafka producer to prevent data loss
acks=all
# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url={{ SR_URL }}
basic.auth.credentials.source=USER_INFO
basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
Do not directly copy and paste the above configuration. You must copy it from the Confluent Cloud Console so that it includes your Confluent Cloud information and credentials.
This tutorial has some steps for Kafka topic management and for producing and consuming events, for which you can use the Confluent Cloud Console or the Confluent CLI. Follow the instructions here to install the Confluent CLI, and then follow these steps to connect the CLI to your Confluent Cloud cluster.
In this step, we’re going to create the topics needed for this tutorial.
Since you are going to produce records using Protobuf and Avro serialization, you’ll need two topics.
Use the following commands to create the topics:
confluent kafka topic create avro-events
confluent kafka topic create proto-events
Create the following Gradle build file, named build.gradle, for the project:
buildscript {
repositories {
mavenCentral()
maven {
url = uri("https://packages.confluent.io/maven/")
}
maven {
url = uri("https://plugins.gradle.org/m2/")
}
maven {
url = uri("https://jitpack.io")
}
}
dependencies {
classpath "gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0"
}
}
plugins {
id "java"
id "idea"
id "eclipse"
id "com.github.imflog.kafka-schema-registry-gradle-plugin" version "1.9.1"
id "com.google.protobuf" version "0.9.2"
id "com.github.davidmc24.gradle.plugin.avro" version "1.7.0"
}
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
version = "0.0.1"
repositories {
mavenCentral()
maven {
url = uri("https://packages.confluent.io/maven/")
}
maven {
url = uri("https://jitpack.io")
}
}
apply plugin: "com.github.johnrengelman.shadow"
dependencies {
implementation 'com.google.protobuf:protobuf-java:3.22.2'
implementation 'org.apache.avro:avro:1.11.1'
implementation 'org.slf4j:slf4j-simple:2.0.7'
implementation 'org.apache.kafka:kafka-streams:3.4.0'
implementation ('org.apache.kafka:kafka-clients') {
version {
strictly '3.4.0'
}
}
testImplementation "junit:junit:4.13.2"
testImplementation 'org.hamcrest:hamcrest:2.2'
implementation "io.confluent:kafka-avro-serializer:7.1.4"
implementation "io.confluent:kafka-protobuf-serializer:7.2.2"
implementation "io.confluent:kafka-protobuf-provider:7.3.0"
}
protobuf {
generatedFilesBaseDir = "${project.buildDir}/generated-main-proto-java"
protoc {
artifact = 'com.google.protobuf:protoc:3.22.2'
}
}
test {
testLogging {
outputs.upToDateWhen { false }
showStandardStreams = true
exceptionFormat = "full"
}
}
jar {
manifest {
attributes(
"Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
"Main-Class": "io.confluent.developer.MultiEventProtobufProduceConsumeApp"
)
}
}
shadowJar {
archiveBaseName = "multiple-event-types-standalone"
archiveClassifier = ''
}
schemaRegistry {
def props = new Properties()
def configs = file("configuration/ccloud.properties")
if (configs.exists()) {
configs.withInputStream { props.load(it) }
def srUrl = props.getProperty("schema.registry.url")
def auth = props.getProperty("basic.auth.user.info").split(":")
println "Using Confluent properties Schema Registry endpoint:${srUrl}, username:${auth[0]},password:${auth[1]}"
url = srUrl
credentials {
// username is the characters up to the ':' in the basic.auth.user.info property
username = auth[0]
// password is everything after ':' in the basic.auth.user.info property
password = auth[1]
}
} else if (file("configuration/dev.properties").exists()) {
configs = file("configuration/dev.properties")
configs.withInputStream { props.load(it) }
def srUrl = props.getProperty("schema.registry.url")
println "Using local dev properties Schema Registry endpoint:${srUrl}"
} else {
println "No configs to parse yet"
}
// Possible types are ["JSON", "PROTOBUF", "AVRO"]
register {
subject('pageview', 'src/main/avro/pageview.avsc', 'AVRO')
subject('purchase', 'src/main/avro/purchase.avsc', 'AVRO')
subject('avro-events-value', 'src/main/avro/all-events.avsc', 'AVRO')
.addReference("io.confluent.developer.avro.Pageview", "pageview", 1)
.addReference("io.confluent.developer.avro.Purchase", "purchase", 1)
}
}
And be sure to run the following command to obtain the Gradle wrapper:
gradle wrapper
Then, create a development configuration file at configuration/dev.properties:
max.poll.interval.ms=300000
enable.auto.commit=true
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
# Application specific properties
proto.topic.name=proto-events
proto.topic.partitions=1
proto.topic.replication.factor=3
avro.topic.name=avro-events
avro.topic.partitions=1
avro.topic.replication.factor=3
Using the command below, append the contents of configuration/ccloud.properties (with your Confluent Cloud configuration) to configuration/dev.properties (with the application properties).
cat configuration/ccloud.properties >> configuration/dev.properties
Let’s say you have a microservice for an e-commerce site that tracks both customer pageviews and purchases. Because the pageviews could be highly related to any purchase, you’d like to capture the exact order of both event types as they occur, so producing the events to the same topic makes sense.
Because you use the customer ID as the key, both event types for a given customer land in the same partition, so you are guaranteed to see them in the order they occurred. Even though these two events are similar, you represent them as distinct domain objects, because that fits well with other teams in your organization that need the same data.
For the Protobuf portion of the tutorial, you’ll need to create three protobuf schemas. Two of the schemas represent the domain objects in the example scenario, and the third schema contains references to the other two schemas.
To get started, create a directory to place the schemas:
mkdir -p src/main/proto
Then, create this schema file for the purchase domain object at src/main/proto/purchase.proto:
syntax = "proto3";
package io.confluent.developer.proto;
option java_outer_classname = "PurchaseProto";
message Purchase {
string item = 1;
double amount = 2;
string customer_id = 3;
}
We won’t go into the specifics of Protocol Buffers here, but the Proto 3 language guide and the Protobuf Java tutorial cover details beyond the scope of this tutorial.
Next, create the schema for the pageview object at src/main/proto/pageview.proto:
syntax = "proto3";
package io.confluent.developer.proto;
option java_outer_classname = "PageviewProto";
message Pageview {
string url = 1;
bool is_special = 2;
string customer_id = 3;
}
Now that you have the schemas in place for your two domain objects, you’ll create the schema that references the other two.
Go ahead and create the file src/main/proto/customer-event.proto, and then we’ll review the important parts of it.
syntax = "proto3";
package io.confluent.developer.proto;
import "purchase.proto";
import "pageview.proto"; (1)
option java_outer_classname = "CustomerEventProto";
message CustomerEvent { (2)
oneof action { (3)
Purchase purchase = 1;
Pageview pageview = 2;
}
string id = 3;
}
1 | Importing the other existing proto schema |
2 | The outer "container" event |
3 | A oneof field named action which will contain exactly one of the referenced types |
This is where the "rubber hits the road" regarding schema references. Here you have the CustomerEvent object containing either a Purchase or a Pageview object in the action field. Instead of nesting the schemas for these two objects, we reference existing ones. In addition to providing an effective way to combine multiple event types in the same topic while maintaining the TopicNameStrategy for subject names, references mean there is only one place to go when you need to make a schema update.
Note that, with Protobuf, a oneof can’t be a top-level field; it has to exist inside a "wrapper" class. This has implications when producing and consuming, which we will cover when creating the KafkaProducer and KafkaConsumer for this tutorial.
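To make the wrapper implication concrete, here is a minimal sketch (not part of the tutorial code) of how the generated classes behave once you run the code-generation step later in this tutorial; the class and method names assume standard protobuf-java codegen for the schemas above.
// Setting one member of the oneof replaces any previously set member.
CustomerEventProto.CustomerEvent event = CustomerEventProto.CustomerEvent.newBuilder()
        .setId("wilecoyote")
        .setPageview(PageviewProto.Pageview.newBuilder()
                .setCustomerId("wilecoyote").setUrl("http://acme/anvils").setIsSpecial(false).build())
        .setPurchase(PurchaseProto.Purchase.newBuilder()
                .setCustomerId("wilecoyote").setItem("anvil").setAmount(10.0).build())
        .build();
// The second setter wins: the oneof now holds the Purchase, and the Pageview is cleared.
System.out.println(event.getActionCase());   // PURCHASE
System.out.println(event.hasPageview());     // false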
Now you’ll take a similar step and create the Avro schemas; this is done for comparison and is not strictly required. As with Protobuf, you’ll have two schemas for the domain objects and a third schema that will contain the references. Avro has a distinct difference regarding the reference schema, and you’ll see it as we go through this section.
To get started, create a directory for the Avro schemas:
mkdir -p src/main/avro
Then, create this schema file for the purchase domain object at src/main/avro/purchase.avsc:
{
"type":"record",
"namespace": "io.confluent.developer.avro",
"name":"Purchase",
"fields": [
{"name": "item", "type":"string"},
{"name": "amount", "type": "double"},
{"name": "customer_id", "type": "string"}
]
}
Right away, you’ll notice one difference: you write Avro schemas in JSON, while Protobuf more closely resembles a programming language.
In this tutorial we won’t go into details about Avro. For more information you can read the Apache Avro documentation, Getting Started (Java) guide, and the Avro Specification.
Next, create the schema for the pageview object at src/main/avro/pageview.avsc:
{
"type":"record",
"namespace": "io.confluent.developer.avro",
"name":"Pageview",
"fields": [
{"name": "url", "type":"string"},
{"name": "is_special", "type": "boolean"},
{"name": "customer_id", "type": "string"}
]
}
Now that you have the schemas in place for your two domain objects, you’ll create a third schema that references the other two.
Go ahead and create the file src/main/avro/all-events.avsc now:
[
"io.confluent.developer.avro.Purchase",
"io.confluent.developer.avro.Pageview"
]
The all-events.avsc file contains an Avro union. The union type in Avro is analogous to the Protobuf oneof field in that it indicates that a field might have more than one datatype.
On the other hand, with Avro, a union can be a top-level element, so you don’t have to create a wrapper or container class; the Avro schema itself is a union, and it can represent either of the types listed in it. To be clear, you could create an Avro schema for a wrapper class and provide a union field within that schema, but we’re not going to cover that approach in this tutorial. The GitHub repo for the Multiple Events in Schema Registry Kafka Summit Europe 2021 presentation contains an example of using an outer Avro class containing a union field.
Now that you have created all of the necessary schema files, you need to compile them so that you can work with them in the application. The build.gradle file contains plugins for both Avro and Protobuf, so all you need to do is run the following command to generate the Java code files:
./gradlew build
Next, you’ll need to register some schemas. When you have an Avro schema where the top-level element is a union, you need to register the individual schemas in the union first. Then, you’ll register the parent schema itself along with references to the schemas making up the union element.
Fortunately, the Gradle Schema Registry plugin makes this easy for us. Here’s the configuration that you already have in the build.gradle file:
register {
subject('pageview', 'src/main/avro/pageview.avsc', 'AVRO') (1)
subject('purchase', 'src/main/avro/purchase.avsc', 'AVRO')
subject('avro-events-value', 'src/main/avro/all-events.avsc', 'AVRO') (2)
.addReference("io.confluent.developer.avro.Pageview", "pageview", 1) (3)
.addReference("io.confluent.developer.avro.Purchase", "purchase", 1)
}
1 | Registering the schemas for the referenced objects |
2 | The parent schema containing the references |
3 | Adding the references which point to the schemas registered previously |
To register these Avro schemas, run this in the command line:
./gradlew registerSchemasTask
This task runs quickly, and you should see some text followed by this result in the console:
BUILD SUCCESSFUL
We don’t have a corresponding command to register the Protobuf schemas. Instead, you are going to use the auto-registration feature for the Protobuf schemas, because Protobuf will recursively register any proto files included in the main schema. Using the Confluent Cloud Console, you can view the uploaded schemas by clicking on the Schema Registry tab and then clicking on the individual schemas to inspect them.
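If you’d like to confirm the Avro registrations programmatically rather than through the Console, the Schema Registry Java client (pulled in transitively by the Confluent serializer dependencies) can fetch the latest version of each subject registered by the Gradle task. This is a minimal sketch under those assumptions, not part of the tutorial application; the CheckSubjects class name is hypothetical, and the credentials are read from the configuration/ccloud.properties file you created earlier.
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import java.io.FileInputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class CheckSubjects {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        try (FileInputStream fis = new FileInputStream("configuration/ccloud.properties")) {
            props.load(fis);
        }
        // Pass the basic.auth.* properties through so the client can authenticate to Confluent Cloud.
        Map<String, Object> configs = new HashMap<>();
        props.forEach((k, v) -> configs.put((String) k, v));
        CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(
                props.getProperty("schema.registry.url"), 10, configs);
        // The three subjects registered by the registerSchemasTask step above.
        for (String subject : new String[]{"pageview", "purchase", "avro-events-value"}) {
            SchemaMetadata metadata = client.getLatestSchemaMetadata(subject);
            System.out.printf("subject=%s version=%d schema=%s%n", subject, metadata.getVersion(), metadata.getSchema());
        }
    }
}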
We’ll get into some of the details more in the next section.
Create a directory for the Java files in this project:
mkdir -p src/main/java/io/confluent/developer
To complete this tutorial, you’ll build an application that uses a KafkaProducer and a KafkaConsumer instance for producing and consuming both Avro and Protobuf records. The approach you’ll take in this tutorial is not typical of applications you’ll build in a production setting, but by using multiple clients, you can compare how to handle multiple event types for each serialization format.
To that end, the point of this sample application is this: you want to capture pageview and purchase events in the exact order that they occur, and you feel the best option is to have these events produced to the same topic. Since the customer ID will be the message key, you are guaranteed to get per-customer events in the order that they occur.
Let’s go over some of the key parts of the MultiEventProtobufProduceConsumeApp, starting with the producer for the Protobuf events.
Since this is an advanced topic, the tutorial doesn’t go into the basics of using a KafkaProducer. For more details, see the KafkaProducer tutorial.
public void produceProtobufEvents(final Supplier<Producer<String, CustomerEventProto.CustomerEvent>> producerSupplier,
final String topic,
final List<CustomerEventProto.CustomerEvent> protoCustomerEvents) {
try (Producer<String, CustomerEventProto.CustomerEvent> producer = producerSupplier.get()) { (1)
protoCustomerEvents.stream() (2)
.map((event -> new ProducerRecord<>(topic, event.getId(), event))) (3)
.forEach(producerRecord -> producer.send(producerRecord, ((metadata, exception)
//Details left out for clarity
// Relevant configurations
protoProduceConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
}
1 | Retrieving the producer instance from the Supplier |
2 | Using a java.util.stream to map each event into a ProducerRecord then send them to the broker |
3 | Creating the ProducerRecord instance. |
There are two points to emphasize here. The first is the type of the producer: it uses CustomerEventProto.CustomerEvent. Since you must use an outer class with Protobuf, the generics on the producer are a concrete type, so to set the key to the customer ID you can call the CustomerEvent#getId method directly. Also note the use of a Supplier to provide the producer; this delays creation until the Supplier.get() method is executed. Using a supplier also makes testing easier by simplifying the process of providing a different implementation, as the condensed sketch below shows.
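Here is that idea condensed from the application and test code later in this tutorial: the same produceProtobufEvents method accepts a lambda that builds a real KafkaProducer in main() and a MockProducer in a test, and neither producer is created until Supplier.get() runs inside the try-with-resources block.
// In the application (from main() further down):
multiEventApp.produceProtobufEvents(
        () -> new KafkaProducer<>(protoProduceConfigs(commonConfigs)), protobufTopic, multiEventApp.protobufEvents());
// In a test (see the MockProducer section later in this tutorial):
MockProducer<String, CustomerEventProto.CustomerEvent> mockProtoProducer =
        new MockProducer<>(true, stringSerializer, protobufSerializer);
produceConsumeApp.produceProtobufEvents(() -> mockProtoProducer, topic, produceConsumeApp.protobufEvents());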
The second point is that you can use the auto-registration feature of Schema Registry with Protobuf, and the referenced schemas get registered recursively.
Next, let’s move on to the KafkaConsumer for the Protobuf application.
consumerRecords.forEach(consumerRec -> {
CustomerEventProto.CustomerEvent customerEvent = consumerRec.value();
switch (customerEvent.getActionCase()) { (1)
case PURCHASE:
eventTracker.add(customerEvent.getPurchase().getItem()); (2)
break;
case PAGEVIEW:
eventTracker.add(customerEvent.getPageview().getUrl()); (3)
break;
// details left out for clarity
1 | Using a switch statement for the different enum types |
2 | Adding the purchased item to the event tracker |
3 | Adding the pageview link to the event tracker |
With Protobuf, when you have a oneof field, the generated code includes an enum with a constant for each message type the field can hold, so you can determine which type to work with using a switch statement. To retrieve the correct enum value, you call a get<field-name>Case method (in this case, getActionCase, since the oneof field is named action).
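One detail worth knowing: in addition to PURCHASE and PAGEVIEW, the generated ActionCase enum (assuming standard protobuf-java codegen for the action oneof above) includes an ACTION_NOT_SET constant, which getActionCase() returns when neither member was populated. That case can be worth handling explicitly rather than relying only on a default branch; a minimal sketch:
if (customerEvent.getActionCase() == CustomerEventProto.CustomerEvent.ActionCase.ACTION_NOT_SET) {
    // Neither purchase nor pageview was set on the producing side.
    LOG.warn("CustomerEvent {} carried no action", customerEvent.getId());
}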
Before you go on to create the application, we should mention the deserialization configurations that you need to set:
protoConsumeConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class); (1)
protoConsumeConfigs.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, CustomerEventProto.CustomerEvent.class); (2)
1 | Configurations for the Protobuf consumer to use the Protobuf deserializer |
2 | Setting the specific class type for the Protobuf deserializer |
It should come as no surprise that you need to set the deserializer class to KafkaProtobufDeserializer for the Protobuf consumer. But when working with multiple types, you still need to set the configuration for a specific type. For Protobuf it’s straightforward: setting the specific type to the outer class makes sense, since the Protobuf deserialization process knows how to handle the embedded types due to the schema.
Now go ahead and create the src/main/java/io/confluent/developer/MultiEventProtobufProduceConsumeApp.java file:
package io.confluent.developer;
import io.confluent.developer.proto.CustomerEventProto;
import io.confluent.developer.proto.PageviewProto;
import io.confluent.developer.proto.PurchaseProto;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileInputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
public class MultiEventProtobufProduceConsumeApp implements AutoCloseable {
public static final String CUSTOMER_ID = "wilecoyote";
private static final Logger LOG = LoggerFactory.getLogger(MultiEventProtobufProduceConsumeApp.class);
private volatile boolean keepConsumingProto = true;
final ExecutorService executorService = Executors.newFixedThreadPool(2);
public void produceProtobufEvents(final Supplier<Producer<String, CustomerEventProto.CustomerEvent>> producerSupplier,
final String topic,
final List<CustomerEventProto.CustomerEvent> protoCustomerEvents) {
try (Producer<String, CustomerEventProto.CustomerEvent> producer = producerSupplier.get()) {
protoCustomerEvents.stream()
.map((event -> new ProducerRecord<>(topic, event.getId(), event)))
.forEach(producerRecord -> producer.send(producerRecord, ((metadata, exception) -> {
if (exception != null) {
LOG.error("Error Protobuf producing message", exception);
} else {
LOG.debug("Produced Protobuf record offset {} timestamp {}", metadata.offset(), metadata.timestamp());
}
})));
}
}
public void consumeProtoEvents(final Supplier<Consumer<String, CustomerEventProto.CustomerEvent>> consumerSupplier,
final String topic,
final List<String> eventTracker) {
try (Consumer<String, CustomerEventProto.CustomerEvent> eventConsumer = consumerSupplier.get()) {
eventConsumer.subscribe(Collections.singletonList(topic));
while (keepConsumingProto) {
ConsumerRecords<String, CustomerEventProto.CustomerEvent> consumerRecords = eventConsumer.poll(Duration.ofSeconds(1));
consumerRecords.forEach(consumerRec -> {
CustomerEventProto.CustomerEvent customerEvent = consumerRec.value();
switch (customerEvent.getActionCase()) {
case PURCHASE:
eventTracker.add(String.format("Protobuf Purchase event -> %s", customerEvent.getPurchase().getItem()));
break;
case PAGEVIEW:
eventTracker.add(String.format("Protobuf Pageview event -> %s", customerEvent.getPageview().getUrl()));
break;
default:
LOG.error("Unexpected type - this shouldn't happen");
}
});
}
}
}
public List<CustomerEventProto.CustomerEvent> protobufEvents() {
CustomerEventProto.CustomerEvent.Builder customerEventBuilder = CustomerEventProto.CustomerEvent.newBuilder();
PageviewProto.Pageview.Builder pageViewBuilder = PageviewProto.Pageview.newBuilder();
PurchaseProto.Purchase.Builder purchaseBuilder = PurchaseProto.Purchase.newBuilder();
List<CustomerEventProto.CustomerEvent> events = new ArrayList<>();
PageviewProto.Pageview pageView = pageViewBuilder.setCustomerId(CUSTOMER_ID).setUrl("http://acme/traps").setIsSpecial(false).build();
PageviewProto.Pageview pageViewII = pageViewBuilder.setCustomerId(CUSTOMER_ID).setUrl("http://acme/bombs").setIsSpecial(false).build();
PageviewProto.Pageview pageViewIII = pageViewBuilder.setCustomerId(CUSTOMER_ID).setUrl("http://acme/bait").setIsSpecial(true).build();
PurchaseProto.Purchase purchase = purchaseBuilder.setCustomerId(CUSTOMER_ID).setItem("road-runner-bait").setAmount(99.99).build();
events.add(customerEventBuilder.setId(CUSTOMER_ID).setPageview(pageView).build());
events.add(customerEventBuilder.setId(CUSTOMER_ID).setPageview(pageViewII).build());
events.add(customerEventBuilder.setId(CUSTOMER_ID).setPageview(pageViewIII).build());
events.add((customerEventBuilder.setId(CUSTOMER_ID).setPurchase(purchase)).build());
return events;
}
@Override
public void close() {
keepConsumingProto = false;
executorService.shutdown();
}
public void createTopics(final Properties allProps) {
try (final AdminClient client = AdminClient.create(allProps)) {
final List<NewTopic> topics = new ArrayList<>();
topics.add(new NewTopic(
allProps.getProperty("proto.topic.name"),
Integer.parseInt(allProps.getProperty("proto.topic.partitions")),
Short.parseShort(allProps.getProperty("proto.topic.replication.factor"))));
client.createTopics(topics);
}
}
public static void main(String[] args) throws Exception {
if (args.length < 1) {
LOG.error("Must provide the path to the properties");
}
Properties properties = new Properties();
try (FileInputStream fis = new FileInputStream(args[0])) {
properties.load(fis);
}
Map<String, Object> commonConfigs = new HashMap<>();
properties.forEach((key, value) -> commonConfigs.put((String) key, value));
try (MultiEventProtobufProduceConsumeApp multiEventApp = new MultiEventProtobufProduceConsumeApp()) {
multiEventApp.createTopics(properties);
String protobufTopic = (String) commonConfigs.get("proto.topic.name");
LOG.info("Producing Protobuf events now");
multiEventApp.produceProtobufEvents(() -> new KafkaProducer<>(protoProduceConfigs(commonConfigs)), protobufTopic, multiEventApp.protobufEvents());
List<String> protoEvents = new ArrayList<>();
multiEventApp.executorService.submit(() -> multiEventApp.consumeProtoEvents(() -> new KafkaConsumer<>(protoConsumeConfigs(commonConfigs)), protobufTopic, protoEvents));
while (protoEvents.size() < 3) {
Thread.sleep(100);
}
LOG.info("Consumed Proto Events {}", protoEvents);
}
}
@NotNull
static Map<String, Object> protoConsumeConfigs(Map<String, Object> commonConfigs) {
Map<String, Object> protoConsumeConfigs = new HashMap<>(commonConfigs);
protoConsumeConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-consumer-group");
protoConsumeConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
protoConsumeConfigs.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, CustomerEventProto.CustomerEvent.class);
return protoConsumeConfigs;
}
@NotNull
static Map<String, Object> protoProduceConfigs(Map<String, Object> commonConfigs) {
Map<String, Object> protoProduceConfigs = new HashMap<>(commonConfigs);
protoProduceConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
return protoProduceConfigs;
}
}
Next up in the tutorial is creating the multi-event application for Avro.
public void produceAvroEvents(final Supplier<Producer<String, SpecificRecordBase>> producerSupplier,
final String topic,
final List<SpecificRecordBase> avroEvents) {
try (Producer<String, SpecificRecordBase> producer = producerSupplier.get()) { (1)
avroEvents.stream() (2)
.map((event -> new ProducerRecord<>(topic, (String) event.get("customer_id"), event))) (3)
.forEach(producerRecord -> producer.send(producerRecord,
//Details left out for clarity
//Relevant configurations
avroProduceConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
avroProduceConfigs.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false); (4)
avroProduceConfigs.put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true); (5)
1 | Getting the producer from the supplier |
2 | Streaming over the provided collection of Avro events to send |
3 | Creating the ProducerRecord instance, note the use of map-like access to get the required field for the key |
4 | Specifying to disable automatic schema registration |
5 | Setting to use the latest schema |
The approach with the Avro producer is very similar to the Protobuf version, but take a look at the type at annotation one: it’s SpecificRecordBase, an abstract class that every Avro generated class inherits from. Because the schema for the Avro multi-event topic uses a union at the top level, you don’t know the concrete type, so to use the customer ID as the key you need to access the field in a map-like fashion, using the field name as it exists in the schema. This is possible because SpecificRecordBase implements the GenericRecord interface, which provides the get method for retrieving a field value by name.
But the biggest difference is the configuration you provide to the producer for the Avro serializer, namely disabling automatic schema registration; otherwise, the serializer would register the individual event schema and override the union schema as the latest version. Additionally, since you’ve set use.latest.version to true, the serializer looks up the latest schema version (the union schema) and uses that for serialization. This blog post by Robert Yokota explains this mechanism in detail.
Next we’ll move on to creating the Avro consumer.
consumerRecords.forEach(consumerRec -> {
SpecificRecord avroRecord = consumerRec.value(); (1)
if (avroRecord instanceof Purchase) { (2)
Purchase purchase = (Purchase) avroRecord; (3)
eventTracker.add(purchase.getItem());
} else if (avroRecord instanceof Pageview) {
Pageview pageView = (Pageview) avroRecord;
eventTracker.add(pageView.getUrl());
// details left out for clarity
1 | Getting the record |
2 | Doing an instanceof check to determine the type |
3 | Casting to the appropriate concrete type |
With the Avro consumer, you’ll need to use the Java instanceof operator to determine the concrete type of each record. Notice that here you’re using the SpecificRecord interface, which every Avro generated object implements. Once you find the correct concrete type, you cast the record to that type and extract the required information.
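Since the build targets Java 17, a small variation (not used in the tutorial code) is pattern matching for instanceof, which folds the type check and the cast into a single step:
consumerRecords.forEach(consumerRec -> {
    SpecificRecord avroRecord = consumerRec.value();
    if (avroRecord instanceof Purchase purchase) {           // binds the cast result to purchase
        eventTracker.add(String.format("Avro Purchase event -> %s", purchase.getItem()));
    } else if (avroRecord instanceof Pageview pageview) {    // binds the cast result to pageview
        eventTracker.add(String.format("Avro Pageview event -> %s", pageview.getUrl()));
    }
});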
Before you go on to create the application, we should briefly mention the deserialization configurations you need to set:
avroConsumeConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); (1)
avroConsumeConfigs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); (2)
1 | Specifying to use the Avro deserializer for the Avro consumer |
2 | Setting the Avro deserializer to use the specific reader |
It should come as no surprise that you need to set the deserializer class to KafkaAvroDeserializer for the Avro consumer. But when working with multiple types, you still need to set the configuration for a specific type. With Avro, even with the union schema, you still need to set SPECIFIC_AVRO_READER_CONFIG to true to get the concrete types.
Go ahead and create the src/main/java/io/confluent/developer/MultiEventAvroProduceConsumeApp.java file:
package io.confluent.developer;
import io.confluent.developer.avro.Pageview;
import io.confluent.developer.avro.Purchase;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileInputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
public class MultiEventAvroProduceConsumeApp implements AutoCloseable{
public static final String CUSTOMER_ID = "wilecoyote";
private static final Logger LOG = LoggerFactory.getLogger(MultiEventAvroProduceConsumeApp.class);
private volatile boolean keepConsumingAvro = true;
final ExecutorService executorService = Executors.newFixedThreadPool(1);
public void produceAvroEvents(final Supplier<Producer<String, SpecificRecordBase>> producerSupplier,
final String topic,
final List<SpecificRecordBase> avroEvents) {
try (Producer<String, SpecificRecordBase> producer = producerSupplier.get()) {
avroEvents.stream()
.map((event -> new ProducerRecord<>(topic, (String) event.get("customer_id"), event)))
.forEach(producerRecord -> producer.send(producerRecord, ((metadata, exception) -> {
if (exception != null) {
LOG.error("Error Avro producing message", exception);
} else {
LOG.debug("Produced Avro record offset {} timestamp {}", metadata.offset(), metadata.timestamp());
}
})));
}
}
public void consumeAvroEvents(final Supplier<Consumer<String, SpecificRecordBase>> consumerSupplier,
final String topic,
final List<String> eventTracker) {
try (Consumer<String, SpecificRecordBase> eventConsumer = consumerSupplier.get()) {
eventConsumer.subscribe(Collections.singletonList(topic));
while (keepConsumingAvro) {
ConsumerRecords<String, SpecificRecordBase> consumerRecords = eventConsumer.poll(Duration.ofSeconds(1));
consumerRecords.forEach(consumerRec -> {
SpecificRecord avroRecord = consumerRec.value();
if (avroRecord instanceof Purchase) {
Purchase purchase = (Purchase) avroRecord;
eventTracker.add(String.format("Avro Purchase event -> %s",purchase.getItem()));
} else if (avroRecord instanceof Pageview) {
Pageview pageView = (Pageview) avroRecord;
eventTracker.add(String.format("Avro Pageview event -> %s",pageView.getUrl()));
} else {
LOG.error("Unexpected type - this shouldn't happen");
}
});
}
}
}
public List<SpecificRecordBase> avroEvents() {
Pageview.Builder pageViewBuilder = Pageview.newBuilder();
Purchase.Builder purchaseBuilder = Purchase.newBuilder();
List<SpecificRecordBase> events = new ArrayList<>();
Pageview pageView = pageViewBuilder.setCustomerId(CUSTOMER_ID).setUrl("http://acme/traps").setIsSpecial(false).build();
Pageview pageViewII = pageViewBuilder.setCustomerId(CUSTOMER_ID).setUrl("http://acme/bombs").setIsSpecial(false).build();
Pageview pageViewIII = pageViewBuilder.setCustomerId(CUSTOMER_ID).setUrl("http://acme/bait").setIsSpecial(true).build();
Purchase purchase = purchaseBuilder.setCustomerId(CUSTOMER_ID).setItem("road-runner-bait").setAmount(99.99).build();
events.add(pageView);
events.add(pageViewII);
events.add(pageViewIII);
events.add(purchase);
return events;
}
@Override
public void close() {
keepConsumingAvro = false;
executorService.shutdown();
}
public void createTopics(final Properties allProps) {
try (final AdminClient client = AdminClient.create(allProps)) {
final List<NewTopic> topics = new ArrayList<>();
topics.add(new NewTopic(
allProps.getProperty("avro.topic.name"),
Integer.parseInt(allProps.getProperty("avro.topic.partitions")),
Short.parseShort(allProps.getProperty("avro.topic.replication.factor"))));
client.createTopics(topics);
}
}
public static void main(String[] args) throws Exception {
if (args.length < 1) {
LOG.error("Must provide the path to the properties");
}
Properties properties = new Properties();
try (FileInputStream fis = new FileInputStream(args[0])) {
properties.load(fis);
}
Map<String, Object> commonConfigs = new HashMap<>();
properties.forEach((key, value) -> commonConfigs.put((String) key, value));
try (MultiEventAvroProduceConsumeApp multiEventApp = new MultiEventAvroProduceConsumeApp()) {
multiEventApp.createTopics(properties);
String avroTopic = (String) commonConfigs.get("avro.topic.name");
LOG.info("Producing Avro events");
multiEventApp.produceAvroEvents(() -> new KafkaProducer<>(avroProduceConfigs(commonConfigs)), avroTopic, multiEventApp.avroEvents());
List<String> avroEvents = new ArrayList<>();
multiEventApp.executorService.submit(() -> multiEventApp.consumeAvroEvents(() -> new KafkaConsumer<>(avroConsumeConfigs(commonConfigs)), avroTopic, avroEvents));
while (avroEvents.size() < 3) {
Thread.sleep(100);
}
LOG.info("Consumed Avro Events {}", avroEvents);
}
}
@NotNull
static Map<String, Object> avroConsumeConfigs(Map<String, Object> commonConfigs) {
Map<String, Object> avroConsumeConfigs = new HashMap<>(commonConfigs);
avroConsumeConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-consumer-group");
avroConsumeConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
avroConsumeConfigs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
return avroConsumeConfigs;
}
@NotNull
static Map<String, Object> avroProduceConfigs(Map<String, Object> commonConfigs) {
Map<String, Object> avroProduceConfigs = new HashMap<>(commonConfigs);
avroProduceConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
avroProduceConfigs.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
avroProduceConfigs.put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true);
return avroProduceConfigs;
}
}
In your terminal, run:
./gradlew shadowJar
Now that you have an uberjar, you can launch each application locally. When you run the following, you’ll see some output as the producer sends records to the Kafka broker, and you’ll also see the results of the multiple event consumer.
We’ll pipe the output through grep at the end of the command to filter out client logging that would otherwise make the results harder to see.
First run the multi-event application for Protobuf:
java -cp build/libs/multiple-event-types-standalone-0.0.1.jar io.confluent.developer.MultiEventProtobufProduceConsumeApp configuration/dev.properties 2>&1 | grep 'io.confluent.developer'
The output should look something like this:
[main] INFO io.confluent.developer.MultiEventProtobufProduceConsumeApp - Producing Protobuf events now
specific.protobuf.value.type = class io.confluent.developer.proto.CustomerEventProto$CustomerEvent
[main] INFO io.confluent.developer.MultiEventProtobufProduceConsumeApp - Consumed Proto Events [Protobuf Pageview event -> http://acme/traps, Protobuf Pageview event -> http://acme/bombs, Protobuf Pageview event -> http://acme/bait, Protobuf Purchase event -> road-runner-bait]
Then run the multi-event application for Avro:
java -cp build/libs/multiple-event-types-standalone-0.0.1.jar io.confluent.developer.MultiEventAvroProduceConsumeApp configuration/dev.properties 2>&1 | grep 'io.confluent.developer'
The output should look something like this:
[main] INFO io.confluent.developer.MultiEventAvroProduceConsumeApp - Producing Avro events
[main] INFO io.confluent.developer.MultiEventAvroProduceConsumeApp - Consumed Avro Events [Avro Pageview event -> http://acme/traps, Avro Pageview event -> http://acme/bombs, Avro Pageview event -> http://acme/bait, Avro Purchase event -> road-runner-bait]
You may try another tutorial, but if you don’t plan on doing other tutorials, use the Confluent Cloud Console or CLI to destroy all of the resources you created. Verify they are destroyed to avoid unexpected charges.
First, create a test file at configuration/test.properties
:
proto.topic=proto-records
avro.topic=avro-records
schema.registry.url=mock://multi-event-produce-consume-test
Create a directory for the tests to live in:
mkdir -p src/test/java/io/confluent/developer
Testing a KafkaProducer and KafkaConsumer used in an application is fairly easy to accomplish thanks to the MockProducer and the MockConsumer. Since both the KafkaProducer and KafkaConsumer are well tested, we don’t need to test the clients themselves. Instead, we’ll use mocks to verify that our logic executes as expected.
There are two test classes, MultiEventAvroProduceConsumeAppTest and MultiEventProtobufProduceConsumeAppTest (one for the Avro application and one for the Protobuf application). Before you create the tests, let’s look at some of the key parts of using a mock producer and consumer.
// Details left out for clarity
MockProducer<String, CustomerEventProto.CustomerEvent> mockProtoProducer
= new MockProducer<>(true, stringSerializer, protobufSerializer); (1)
List<CustomerEventProto.CustomerEvent> events = produceConsumeApp.protobufEvents();
produceConsumeApp.produceProtobufEvents(() -> mockProtoProducer, (String) commonConfigs.get("proto.topic"), events);(2)
actualKeyValues = mockProtoProducer.history().stream().map(this::toKeyValue).collect(Collectors.toList()); (3)
assertThat(actualKeyValues, equalTo(expectedKeyValues));
1 | Creating the MockProducer |
2 | Executing the produce of Protobuf records with the mock producer |
3 | Replaying the history of the producer |
In annotation 3 above, we can use a mock producer in the test to validate that all the records we expected to be produced were sent to the producer correctly. The test for the Avro producer has identical logic so we won’t review it here, but you can view the full source code if you’d like to see it.
For testing the consumer, it’s a little tricky because the consumer polls for records and will continue polling until you close the application. The MockConsumer provides a schedulePollTask method where you provide the action you want to take at each poll call.
mockConsumer.schedulePollTask(() -> { (1)
addTopicPartitionsAssignment(topic, mockConsumer);
addConsumerRecords(mockConsumer, produceConsumeApp.protobufEvents(), CustomerEventProto.CustomerEvent::getId, topic);
});
mockConsumer.schedulePollTask(() -> produceConsumeApp.close()); (2)
1 | Assigning the topic-partitions and records in the first poll call |
2 | Shutting down the application in the next call |
For the first poll call, we’ll assign the topic partitions and then provide the records to the consumer to process. In the next poll call, we simply shut the application down. Note that the methods in the first schedulePollTask are internal to the test; to fully understand what’s going on, you’ll need to look at the source code for the test. The test for the Avro multi-event application more or less uses the same logic, so we won’t review that test here.
Go ahead and create the following file for the Protobuf application test at src/test/java/io/confluent/developer/MultiEventProtobufProduceConsumeAppTest.java:
package io.confluent.developer;
import io.confluent.developer.proto.CustomerEventProto;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
public class MultiEventProtobufProduceConsumeAppTest {
private static final Map<String, Object> commonConfigs = new HashMap<>();
private static final Properties properties = new Properties();
private final Serializer<String> stringSerializer = new StringSerializer();
private MultiEventProtobufProduceConsumeApp produceConsumeApp;
@BeforeClass
public static void beforeAllTests() throws IOException {
try (FileInputStream fis = new FileInputStream("configuration/test.properties")) {
properties.load(fis);
properties.forEach((key, value) -> commonConfigs.put((String) key, value));
}
}
@Before
public void setup() {
produceConsumeApp = new MultiEventProtobufProduceConsumeApp();
}
@Test
public void testProduceProtobufMultipleEvents() {
KafkaProtobufSerializer<CustomerEventProto.CustomerEvent> protobufSerializer
= new KafkaProtobufSerializer<>();
protobufSerializer.configure(commonConfigs, false);
MockProducer<String, CustomerEventProto.CustomerEvent> mockProtoProducer
= new MockProducer<>(true, stringSerializer, protobufSerializer);
List<CustomerEventProto.CustomerEvent> events = produceConsumeApp.protobufEvents();
produceConsumeApp.produceProtobufEvents(() -> mockProtoProducer, (String) commonConfigs.get("proto.topic"), events);
List<KeyValue<String, CustomerEventProto.CustomerEvent>> expectedKeyValues =
produceConsumeApp.protobufEvents().stream().map((e -> KeyValue.pair(e.getId(), e))).collect(Collectors.toList());
List<KeyValue<String, CustomerEventProto.CustomerEvent>> actualKeyValues =
mockProtoProducer.history().stream().map(this::toKeyValue).collect(Collectors.toList());
assertThat(actualKeyValues, equalTo(expectedKeyValues));
}
@Test
public void testConsumeProtobufEvents() {
MockConsumer<String, CustomerEventProto.CustomerEvent> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
String topic = (String) commonConfigs.get("proto.topic");
List<String> expectedProtoResults = Arrays.asList("Protobuf Pageview event -> http://acme/traps", "Protobuf Pageview event -> http://acme/bombs", "Protobuf Pageview event -> http://acme/bait", "Protobuf Purchase event -> road-runner-bait");
List<String> actualProtoResults = new ArrayList<>();
mockConsumer.schedulePollTask(()-> {
addTopicPartitionsAssignment(topic, mockConsumer);
addConsumerRecords(mockConsumer, produceConsumeApp.protobufEvents(), CustomerEventProto.CustomerEvent::getId, topic);
});
mockConsumer.schedulePollTask(() -> produceConsumeApp.close());
produceConsumeApp.consumeProtoEvents(() -> mockConsumer, topic, actualProtoResults);
assertThat(actualProtoResults, equalTo(expectedProtoResults));
}
private <K, V> KeyValue<K, V> toKeyValue(final ProducerRecord<K, V> producerRecord) {
return KeyValue.pair(producerRecord.key(), producerRecord.value());
}
private <V> void addTopicPartitionsAssignment(final String topic,
final MockConsumer<String, V> mockConsumer) {
final TopicPartition topicPartition = new TopicPartition(topic, 0);
final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(topicPartition, 0L);
mockConsumer.rebalance(Collections.singletonList(topicPartition));
mockConsumer.updateBeginningOffsets(beginningOffsets);
}
private <V> void addConsumerRecords(final MockConsumer<String, V> mockConsumer,
final List<V> records,
final Function<V, String> keyFunction,
final String topic) {
AtomicInteger offset = new AtomicInteger(0);
records.stream()
.map(r -> new ConsumerRecord<>(topic, 0, offset.getAndIncrement(), keyFunction.apply(r), r))
.forEach(mockConsumer::addRecord);
}
}
Then, create the file for the Avro application test at src/test/java/io/confluent/developer/MultiEventAvroProduceConsumeAppTest.java:
package io.confluent.developer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
public class MultiEventAvroProduceConsumeAppTest {
private static final Map<String, Object> commonConfigs = new HashMap<>();
private static final Properties properties = new Properties();
private final Serializer<String> stringSerializer = new StringSerializer();
private MultiEventAvroProduceConsumeApp produceConsumeApp;
@BeforeClass
public static void beforeAllTests() throws IOException {
try (FileInputStream fis = new FileInputStream("configuration/test.properties")) {
properties.load(fis);
properties.forEach((key, value) -> commonConfigs.put((String) key, value));
}
}
@Before
public void setup() {
produceConsumeApp = new MultiEventAvroProduceConsumeApp();
}
@Test
@SuppressWarnings("unchecked")
public void testProduceAvroMultipleEvents() {
KafkaAvroSerializer avroSerializer
= new KafkaAvroSerializer();
avroSerializer.configure(commonConfigs, false);
MockProducer<String, SpecificRecordBase> mockAvroProducer
= new MockProducer<String, SpecificRecordBase>(true, stringSerializer, (Serializer) avroSerializer);
produceConsumeApp.produceAvroEvents(() -> mockAvroProducer, (String) commonConfigs.get("avro.topic"), produceConsumeApp.avroEvents());
List<KeyValue<String, SpecificRecordBase>> expectedKeyValues =
produceConsumeApp.avroEvents().stream().map((e -> KeyValue.pair((String) e.get("customer_id"), e))).collect(Collectors.toList());
List<KeyValue<String, SpecificRecordBase>> actualKeyValues =
mockAvroProducer.history().stream().map(this::toKeyValue).collect(Collectors.toList());
assertThat(actualKeyValues, equalTo(expectedKeyValues));
}
@Test
public void testConsumeAvroEvents() {
MockConsumer<String, SpecificRecordBase> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
String topic = (String) commonConfigs.get("avro.topic");
List<String> expectedAvroResults = Arrays.asList("Avro Pageview event -> http://acme/traps", "Avro Pageview event -> http://acme/bombs", "Avro Pageview event -> http://acme/bait", "Avro Purchase event -> road-runner-bait");
List<String> actualAvroResults = new ArrayList<>();
mockConsumer.schedulePollTask(() -> {
addTopicPartitionsAssignment(topic, mockConsumer);
addConsumerRecords(mockConsumer, produceConsumeApp.avroEvents(), (SpecificRecordBase r) -> (String) r.get("customer_id"), topic);
});
mockConsumer.schedulePollTask(() -> produceConsumeApp.close());
produceConsumeApp.consumeAvroEvents(() -> mockConsumer, topic, actualAvroResults);
assertThat(actualAvroResults, equalTo(expectedAvroResults));
}
private <K, V> KeyValue<K, V> toKeyValue(final ProducerRecord<K, V> producerRecord) {
return KeyValue.pair(producerRecord.key(), producerRecord.value());
}
private <V> void addTopicPartitionsAssignment(final String topic,
final MockConsumer<String, V> mockConsumer) {
final TopicPartition topicPartition = new TopicPartition(topic, 0);
final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(topicPartition, 0L);
mockConsumer.rebalance(Collections.singletonList(topicPartition));
mockConsumer.updateBeginningOffsets(beginningOffsets);
}
private <V> void addConsumerRecords(final MockConsumer<String, V> mockConsumer,
final List<V> records,
final Function<V, String> keyFunction,
final String topic) {
AtomicInteger offset = new AtomicInteger(0);
records.stream()
.map(r -> new ConsumerRecord<>(topic, 0, offset.getAndIncrement(), keyFunction.apply(r), r))
.forEach(mockConsumer::addRecord);
}
}
Now run the tests, which is as simple as:
./gradlew test