Sophie Blee-Goldman

Senior Software Engineer (Presenter)

Bill Bejeck

Integration Architect (Author)

Testing

Kafka Streams connects to brokers. But in unit testing terms, it’s expensive to have all of your tests rely on a broker connection. You do want to have some level of integration testing, but you want to use a unit-test type of framework. Furthermore, with Kafka Streams, you have connected components in a topology, and ideally you'd like to test each one in isolation, but you'd also like an end-to-end test that is as fast as possible.

TopologyTestDriver solves these issues: it lets you unit test a topology in an end-to-end test, but without a broker. Integration testing should still be done with a live broker, but it can be done sparingly, and it's not needed for all tests.

TopologyTestDriver

With the TopologyTestDriver, you build your topology as usual, including all of the configurations. Even if you're using Schema Registry with Kafka Streams, you can still use the TopologyTestDriver. MockSchemaRegistry is an in-memory version of Schema Registry, and can be specified by the URL provided in the configuration; instead of http:// you put in mock://, and the test will automatically use the MockSchemaRegistry.
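As a rough illustration of the mock:// convention described above, the configuration might look like the following sketch. The scope name streams-test, the application ID, and the placeholder broker address are assumptions made for this example. Any Schema Registry serdes you construct yourself would typically also need the same URL passed to their configure method.

```java
import java.util.Properties;

class MockSchemaRegistryConfigSketch {

    // Hypothetical scope name: the text after "mock://" identifies an in-memory registry.
    static final String MOCK_REGISTRY_URL = "mock://streams-test";

    static Properties testConfig() {
        Properties props = new Properties();
        // Placeholder values; TopologyTestDriver never connects to this address.
        props.put("application.id", "streams-test-app");
        props.put("bootstrap.servers", "dummy:9092");
        // Same key as in production, but with mock:// instead of http://
        props.put("schema.registry.url", MOCK_REGISTRY_URL);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(testConfig().getProperty("schema.registry.url"));
    }
}
```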

First, you instantiate your TopologyTestDriver, using your topology and configurations as constructor parameters. Then you create TestInputTopic instances. These feed your topology records. Next, you call TestInputTopic.pipeInput with KeyValue objects. There are also overloaded methods that allow you to provide timestamps, lists of records, etc.

When you execute TestInputTopic.pipeInput, it triggers stream-time punctuation. So if you don't provide timestamps on the records, then under the hood, each record is stamped using the current wall-clock time. But you can provide your own timestamps within the records to trigger certain behaviors that would happen within a Kafka Streams application.

Within TopologyTestDriver, you can trigger punctuation based on stream time using the timestamps that you give to the records. Wall-clock punctuation, however, is fired only when you call advanceWallClockTime.
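The wall-clock behavior can be sketched with a small Processor API topology. Everything named here (HeartbeatProcessor, the topic names, the 10-second interval) is hypothetical and chosen only for illustration. The sketch assumes the kafka-streams and kafka-streams-test-utils libraries, version 3.0 or later, are on the classpath.

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

class WallClockPunctuationSketch {

    // Hypothetical processor: ignores input and forwards a "heartbeat" record
    // each time its wall-clock punctuator fires.
    static class HeartbeatProcessor implements Processor<String, String, String, String> {
        @Override
        public void init(ProcessorContext<String, String> context) {
            context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> context.forward(new Record<>("heartbeat", "tick", timestamp)));
        }

        @Override
        public void process(Record<String, String> record) {
            // Input records are not needed for this sketch.
        }
    }

    static Topology buildTopology() {
        ProcessorSupplier<String, String, String, String> supplier = HeartbeatProcessor::new;
        Topology topology = new Topology();
        topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), "input");
        topology.addProcessor("heartbeat", supplier, "source");
        topology.addSink("sink", "output", Serdes.String().serializer(), Serdes.String().serializer(), "heartbeat");
        return topology;
    }

    // Returns the output queue size before and after advancing the mocked wall clock.
    static long[] outputSizesBeforeAndAfterAdvance() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wall-clock-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // never contacted

        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), props)) {
            TestOutputTopic<String, String> output = driver.createOutputTopic(
                "output", Serdes.String().deserializer(), Serdes.String().deserializer());
            long before = output.getQueueSize();
            // Nothing fires until the test moves the mocked wall clock forward.
            driver.advanceWallClockTime(Duration.ofSeconds(10));
            long after = output.getQueueSize();
            return new long[] {before, after};
        }
    }

    public static void main(String[] args) {
        long[] sizes = outputSizesBeforeAndAfterAdvance();
        System.out.println("before=" + sizes[0] + ", after=" + sizes[1]);
    }
}
```

Because the test controls the clock explicitly, there is no need to sleep on a thread and hope the timing works out.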

TestOutputTopic Instances

TestOutputTopic instances mock out the sink nodes, the topics to which you will write. After you've called pipeInput for all of the records you want to send through, you call TestOutputTopic.readKeyValue and assert the results. There are also methods to read all of the values in a list, read KeyValues in a list, and read KeyValues in a map.
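Putting the pieces together, an end-to-end test without a broker might look like the minimal sketch below. The topology (upper-casing values from a topic named input into a topic named output), the topic names, and the configuration values are assumptions made for this example, and the kafka-streams and kafka-streams-test-utils libraries are assumed to be on the classpath.

```java
import java.time.Instant;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

class TopologyTestDriverSketch {

    // Hypothetical topology: upper-case every value read from "input" and write it to "output".
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(value -> value.toUpperCase())
               .to("output", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    // Pipes one record through the topology and returns what arrives at the sink.
    static KeyValue<String, String> runOnce(String key, String value) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ttd-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // never contacted

        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                "input", Serdes.String().serializer(), Serdes.String().serializer());
            TestOutputTopic<String, String> output = driver.createOutputTopic(
                "output", Serdes.String().deserializer(), Serdes.String().deserializer());

            // An explicit timestamp keeps the test deterministic.
            input.pipeInput(key, value, Instant.ofEpochMilli(0L));
            return output.readKeyValue();
        }
    }

    public static void main(String[] args) {
        KeyValue<String, String> result = runOnce("k1", "hello");
        if (!result.equals(KeyValue.pair("k1", "HELLO"))) {
            throw new AssertionError("unexpected result: " + result);
        }
        System.out.println(result);
    }
}
```

In a real project the assertion would normally live in a JUnit test method rather than in main; a plain main method is used here only to keep the sketch free of a test-framework dependency.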

Testable Applications

The Kafka Streams DSL has several operators that take a SAM interface, so you can use lambda expressions with them. The downside is that you can't easily test the lambdas in isolation; you have to test them with the topology as you've wired it up. So you might want to consider instead writing a concrete class, which would let you write a separate test for it.
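One way to picture the concrete-class approach is the sketch below. To stay runnable without the Kafka Streams dependency, it declares a local ValueMapper interface that mirrors the single apply method of org.apache.kafka.streams.kstream.ValueMapper. In a real application you would implement the library interface instead. The TrimAndUppercase class is a hypothetical example.

```java
import java.util.Locale;

class ConcreteMapperSketch {

    // Stand-in for org.apache.kafka.streams.kstream.ValueMapper (assumption made
    // so that this sketch compiles on its own); replace it with the real import.
    interface ValueMapper<V, VR> {
        VR apply(V value);
    }

    // A named class instead of an inline lambda, so it can be unit tested in isolation.
    static final class TrimAndUppercase implements ValueMapper<String, String> {
        @Override
        public String apply(String value) {
            return value == null ? null : value.trim().toUpperCase(Locale.ROOT);
        }
    }

    public static void main(String[] args) {
        ValueMapper<String, String> mapper = new TrimAndUppercase();
        // No topology, driver, or broker is involved in this check.
        if (!"HELLO".equals(mapper.apply("  hello "))) {
            throw new AssertionError("mapper did not normalize the value");
        }
        System.out.println(mapper.apply("  hello "));
    }
}
```

With the real interface, an instance of such a class could then be passed to mapValues in place of the lambda, and the same class gets its own focused unit test.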

Integration Tests

You always need some level of integration testing against a live broker. For example, you want to see how your stateful operations behave in a real environment. TopologyTestDriver doesn't have caching behavior or commits, and it doesn't write to real topics.

The best choice for brokers in an integration test is the Testcontainers library: it has annotations that can easily control the broker lifecycle, and it's possible to share a container across multiple test classes, which leads to reduced testing time.

Testing

Hi, I'm Sophie Blee-Goldman here with Confluent. And in this module, we're going to talk about testing with Kafka Streams. Testing is a pretty critical part of software development, especially with applications that you're writing; you're gonna want to unit or integration test them. And this can be a little tricky with Kafka Streams because Kafka Streams by its nature connects to brokers, and you don't necessarily want to set up a whole cluster just to write a unit test. Now, for one thing, it can be kind of inconvenient to set up a whole cluster, and for another, it's expensive. Your tests will take longer because they rely on these connections that they're making to the broker and these messages getting sent across the network. Now you can't really avoid that with integration tests, and you should always have some integration tests, but with the unit tests, you don't necessarily want to be involving a broker. And this is where the topology test driver comes in. The topology test driver is really Kafka Streams' answer to unit tests. You wanna make sure that the custom logic in your topology is producing the correct results, and that's where the topology test driver comes in. So how do you actually use the topology test driver? To use it, you just build a topology as usual with the same configurations that you would use to set up your Kafka Streams application. You might wonder, what if you're using Schema Registry? Well, there's a Mock Schema Registry that Kafka Streams provides, which you can use to mock the actual Schema Registry so that you don't need a Schema Registry cluster, similarly to how it essentially does away with the need for a broker cluster. Now, once you have your topology, all you need to do is instantiate the topology test driver and pass it in your topology and these configurations. This basically represents the Kafka Streams application itself, so now it's on you to create some test input topics and some input.
So the test input topic is what you will create and use with the topology test driver. You just create that and then you can pipe input with the test input topic pipeInput method. Now, all you need to do here is send in normal key-value objects. These represent the records that you would have coming in from a broker. And there are various overloaded methods that allow for providing timestamps, lists, and more, just general convenience methods, but fundamentally you're just writing records as you would to a normal topic but instead to this test input topic. Now some interesting things to note about the test input topic: executing this pipeInput will actually trigger a stream-time punctuation. So, this is just part of the nature of the test input topic, that it can provide normal stream-time punctuations. It does not provide wall-clock punctuations, because punctuations that go by wall-clock time can be a little bit hard to test deterministically. And it's an important part of your tests that they are reproducible and not resulting in potentially flaky test failures or something that you can't quite verify. So instead of going by actual wall-clock time passage in the topology test driver, you need to use the advanceWallClockTime method to actually advance time, and that gives you control over the way that wall-clock time advances, as opposed to trying to sleep on your thread or something else that is just not as precise.

Now some good practices for writing testable applications. You'll notice that the Streams DSL has a lot of operators that accept SAM interfaces or functional interfaces. This is convenient for writing an application because you can just use a lambda expression. It's less code in general, and you can just do it completely inline, but this has the downside that you can't as easily test that same lambda in isolation.
It's going to be much more convenient for you to test the arguments to these mapValues or reduce methods if you actually write a concrete implementation of the interface instead of using a lambda. That way you can write unit tests for each of your value mappers or your reducers, all of these things in isolation, to make sure that each step of the way has been thoroughly tested and give you greater confidence that all the pieces are working together.

Of course, the only way to know if things really work together is through integration tests. You know, a unit test will test whether each piece itself is working, but you can't really get a good sense of that unless you have a full integration test. Now there are some cases where an integration test really is necessary. For example, if you use pattern subscription, that's when, instead of passing in a topic name, you just pass in a pattern, a regex that the broker will match to the topic names and subscribe to whatever matches. In that case, you might be interested in what happens to your application when a new topic that matches the regex is created or a topic that matches the regex disappears. That's not really something that you can test using the topology test driver because there are no brokers. So for these sorts of events that happen in the surrounding environment, you're going to need actual integration tests with a live broker. Another reason that the topology test driver does not quite catch every single thing that you would want in an integration test is that it doesn't really test the full caching behavior and other behavior that stateful operations might have. So instead of caching, it will output every single record that is processed, which is not the same way that Kafka Streams itself will process things, unless of course you have set the cache and the commit interval to zero.
It also doesn't write to real topics, so you might wanna make sure that you are writing to the correct topic, whether that be something that is dynamically determined at runtime or an actual topic that you will read from later.

Now, how do you write integration tests in Kafka Streams? The best choice for using brokers in a test is to use Testcontainers. Testcontainers also makes it possible to share a container across multiple test classes, which is important because this saves you from having to set up the broker and tear it down each time for every single integration test. This helps your tests run faster when they can share the same resources, rather than doing the same setup and cleanup for each single test and generally just wasting time. So while it's important to have integration tests, the topology test driver is generally going to be your best friend with Kafka Streams testing. So, now we're going to go over an exercise to see what that actually looks like in action.
