
Sophie Blee-Goldman

Senior Software Engineer (Presenter)

Bill Bejeck

Integration Architect (Author)

Testing

Kafka Streams connects to brokers. But in unit testing terms, it’s expensive to have all of your tests rely on a broker connection. You do want to have some level of integration testing, but you want to use a unit-test type of framework. Furthermore, with Kafka Streams, you have connected components in a topology, and ideally you'd like to test each one in isolation, but you'd also like an end-to-end test that is as fast as possible.

TopologyTestDriver solves these issues: it lets you unit test a topology in an end-to-end test, but without a broker. Integration testing should still be done with a live broker, but it can be done sparingly, and it's not needed for all tests.

TopologyTestDriver

With the TopologyTestDriver, you build your topology as usual, including all of the configurations. Even if you're using Schema Registry with Kafka Streams, you can still use the TopologyTestDriver. MockSchemaRegistry is an in-memory version of Schema Registry, and can be specified by the URL provided in the configuration; instead of http:// you put in mock://, and the test will automatically use the MockSchemaRegistry.
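As a minimal sketch of the mock:// convention described above, the configuration might be assembled like this. The class name, application ID, broker address, and scope name after mock:// are all hypothetical, and the keys are written as plain strings to keep the sketch dependency-free. In a real test the same URL would typically also be passed to the Schema Registry-aware serdes when they are configured.

```java
import java.util.Properties;

// Hypothetical helper that builds the configuration used by a topology test.
final class StreamsTestConfig {

    // Assumption: "streams-test-scope" is an arbitrary name chosen for this sketch.
    static final String MOCK_REGISTRY_URL = "mock://streams-test-scope";

    static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "topology-test");
        // Placeholder address: TopologyTestDriver never contacts a broker.
        props.put("bootstrap.servers", "dummy:9092");
        // mock:// instead of http:// selects the in-memory MockSchemaRegistry.
        props.put("schema.registry.url", MOCK_REGISTRY_URL);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("schema.registry.url"));
    }
}
```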

First, you instantiate your TopologyTestDriver, using your topology and configurations as constructor parameters. Then you create TestInputTopic instances. These feed your topology records. Next, you call TestInputTopic.pipeInput with KeyValue objects. There are also overloaded methods that allow you to provide timestamps, lists of records, etc.

When you execute TestInputTopic.pipeInput, the record's timestamp advances stream time, which can trigger stream-time punctuation. If you don't provide timestamps on the records, then under the hood each record is assigned a timestamp based on the current wall-clock time. But you can provide your own timestamps within the records to trigger certain behaviors that would happen within a Kafka Streams application.

With TopologyTestDriver, punctuation based on stream time is driven by the timestamps that you give to the records. Wall-clock punctuation, however, fires only when you call the advanceWallClockTime method.

TestOutputTopic Instances

TestOutputTopic instances mock out the sink nodes, the topics to which you will write. After you've called .pipeInput for all of the records you want to send through, you call TestOutputTopic.readKeyValue and assert the results. There are overloaded methods to read all of the values in a list, read KeyValues in a list, and read KeyValues in a map.
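The whole flow (build the topology, create the driver, pipe input, read output) can be sketched as follows. This is an illustrative example rather than the course's own code: the topology, the topic names "input" and "output", and the class name are assumptions, and it needs the kafka-streams and kafka-streams-test-utils libraries on the classpath.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

// Hypothetical demo: a topology that upper-cases every value.
final class UppercaseTopologyDemo {

    static List<KeyValue<String, String>> run() {
        // Build the topology as usual.
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(value -> value.toUpperCase())
               .to("output", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-test");
        // Placeholder address: no broker is contacted.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        // The driver is closed automatically, which releases any state it created.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                "input", Serdes.String().serializer(), Serdes.String().serializer());
            TestOutputTopic<String, String> output = driver.createOutputTopic(
                "output", Serdes.String().deserializer(), Serdes.String().deserializer());

            // Each pipeInput call pushes one record through the whole topology.
            input.pipeInput("k1", "hello");
            input.pipeInput("k2", "world");

            // Read everything the sink node produced, in order.
            return output.readKeyValuesToList();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In a unit-test framework, the body of run() would sit in a test method and the returned list would be compared against the expected KeyValue pairs.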

Testable Applications

The Kafka Streams DSL has several operators that take a single abstract method (SAM) interface, so you can use lambda expressions with them. The downside is that you can't easily test the lambdas in isolation; you have to test them through the topology as you've wired it up. So you might want to consider writing a concrete class that implements the interface instead, which lets you write a separate unit test for it.
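A minimal sketch of the concrete-class approach is shown here. To keep it dependency-free it implements java.util.function.BiPredicate as a stand-in; in a real application the class would instead implement the Kafka Streams Predicate interface, which has the same test(key, value) shape. The class name and the threshold rule are hypothetical.

```java
import java.util.function.BiPredicate;

// Stand-in for a class implementing org.apache.kafka.streams.kstream.Predicate<String, Long>.
final class LargeOrderFilter implements BiPredicate<String, Long> {

    private final long threshold;

    LargeOrderFilter(long threshold) {
        this.threshold = threshold;
    }

    @Override
    public boolean test(String key, Long amount) {
        // Null values never pass; otherwise keep amounts at or above the threshold.
        return amount != null && amount >= threshold;
    }

    public static void main(String[] args) {
        LargeOrderFilter filter = new LargeOrderFilter(100L);
        System.out.println(filter.test("order-1", 250L));
        System.out.println(filter.test("order-2", 99L));
    }
}
```

Because the logic lives in a named class, it can be exercised directly with plain assertions, with no topology or driver involved.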

Integration Tests

You always need some level of integration testing against a live broker. For example, you want to see how your stateful operations behave in a real environment. TopologyTestDriver doesn't have caching behavior or commits, and it doesn't write to real topics.

The best choice for brokers in an integration test is the Testcontainers library: it has annotations that can easily control the broker lifecycle, and it's possible to share a container across multiple test classes, which reduces testing time.
