Create a directory for the tests to live in:
mkdir -p src/test/java/io/confluent/developer
Testing a KafkaProducer and KafkaConsumer used in an application is fairly easy to accomplish thanks to the MockProducer and the MockConsumer. Since both the KafkaProducer and KafkaConsumer are well tested, we don’t need to test the clients themselves. Instead, we’ll use mocks to verify that our logic executes as expected.

There are two test classes, MultiEventAvroProduceConsumeAppTest and MultiEventProtobufProduceConsumeAppTest (one for the Avro application and one for the Protobuf application). Before you create the tests, let’s look at some of the key parts of using a mock producer and consumer.
Replaying the history of produced records
// Details left out for clarity
MockProducer<String, CustomerEventProto.CustomerEvent> mockProtoProducer
        = new MockProducer<>(true, stringSerializer, protobufSerializer);  (1)

List<CustomerEventProto.CustomerEvent> events = produceConsumeApp.protobufEvents();
produceConsumeApp.produceProtobufEvents(() -> mockProtoProducer, (String) commonConfigs.get("proto.topic"), events);  (2)

actualKeyValues = mockProtoProducer.history().stream().map(this::toKeyValue).collect(Collectors.toList());  (3)
assertThat(actualKeyValues, equalTo(expectedKeyValues));
1. Creating the MockProducer
2. Executing the produce of Protobuf records with the mock producer
3. Replaying the history of the producer
In annotation 3 above, we use the mock producer’s history to validate that all the records we expected to be produced were sent to the producer correctly. The test for the Avro producer has identical logic, so we won’t review it here, but you can view the full source code if you’d like to see it.
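One detail worth noting: the first constructor argument to the MockProducer (true above) turns on auto-completion, so every send succeeds immediately. If you ever want to exercise a failure path as well, you can build the mock with auto-completion disabled and fail a pending send yourself. Here is a minimal, hypothetical sketch of that pattern (plain String keys and values, a made-up demo-topic, and the usual producer imports plus java.util.concurrent.Future and ExecutionException); it is not part of this tutorial’s tests:

// autoComplete = false: each send stays pending until completeNext() or errorNext() is called
MockProducer<String, String> mockProducer =
        new MockProducer<>(false, new StringSerializer(), new StringSerializer());

Future<RecordMetadata> result = mockProducer.send(new ProducerRecord<>("demo-topic", "key", "value"));

// Fail the oldest pending send with a simulated exception
mockProducer.errorNext(new RuntimeException("simulated broker failure"));

try {
    result.get();   // the returned future now surfaces the simulated failure
} catch (InterruptedException | ExecutionException e) {
    // expected path: an ExecutionException whose cause is the RuntimeException passed to errorNext()
}

With auto-completion enabled, as in the tests in this tutorial, none of this is needed: history() is all you need to verify what was sent.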
For testing the consumer, it’s a little tricky because the consumer polls for records and will continue polling until you close the application. The MockConsumer provides a method, schedulePollTask, where you provide the action you want to take at each poll call.
Driving the behavior of a consumer poll
mockConsumer.schedulePollTask(() -> {  (1)
    addTopicPartitionsAssignment(topic, mockConsumer);
    addConsumerRecords(mockConsumer, produceConsumeApp.protobufEvents(), CustomerEventProto.CustomerEvent::getId, topic);
});

mockConsumer.schedulePollTask(() -> produceConsumeApp.close());  (2)
1. Assigning the topic-partitions and records in the first poll call
2. Shutting down the application in the next call
For the first poll call, we’ll assign the topic partitions and then provide the records to the consumer to process. In the next poll call, we simply shut the application down. Note that the methods in the first schedulePollTask are internal to the test; to fully understand what’s going on, you’ll need to look at the source code for the test. The test for the Avro multi-event application uses more or less the same logic, so we won’t review that test here.
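To make the poll-task mechanics concrete, here is a minimal standalone sketch (not part of the tutorial’s tests) that drives a MockConsumer directly with plain String records and a made-up demo-topic; it uses the same consumer imports as the test classes below, plus java.time.Duration and ConsumerRecords:

MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
consumer.subscribe(Collections.singletonList("demo-topic"));   // the application under test normally subscribes

TopicPartition partition = new TopicPartition("demo-topic", 0);
consumer.schedulePollTask(() -> {
    consumer.rebalance(Collections.singletonList(partition));                      // assign the partition
    consumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));      // give the consumer a starting offset
    consumer.addRecord(new ConsumerRecord<>("demo-topic", 0, 0L, "key", "value")); // queue a record for this poll
});
consumer.schedulePollTask(consumer::wakeup);   // in this sketch, the next poll throws WakeupException so a poll loop can exit

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));   // returns the single record added above

Each scheduled task runs at the start of the corresponding poll call, which is why the records added in the first task come back from that same poll, and why scheduling the shutdown as the second task cleanly ends the consume loop in the tests.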
Go ahead and create the following file for the Protobuf application test at src/test/java/io/confluent/developer/MultiEventProtobufProduceConsumeAppTest.java.
package io.confluent.developer;
import io.confluent.developer.proto.CustomerEventProto;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
public class MultiEventProtobufProduceConsumeAppTest {

    private static final Map<String, Object> commonConfigs = new HashMap<>();
    private static final Properties properties = new Properties();
    private final Serializer<String> stringSerializer = new StringSerializer();
    private MultiEventProtobufProduceConsumeApp produceConsumeApp;

    @BeforeClass
    public static void beforeAllTests() throws IOException {
        try (FileInputStream fis = new FileInputStream("configuration/test.properties")) {
            properties.load(fis);
            properties.forEach((key, value) -> commonConfigs.put((String) key, value));
        }
    }

    @Before
    public void setup() {
        produceConsumeApp = new MultiEventProtobufProduceConsumeApp();
    }

    @Test
    public void testProduceProtobufMultipleEvents() {
        KafkaProtobufSerializer<CustomerEventProto.CustomerEvent> protobufSerializer
                = new KafkaProtobufSerializer<>();
        protobufSerializer.configure(commonConfigs, false);
        MockProducer<String, CustomerEventProto.CustomerEvent> mockProtoProducer
                = new MockProducer<>(true, stringSerializer, protobufSerializer);
        List<CustomerEventProto.CustomerEvent> events = produceConsumeApp.protobufEvents();
        produceConsumeApp.produceProtobufEvents(() -> mockProtoProducer, (String) commonConfigs.get("proto.topic"), events);

        List<KeyValue<String, CustomerEventProto.CustomerEvent>> expectedKeyValues =
                produceConsumeApp.protobufEvents().stream().map((e -> KeyValue.pair(e.getId(), e))).collect(Collectors.toList());
        List<KeyValue<String, CustomerEventProto.CustomerEvent>> actualKeyValues =
                mockProtoProducer.history().stream().map(this::toKeyValue).collect(Collectors.toList());
        assertThat(actualKeyValues, equalTo(expectedKeyValues));
    }

    @Test
    public void testConsumeProtobufEvents() {
        MockConsumer<String, CustomerEventProto.CustomerEvent> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        String topic = (String) commonConfigs.get("proto.topic");
        List<String> expectedProtoResults = Arrays.asList("Protobuf Pageview event -> http://acme/traps", "Protobuf Pageview event -> http://acme/bombs", "Protobuf Pageview event -> http://acme/bait", "Protobuf Purchase event -> road-runner-bait");
        List<String> actualProtoResults = new ArrayList<>();
        mockConsumer.schedulePollTask(() -> {
            addTopicPartitionsAssignment(topic, mockConsumer);
            addConsumerRecords(mockConsumer, produceConsumeApp.protobufEvents(), CustomerEventProto.CustomerEvent::getId, topic);
        });
        mockConsumer.schedulePollTask(() -> produceConsumeApp.close());
        produceConsumeApp.consumeProtoEvents(() -> mockConsumer, topic, actualProtoResults);
        assertThat(actualProtoResults, equalTo(expectedProtoResults));
    }

    private <K, V> KeyValue<K, V> toKeyValue(final ProducerRecord<K, V> producerRecord) {
        return KeyValue.pair(producerRecord.key(), producerRecord.value());
    }

    private <V> void addTopicPartitionsAssignment(final String topic,
                                                  final MockConsumer<String, V> mockConsumer) {
        final TopicPartition topicPartition = new TopicPartition(topic, 0);
        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put(topicPartition, 0L);
        mockConsumer.rebalance(Collections.singletonList(topicPartition));
        mockConsumer.updateBeginningOffsets(beginningOffsets);
    }

    private <V> void addConsumerRecords(final MockConsumer<String, V> mockConsumer,
                                        final List<V> records,
                                        final Function<V, String> keyFunction,
                                        final String topic) {
        AtomicInteger offset = new AtomicInteger(0);
        records.stream()
               .map(r -> new ConsumerRecord<>(topic, 0, offset.getAndIncrement(), keyFunction.apply(r), r))
               .forEach(mockConsumer::addRecord);
    }
}
Then, create the file for the Avro application test at src/test/java/io/confluent/developer/MultiEventAvroProduceConsumeAppTest.java.
package io.confluent.developer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
public class MultiEventAvroProduceConsumeAppTest {

    private static final Map<String, Object> commonConfigs = new HashMap<>();
    private static final Properties properties = new Properties();
    private final Serializer<String> stringSerializer = new StringSerializer();
    private MultiEventAvroProduceConsumeApp produceConsumeApp;

    @BeforeClass
    public static void beforeAllTests() throws IOException {
        try (FileInputStream fis = new FileInputStream("configuration/test.properties")) {
            properties.load(fis);
            properties.forEach((key, value) -> commonConfigs.put((String) key, value));
        }
    }

    @Before
    public void setup() {
        produceConsumeApp = new MultiEventAvroProduceConsumeApp();
    }

    @Test
    @SuppressWarnings("unchecked")
    public void testProduceAvroMultipleEvents() {
        KafkaAvroSerializer avroSerializer
                = new KafkaAvroSerializer();
        avroSerializer.configure(commonConfigs, false);
        MockProducer<String, SpecificRecordBase> mockAvroProducer
                = new MockProducer<String, SpecificRecordBase>(true, stringSerializer, (Serializer) avroSerializer);
        produceConsumeApp.produceAvroEvents(() -> mockAvroProducer, (String) commonConfigs.get("avro.topic"), produceConsumeApp.avroEvents());
        List<KeyValue<String, SpecificRecordBase>> expectedKeyValues =
                produceConsumeApp.avroEvents().stream().map((e -> KeyValue.pair((String) e.get("customer_id"), e))).collect(Collectors.toList());
        List<KeyValue<String, SpecificRecordBase>> actualKeyValues =
                mockAvroProducer.history().stream().map(this::toKeyValue).collect(Collectors.toList());
        assertThat(actualKeyValues, equalTo(expectedKeyValues));
    }

    @Test
    public void testConsumeAvroEvents() {
        MockConsumer<String, SpecificRecordBase> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        String topic = (String) commonConfigs.get("avro.topic");
        List<String> expectedAvroResults = Arrays.asList("Avro Pageview event -> http://acme/traps", "Avro Pageview event -> http://acme/bombs", "Avro Pageview event -> http://acme/bait", "Avro Purchase event -> road-runner-bait");
        List<String> actualAvroResults = new ArrayList<>();
        mockConsumer.schedulePollTask(() -> {
            addTopicPartitionsAssignment(topic, mockConsumer);
            addConsumerRecords(mockConsumer, produceConsumeApp.avroEvents(), (SpecificRecordBase r) -> (String) r.get("customer_id"), topic);
        });
        mockConsumer.schedulePollTask(() -> produceConsumeApp.close());
        produceConsumeApp.consumeAvroEvents(() -> mockConsumer, topic, actualAvroResults);
        assertThat(actualAvroResults, equalTo(expectedAvroResults));
    }

    private <K, V> KeyValue<K, V> toKeyValue(final ProducerRecord<K, V> producerRecord) {
        return KeyValue.pair(producerRecord.key(), producerRecord.value());
    }

    private <V> void addTopicPartitionsAssignment(final String topic,
                                                  final MockConsumer<String, V> mockConsumer) {
        final TopicPartition topicPartition = new TopicPartition(topic, 0);
        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put(topicPartition, 0L);
        mockConsumer.rebalance(Collections.singletonList(topicPartition));
        mockConsumer.updateBeginningOffsets(beginningOffsets);
    }

    private <V> void addConsumerRecords(final MockConsumer<String, V> mockConsumer,
                                        final List<V> records,
                                        final Function<V, String> keyFunction,
                                        final String topic) {
        AtomicInteger offset = new AtomicInteger(0);
        records.stream()
               .map(r -> new ConsumerRecord<>(topic, 0, offset.getAndIncrement(), keyFunction.apply(r), r))
               .forEach(mockConsumer::addRecord);
    }
}