November 12, 2020 | Episode 128

Why Kafka Streams Does Not Use Watermarks ft. Matthias J. Sax


Tim Berglund:

Well, good news. I've got Matthias Sax, Kafka Committer who works on Kafka Streams, back on the show to talk about watermarking. What is it? Does Kafka Streams use it? And why or why not? Now I'm being coy here and not telling you the answer, so I guess you'll have to listen to this whole episode of Streaming Audio, a podcast about Kafka, Confluent, and the Cloud.

Tim Berglund:

Hello, and welcome back to another episode of Streaming Audio. I am, as ever, your host, Tim Berglund, and I'm joined in the studio by a repeat guest, Matthias Sax. Matthias is an engineer who works on... streaming, right? Kafka Streams, ksqlDB, that kind of thing?

Matthias J. Sax:

Correct yep.

Tim Berglund:

Yes, welcome to the show. I don't even know how-

Matthias J. Sax:

Thanks for having me.

Tim Berglund:

You bet. I don't know how many times you've been on, I need to count that, but it's been a few and it's always good to have you back. And I just know the last time you were on, we were talking about time, and it was kind of based on your 2020 Kafka Summit Talk, about the flux capacitor, and just the way time works in stream processing, and Kafka Streams in particular.

Tim Berglund:

And we started to get into watermarks, and I had kind of intended to talk about watermarks more, but there were just other good things to talk about and we covered it a little bit, but I didn't feel like we did enough. So welcome back, let's talk about watermarks.

Matthias J. Sax:

Glad to do so, it's a very interesting topic I think.

Tim Berglund:

To start with, what are watermarks?

Matthias J. Sax:

Very simplified, watermarks are special messages that are embedded into your data stream, but a watermark is not like a regular data message. It's kind of a metadata message, if you wish. And a watermark carries only one piece of information, and that's a timestamp. And the idea of watermarks is that, when the stream processor receives the watermark, it gets some additional time information about the data stream.

Matthias J. Sax:

In particular, it's kind of a contract: let's say you get a watermark with timestamp 100, then you get the promise that you won't see a record in the future that has a lower timestamp than the watermark carried. For the example, you have some watermark of 100, you get a guarantee that in the future you don't see any record with timestamp 100, 99, 98, or smaller. [crosstalk 00:02:44], this is important to handle out-of-order data basically.
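To make that contract concrete, here is a minimal sketch of what such a metadata message might look like; the class and method names are illustrative and not taken from any particular system.

```java
// A watermark is a metadata message that carries only a timestamp.
// Contract: once a watermark with timestamp T has been received, no data
// record with a timestamp below T should arrive anymore.
public record Watermark(long timestamp) {

    // True if a data record arriving after this watermark breaks the promise.
    public boolean isViolatedBy(long recordTimestamp) {
        return recordTimestamp < timestamp;
    }
}
```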

Tim Berglund:

I was going to say, it sounds like... And I think maybe we all know this, but that is clearly a thing that is trying to reconcile the fact that the sequence of events in a partition and the timestamps of the events aren't necessarily going to align, it's possible to get events in sequence where their timestamps are out of order. Is that right?

Matthias J. Sax:

Yeah, exactly, exactly. And I mean, watermarks are one of many techniques to use here, and it's a very, very prominent one. Watermarks have very interesting properties, but there are also entirely different techniques to do the same thing, and there are always advantages and disadvantages. And it's interesting because Kafka Streams and ksqlDB do not use watermarks, so we very often get the question how we actually do this, and why do we not use watermarks?

Matthias J. Sax:

Because watermarks are very prominent out in the market. So people often think if you don't do watermarks, you don't do correct processing, because they basically think it's the only technique, or maybe don't even understand that it's one technique for a higher-level concept. They basically take the technique and the concept and put both into the same bucket, even if they're not the same.

Tim Berglund:

Right. I mean, they're simple to understand and simple to explain, like you just made that happen very quickly and it was clear. And it's one of those things where, once you have that level of understanding, it's like, "Okay, well, this is how you do it, obviously stream processing needs this," because we have out-of-order data and we need some sort of in-band indicator of how far we've gotten in processing the stream. So they're simple to understand, but why doesn't Kafka Streams use them?

Matthias J. Sax:

Well, there are sort of multiple reasons. I think one reason is that Kafka Streams in general is following what I call an incremental, eager processing model. And let me give you an example for that. Let's say you do an aggregation over the data stream, then for every input record... Let's say we just do a count, you have some IDs and we count.

Matthias J. Sax:

For every input record we get, we can actually increase the count, and we update our result table. For this kind of processing, well, you don't even need a watermark, because, well, for every record you just update your result. You really incrementally maintain your output table.
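A minimal Kafka Streams sketch of that incremental, eager model, assuming a hypothetical input topic named "clicks" keyed by ID (the topic and store names are illustrative): every input record immediately bumps the count in the result table, no watermark involved.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class IncrementalCount {

    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Every input record bumps the count for its key and updates the result table.
        // The table is maintained incrementally; there is no window and no watermark.
        KTable<String, Long> countsById = builder
                .stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey()
                .count(Materialized.as("counts-by-id"));

        // Downstream consumers see a changelog of updates to the table.
        countsById.toStream().to("click-counts", Produced.with(Serdes.String(), Serdes.Long()));
        return builder;
    }
}
```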

Tim Berglund:

Because there's no window, right?

Matthias J. Sax:

Exactly, because there's no window here. So for those cases you don't need it. And then of course we do have some windows, and then we need to play a few other tricks. But if you go with other stream processing systems, which are what I call stream-only systems, they don't really have such an aggregation, because they don't have a table, everything is a stream.

Matthias J. Sax:

And for them, let's say, they embrace windows more aggressively, I would say. And then for them the question of when to emit an event is much more important, because with a table, also for window operations, if we [inaudible 00:06:07] the window, you can actually say, even if you have not reached the end time of the window, you can still give out the partial result so far, of the window you've seen up to now.

Matthias J. Sax:

But if you are a stream-only system, you say you don't want to do this, because when you emit those records into your output stream, you basically emit a fact stream or an event stream again. And that means you cannot really reconcile results. And because you cannot reconcile results, you say, no, no, no, I don't emit anything until I'm really sure that the window is completed and I can close the window.

Matthias J. Sax:

And then they use the watermarks to do this. But if you have this incremental model, watermarks are a little bit less important, because you just update all the time, and you have the ability to reconcile the result.

Tim Berglund:

Yeah. So that's the Kafka Streams approach: during a window, you've always got an intermediate result. And I know this is the thing I'm always explaining when I'm introducing people to ksqlDB or to Kafka Streams. When you explain windows, again, watermarks are super intuitive.

Tim Berglund:

The intuition people have about a windowed aggregation is that, okay, it's 10 seconds, well, we're going to wait for 10 seconds and then we'll give you a result, which is one way to do it, but that's not what Kafka Streams does. It's always giving you the incremental thing. Could you take us through reconciling the result again? I know you explained that, but I'm not quite there yet. You're talking about partial results of an aggregation while a window is open. And then there's this reconciling process, if you could tell me about that.

Matthias J. Sax:

That's actually quite simple. So again, let's go with the count example. Let's say we have a count window of five minutes, and we get records in. So in our result table, we will have an entry that says, well, that's the result for Tim between zero and five. How often did Tim click on my page? And every time we see a record with a timestamp between zero and five, well, we bump the count, this row in the table, by one: one, two, three, four, five.

Matthias J. Sax:

Then we get new records for the next window, so we create a new row and we, again, bump the result record. And we still have the first row in the result table. So basically now we have two windows at the same time, if you wish. And when an out-of-order record now arrives, and we have to update the first window between zero and five, well, we still have it around, it's still an entry in the table. So again, we just bump the count. So this reconciliation is actually pretty simple because, well, it's just like a regular computation. There's nothing special about it, because the result is a table, as I said, and not a stream.
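A sketch of that windowed count in the Kafka Streams DSL, using the same hypothetical "clicks" topic: each five-minute window is just another row, keyed by the ID plus the window, and an out-of-order record simply updates the row of the older window it falls into, as long as that row is still retained. (Newer Kafka Streams versions spell the window definition `TimeWindows.ofSizeAndGrace(...)`; older ones use `TimeWindows.of(...)`.)

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

import java.time.Duration;

public class WindowedCount {

    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();

        // One row per (ID, five-minute window); an out-of-order record whose timestamp
        // falls into an older window updates that older row while it is still retained.
        KTable<Windowed<String>, Long> clicksPerWindow = builder
                .stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey()
                .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(10)))
                .count(Materialized.as("clicks-per-window"));

        // The row key is the grouping key plus the window, e.g. "tim@0".
        clicksPerWindow.toStream()
                .map((windowedKey, count) ->
                        KeyValue.pair(windowedKey.key() + "@" + windowedKey.window().start(), count))
                .to("clicks-per-window-output", Produced.with(Serdes.String(), Serdes.Long()));
        return builder;
    }
}
```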

Tim Berglund:

Exactly. Another key piece of information here is that, the key to that table is going to be the things you're grouping by plus some sort of identity of the window, the timestamp of the window, is that right?

Matthias J. Sax:

Correct, correct.

Tim Berglund:

Every new window is a new row in that table, and we keep old rows around. How long do we keep old rows around for?

Matthias J. Sax:

That is something you can configure. By default, I think we set it to one day, which is kind of an arbitrary value, but it works well in practice.
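That retention is configurable per window store; a hedged sketch of setting it explicitly, using the one-day value Matthias mentions as the default (the store name is an assumption):

```java
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.WindowStore;

import java.time.Duration;

public class WindowRetention {

    // Keep old window rows for one day, so out-of-order records up to roughly a day
    // late can still update their window; older rows are dropped from the store.
    public static Materialized<String, Long, WindowStore<Bytes, byte[]>> oneDayRetention() {
        return Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("clicks-per-window")
                .withRetention(Duration.ofDays(1));
    }
}
```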

Tim Berglund:

The day seems good.

Matthias J. Sax:

Yeah, exactly. It gives you a very, very long period where you can handle out-of-order data. But it's still not crazy expensive to keep the state around, because the state is already aggregated, it's not like raw data in a topic. Of course, it depends also on the window size, but if you say you have a window of, let's say, an hour, which is very common I would say, well, then you get 25 rows per key.

Matthias J. Sax:

And, well, depending on your key space and depending on the aggregation, let's say a count, it's not a huge amount of data that we need to keep, but having the ability to handle out-of-order data within a 24-hour period is actually pretty long.

Tim Berglund:

Yes, that is incredibly long to think that you'd get data that was that out of order. I can believe second where it's-

Matthias J. Sax:

It's happened [crosstalk 00:10:34].

Tim Berglund:

What's happened?

Matthias J. Sax:

I mean, for most of these cases, it's like in the second range, but there are cases like this... One example would be you have somebody taking a plane and they're disconnected, but they do stuff in an app while they're flying. And then they're landing, and certainly the phone reconnects to the network, and then the data is pushed to the system, and all this data could be multiple hours old, because somebody was taking a flight.

Tim Berglund:

Exactly.

Matthias J. Sax:

Of course nowadays, with onboard wifi, it is an example that's getting a little bit out of date, but I'm pretty sure we can find other examples where people could potentially be disconnected from servers, even if it's getting rarer nowadays.

Tim Berglund:

We can just ask our friends who work in edge applications, edge computing, that's a very edgy-sounding kind of thing. Edge and IoT, there are going to be intermittent connections; those will be the cases for the longer window retention times.

Matthias J. Sax:

Yeah, exactly. Or maybe our lovely Caribbean cruise ship.

Tim Berglund:

Yes, yes. Also during the pandemic, maybe a little too soon to mention that, that's very much a read-the-room kind of thing, isn't it? But yeah, after the pandemic, I'm sure that will be a robust activity that people enjoy together. And at worst you only get a little bit of norovirus and not much coronavirus.

Tim Berglund:

I think what you said, what I heard you say, is that if you use watermarks, you don't have a safe way of emitting partial window results while the window is open. Now I haven't reasoned through all that, but did I hear you say that correctly?

Matthias J. Sax:

Well, you could still do that. The point is really that the systems that are using watermarks don't have a table abstraction, but only a stream abstraction. And if you have a stream abstraction, you can't really do this, because the stream says I'm a fact stream, I'm a stream of events, this is happening.

Matthias J. Sax:

And because it does not represent the result as a table, but as a stream, it can actually only give you a single record that is then the result of the window. Because if it gives you a second record for the same window, which would be an update to the same key, the downstream operator doesn't have a chance to basically correlate them automatically, because the downstream operator says, "Well, I get events," and now there is this kind of duplicate event, and you would need to do additional work to basically teach the downstream operator to say, "Oh, by the way, this new event is actually not an independent event, it's an update to the previous one."

Matthias J. Sax:

But the abstraction of a data stream does not have this notion of updates built in, and that's the reason why it's hard to do. But in Kafka Streams, we also have this notion of a changelog. And a changelog and a table are kind of the same thing. It really depends how you represent the data. We have decided to say, no, no, no, events with the same key in the same changelog are by default updates to each other. In a plain event stream, there is no such correlation.

Tim Berglund:

Got it, got it. So you've hinted at this, but let's just go through pros and cons of each approach, make the most robust case you can for watermarks. And then let's see where that case falls apart. And I guess I want to say before because that question sounds almost political.

Tim Berglund:

This is engineering, it's real life there aren't solutions, there are trade offs, these are both things that responsible systems can do, it's just a matter of what sort of benefits you're trying to access at what cost. And that's what I really want people to see. Like the Flink case for watermarks, and then let's see where we can criticize that and find other kinds of trade-offs we'd rather access.

Matthias J. Sax:

I think the biggest strength of watermarks is that they kind of keep your whole program, which is running in a distributed fashion, in some kind of cross-synchronized [inaudible 00:14:57] state, because when you receive a watermark as an operator, you might actually have multiple upstream operators, and you receive watermarks from all of them.

Matthias J. Sax:

You basically learn from those different operators how far they got in time. And it's the same thing when you as an operator send a watermark out: you basically broadcast it to all your downstream operators. And I mean, if I say downstream operators, I mean basically all the parallel running copies of the same operator. Because anything is-

Tim Berglund:

What are the conditions under which an operator sends a watermark?

Matthias J. Sax:

Well, basically it depends how you implement it. Some systems do it periodically, so just like every second or every five seconds you send a watermark. Or you do it on demand, so that an operator could send you a message back and say, "Hey, I need a watermark, please generate one for me."

Matthias J. Sax:

I think in Flink it's done periodically, but I'm not totally sure at the moment. The point is, in Flink this periodic action, I think, is only triggered at the source nodes. There's this kind of a scheduler that says every second, I think it's also configurable, all source nodes have to send a watermark.

Matthias J. Sax:

And when those watermarks trickle through the data flow program, basically every operator that receives such watermarks will also update its own watermark, and then also send it downstream. It's kind of periodic, but not triggered at every level, only at the source level. But then of course this initial trigger will basically flow through the whole data flow program to the end, to the sinks.

Tim Berglund:

I was confused, I'm still a little confused, but less when you said that downstream nodes in the stream processing program would be triggered to create their own watermarks, because source nodes emitting watermarks... What does that mean? I mean, it's less interesting, because the source node is just emitting things, it's more interesting to know as a destination node, how far you've come, right? Maybe I feel like I'm not quite getting something here.

Matthias J. Sax:

Maybe this goes a little bit into the weakness of watermarks. If you're a source node and you fetch data, let's say from a Kafka topic, for Flink [inaudible 00:17:36], and then at some point somebody says, give me a watermark. Now the source node needs to have kind of an estimation, or needs to give an estimation: what could the watermark be?

Matthias J. Sax:

Because, well, it did not receive any watermark from Kafka itself, Kafka doesn't support that. I mean, I guess you could build it into your own data, then you say, "Well, I embed watermarks manually," but there's no built-in mechanism for that.

Tim Berglund:

Right.

Matthias J. Sax:

And then usually watermarks are configured to say, "Well, I just assume, if I have seen a record with timestamp 500, I just assume that the maximum delay, my maximum out-of-orderness, might be 10 seconds. I'm at timestamp 500, so I just emit a watermark with 490 and just hope the watermark is correct."
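A minimal sketch of that heuristic, often called bounded out-of-orderness: track the highest timestamp seen and emit that minus an assumed maximum delay. The class is illustrative, and the 10-unit bound is just the guess from the example.

```java
// Heuristic watermark generation at a source that gets no time information from
// upstream: assume records arrive at most `maxOutOfOrderness` late.
public class BoundedOutOfOrdernessEstimator {

    private final long maxOutOfOrderness; // e.g. 10, in whatever units the timestamps use
    private long maxTimestampSeen = Long.MIN_VALUE;

    public BoundedOutOfOrdernessEstimator(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    public void onRecord(long recordTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, recordTimestamp);
    }

    // Having seen timestamp 500 with a bound of 10, emit a watermark of 490 and
    // hope that no record older than 490 is still sitting in the topic.
    public long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrderness;
    }
}
```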

Tim Berglund:

Got it.

Matthias J. Sax:

[crosstalk 00:18:29] node, exactly.

Tim Berglund:

But a source node would know absolutely the oldest record, or I guess the newest record, the most recent record it has processed, right? Doesn't it have perfect knowledge of that

Matthias J. Sax:

Yes, that would be the example of 500, but how does, or how should, the source node know whether there might be a record with 450 in the future? Because the watermark gives you a lower bound, right? If I emit a watermark-

Tim Berglund:

Of course it's not an upper bound.

Matthias J. Sax:

With 490, then I promise that no record with a lower record timestamp comes in, but how would the source node know whether such a record might actually be in the Kafka topic at a higher offset? [crosstalk 00:19:15].

Tim Berglund:

And it does not know. All right. So they're low watermarks, effectively. Not... Well-

Matthias J. Sax:

Well, exactly.

Tim Berglund:

All right. They're always moving forward in time. I mean, the water level is always going up, but this is the lowest that we've seen in some window by implication. And you're saying not

Matthias J. Sax:

Yeah, it's not actually the lowest we have seen, because, I mean, the lowest we've seen could be something else entirely. It's just a thing where we say, well, we assume that we won't see anything lower than that.

Tim Berglund:

You got the source nodes generating those speculatively, and then downstream a destination node. Walk me through what it does with those again, because I didn't quite follow.

Matthias J. Sax:

For the destination node, now we have to enter the world of data-parallel processing. It's more complicated. So let's say you read from a Kafka topic with four partitions, and you have four source nodes. The four source nodes will read at their own pace, and there's not really synchronization between them.

Matthias J. Sax:

When they emit the watermark, one might say, "Well, I send you a watermark with 490." The second one will say, "Well, I send you a watermark with 520," and so forth. So you get different watermarks. When you receive all those watermarks, then it's kind of, well, what kind of watermark can I generate for myself now?

Matthias J. Sax:

And for an operator that is receiving all those different watermarks from the different upstream operators, well, it can only guarantee the minimum of the watermarks it gets as input, because that's the only strict guarantee that all of them fulfill. If you get two watermarks and one watermark says, "I promise not to give you anything below 490," and another watermark says, "I promise not to give you anything below 500," then you still have no guarantee that you won't get anything between 490 and 500.

Tim Berglund:

Yeah, [crosstalk 00:21:24].

Matthias J. Sax:

A promise only one of them made, but not both.

Tim Berglund:

Right.

Matthias J. Sax:

But you have the guarantee of both, so you won't receive anything smaller than 490.

Tim Berglund:

Got it.

Matthias J. Sax:

And that is then the watermark you can basically send downstream yourself. So you process all the data up to the watermarks that you received, because they're in-band; you would not process any future data yet, well, maybe, actually you can, it doesn't really matter. And when you have reached this point, then you say, "Okay, now I just emit a watermark with the smallest watermark of my inputs and broadcast it to all my downstream operators."

Matthias J. Sax:

And of course, this process cascades down from the sources. And if you're a window operator, for example, now you can say, "Well, if my window is from zero to five, and I receive a watermark that is larger than the end timestamp, then I know that I've seen all the data, and I can close the window, I can do the computation, and I can also emit the corresponding result downstream."
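A sketch of what a downstream operator does with those incoming watermarks under these rules: it remembers the last watermark per upstream, can only forward the minimum of them, and may close a window once that minimum has passed the window's end. The class and method names are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// An operator with several upstream inputs tracks the last watermark from each one.
public class WatermarkMerger {

    private final Map<String, Long> watermarkPerUpstream = new HashMap<>();

    public void onWatermark(String upstreamId, long watermark) {
        watermarkPerUpstream.put(upstreamId, watermark);
    }

    // The only promise all upstreams made is the smallest one: inputs of 490 and 520
    // allow forwarding only 490 downstream. (A real system would also wait until every
    // upstream has reported at least once.)
    public long combinedWatermark() {
        return watermarkPerUpstream.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    // A window may be closed once the combined watermark is larger than its end timestamp.
    public boolean canCloseWindow(long windowEndTimestamp) {
        return combinedWatermark() > windowEndTimestamp;
    }
}
```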

Tim Berglund:

Right. It's for sure done. But back to the source node, was the source node doing some kind of speculation as just some kind of threshold where it's saying, "Well, I probably won't see anything this out of order, so I can speculate, and send this watermark." How does that uncertainty at the source node affect windowed computations downstream? And well, implied question. Am I right about that uncertainty?

Matthias J. Sax:

Yeah, there is an uncertainty, for sure. I think there are some systems that have built-in watermark support, where you get some additional watermarks from your source system, and then the source nodes don't need to estimate them or guess them. But most systems don't have that. Kafka doesn't do it, Kinesis doesn't do it, ProMedica doesn't do it.

Matthias J. Sax:

For those cases, you need to have some kind of estimation, and, well, the effect on downstream window operations is basically... you might close a window, let's say, too early. Basically the result might be incomplete, and then when you receive such a record, this record is called late, because it's after the watermark. It violates the watermark contract. And then it's up to the system to say, "Well, how do we actually want to handle this late data?"

Matthias J. Sax:

You could either say, I discard it, you could say, well, maybe I write it to some kind of dead letter queue, or you could still say, no, actually I still want to reconcile my result, if I have kept it around. Because if you say I closed the window, systems usually aggressively purge the corresponding state.

Matthias J. Sax:

If the state is purged, then you could say, well, maybe let's just reopen this window with empty data, put this late record in, and emit a new result just for the sake of the computation. That is also something Flink supports, I think. You could also configure Flink to say, no, no, actually keep the state around for a little bit longer, in case there is late data, and then still emit this update record.

Matthias J. Sax:

Then of course, if you do this, as I said, you need to do some special magic, and build some manual operators, I guess, to be able to handle those updates correctly, because out of the box the operators don't understand updates. I think the Flink Beam model is a little bit better here in general, it has some more internal data to track this, but it's not straightforward, especially if you chain many operators, because you need to have this built-in update support.

Tim Berglund:

Gotcha.

Matthias J. Sax:

And I think that's kind of the [inaudible 00:25:04] you have downstream. By default, similar to what Kafka Streams is doing, the records are dropped on the floor, because it's usually the most sensible thing to do, because everything else is much more complicated.

Tim Berglund:

For what we call truly late data, that is-

Matthias J. Sax:

Exactly.

Tim Berglund:

The store is closed, you don't get to do it anymore. And obviously every system is going to have to define a time at which... we're not going to play your silly little games anymore, IoT network, it's been too long, the data is not valuable, and we will discard it since we don't have infinite space.

Matthias J. Sax:

Correct. I mean, otherwise you would need to keep the state of the operator around forever, and this state would grow unbounded, because, well, you generate new windows all the time, so you need to keep more and more state, and that's practically just not possible.

Tim Berglund:

That is an undesirable property of a system, for the state to grow in an unbounded way. I'm going to go ahead and make that opinionated statement, and I stand by it. We have a pretty good understanding of watermarks, and I realize, vocabulary... Again, this is one of those things that maybe you've said and I'm tracking with you, but what do we call the alternative approach, the Kafka Streams non-watermark thing?

Matthias J. Sax:

I mean, we don't really give it a name. In academic literature I have once seen the term "slack time approach," which basically describes the thing Kafka Streams is doing. I mean, we just say, well, we track stream time, we don't really give it a name, but if you wish you can call it the slack time approach.

Tim Berglund:

Slack time. Got it. And you gave a good explanation of watermarks, and one more time the... I guess we're kind of working through the mechanics of the watermark approach. What is the primary argument for it? Why is it the desirable thing in the mind of someone who would design a system with watermarks? Why would you pick that approach?

Matthias J. Sax:

As I said, it gives you this very nice property downstream, because you have this kind of broadcasting of your watermark. Every parallel instance broadcasts to every other parallel instance. As a receiving operator, you basically get time information from all your upstream operators, and that basically prevents you from closing the window too early.

Matthias J. Sax:

Let's say you have a window from, again, zero to five, and you receive watermarks from your upstream operators. Let's say they're at four. And for some reason, one upstream operator starts to lag, whatever, there's a long GC or stuff like that. That basically means you don't receive a watermark from this lagging operator, because it's lagging, and so you just keep the window open.

Matthias J. Sax:

And only when this operator catches up do you receive the corresponding watermark, and so you close the window. You get this kind of implicit coupling, and that gives you a stronger guarantee about the correctness or completeness of a window. Then you can say, "Well, I'm pretty sure that I've received all data from all my upstream operators, because the watermark gives me this guarantee."

Matthias J. Sax:

And so you basically have this ability to time-synchronize your whole data flow program, all its parallel instances, in a very, very semantically strict manner. And that's of course something that is desirable.

Tim Berglund:

Got it, got it. And Slack time, I liked that the Slack Time Approach, I like to have Slack time, I don't really ever, but Slack time is a good thing in one's own schedule. So what is the advantage? What's the reason that Kafka Streams has said no to watermarks and yes to Slack time?

Matthias J. Sax:

Yes, I think the main reason to say no to watermarks is, as explained, this idea of having a table anyway. If you update your results incrementally, the notion of closing a window becomes less important. And then of course in practice we still need to close the window eventually, because we need to purge the state of the table at some point, because otherwise, again, we have this problem of unbounded growth.

Matthias J. Sax:

And [inaudible 00:29:42], the other thing is that sometimes people actually say, "Well, even if I have the table, I actually would like to receive a result stream from this table that only contains the results of the closed windows." So at some point we added this suppress operator, and that is exactly doing that. I think there was actually an episode on that at some point. Because having only the final result in a stream can be very, very useful.

Tim Berglund:

Ah, yes.

Matthias J. Sax:

But here again-

Tim Berglund:

There was a KIP about a year ago that altered the way table aggregations emitted new records, to emit fewer of them. Is that what you're talking about?

Matthias J. Sax:

Yeah, exactly. The point is really that it did not really alter the table. It's kind of an additional operator that you apply on the table result. It's still composable.
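For reference, a sketch of how that composable operator is applied: the suppress step is simply appended to the windowed table and holds back updates until the window closes. Window size, grace period, and topic names here are assumptions, not from the episode.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

import java.time.Duration;

public class FinalResultsOnly {

    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
               .count(Materialized.as("clicks-per-window-store"))
               // Extra, composable operator on the table: hold back intermediate updates
               // and emit exactly one final record per window once the window has closed.
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               .toStream()
               .map((windowedKey, count) ->
                       KeyValue.pair(windowedKey.key() + "@" + windowedKey.window().start(), count))
               .to("final-clicks-per-window", Produced.with(Serdes.String(), Serdes.Long()));
        return builder;
    }
}
```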

Tim Berglund:

Which everything in Kafka Streams is. And thank you for that.

Matthias J. Sax:

Exactly. And then of course, you need to say, "When can we actually push something through the suppress operator, or when can the suppress operator really emit the final result?" And in order to do this, we track what we call stream time, and stream time is really just the maximum timestamp we have observed, so it's very straightforward.

Matthias J. Sax:

You get data in, you look at the timestamp. The timestamp is five, okay, stream time is five. Next record timestamp is seven, stream time is seven. Now an out-of-order record comes, the record timestamp is three, well, stream time just stays at seven, because we never go backwards in time, and so forth and so forth. And so you could think of stream time, compared to the watermark approach, as kind of a high-time watermark. That is how far we got.
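Stream time as described is just one number per task; a minimal sketch (an illustrative class, not Kafka Streams internals):

```java
// Stream time: the maximum record timestamp observed so far. It only ever moves forward.
public class StreamTimeTracker {

    private long streamTime = Long.MIN_VALUE;

    // Timestamps 5, 7, then an out-of-order 3: stream time goes 5, 7, and stays at 7.
    public long onRecord(long recordTimestamp) {
        streamTime = Math.max(streamTime, recordTimestamp);
        return streamTime;
    }
}
```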

Tim Berglund:

Yes, yes. So stream time is a watermark, it's just a high watermark instead of a low, within-a-window kind of watermark.

Matthias J. Sax:

Yeah, exactly.

Tim Berglund:

Got it.

Matthias J. Sax:

And the [crosstalk 00:31:39].

Tim Berglund:

And now I'm confusing the terminology by calling it a watermark. I mean, we don't say that, we call it stream time. Don't dare call it a watermark.

Matthias J. Sax:

We call it stream time. And that's also the point, I mean, there is no special metadata record in the stream. We don't propagate it, we don't need to propagate it, because every record carries a timestamp anyway. The stream time advances natively with the data. We don't have special embedded watermark messages in the stream, we don't need them.

Tim Berglund:

Got it, got it. You're just using the timestamp, which already exists. There's no extra thing to propagate or manage or come to consistency around between processors or anything like that. It's just timestamps.

Matthias J. Sax:

Exactly, exactly. And now when we go back to the windows thing, so when can we close the window? It's very similar: for the window operator you now say, well, you define a grace period to keep it open for longer. So don't close the window when stream time hits the end time of the window, because if there's still out-of-order data, we would exclude it, but just say, "Keep the window open for another 10 seconds, or 30 seconds, or a minute." You can configure it.

Matthias J. Sax:

And then when stream time hits the end time plus the grace period, what we call the window close time, we really close the window, and then everything after that becomes late data that we don't process anymore.
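In the Kafka Streams DSL the grace period is part of the window definition; a hedged sketch with assumed sizes (newer versions use `TimeWindows.ofSizeAndGrace(...)`, older ones `TimeWindows.of(...).grace(...)`):

```java
import org.apache.kafka.streams.kstream.TimeWindows;

import java.time.Duration;

public class GraceConfig {

    // A five-minute window that stays open for an extra 30 seconds of stream time.
    // Window close time = window end + 30s; records for the window that arrive after
    // stream time has passed that point are late and dropped.
    public static TimeWindows fiveMinutesWithGrace() {
        return TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(30));
    }
}
```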

Tim Berglund:

Got it. What is coming up next? Is this, in your mind, and I guess in the mind of the community, pretty much completed functionality? Are there interesting KIPs in play that are going to change the way time processing works in Kafka? What can we look forward to?

Matthias J. Sax:

I mean, basically the mechanism always stays the same. If you compare it to watermarks, there are a couple of drawbacks, but also advantages with our approach. And maybe we can talk about those, and I can also share what we have in mind to actually attack some of the drawbacks of our approach, to make it better and get more watermark-like properties without doing watermarks. And the biggest problem is really-

Tim Berglund:

And without the costs of watermarks.

Matthias J. Sax:

Yeah, exactly. What you don't get when you use stream time is this kind of time synchronization between the different operators in a data flow program. Let me go back to the example of the lagging operator that we had before, that didn't send the watermark, so the window was not closed. That is a property we're losing here, because we only look at stream time, and if one operator starts to lag in sending us data, the other operators will still send us data with higher and higher timestamps.

Matthias J. Sax:

The operator that is locally tracking its own stream time, and every operator basically does this more or less, would say, "Well, my stream time is still advancing." And so at some point it would just close the window. Then when this lagging operator starts to catch up again, it will basically send a lot of late data, or it could send a lot of late data, because we did close the window. Because the downstream operator, when it receives the data, does not know if a record is from the one operator or the other operators, so it treats them all the same and just blends the stream time. And that could be a disadvantage here.

Tim Berglund:

Got it.

Matthias J. Sax:

On the other hand, a big advantage is that you are decoupling how long you want to wait across different operators. Let's say you have two window operators on the same data stream, in the same program. And one window operator says, "Well, I'm a time window of 10 minutes, and I'm willing to wait one minute for out-of-order data," and then there's this other operator that says, "No, I do an aggregation over a whole day, and for this reason I also want to wait a full hour for out-of-order data." Now they basically want to have different waiting periods, different grace periods.

Matthias J. Sax:

In our system that's easily possible; in a watermark-based system it's a little bit more complicated, because in the end you say, "Well, I want to close a window when a watermark arrives." So you would say, "Well, in order to fulfill the delay of one hour, I need to send watermarks low enough to give me kind of a delay of an hour."

Matthias J. Sax:

But then you also get a delay of an hour on your 10-minute window, because you cannot send two different watermarks. So basically you couple all your window operators and give them all the same grace period. You cannot pick on a per-operator basis anymore. That is kind of the trade-off of this: well, we can synchronize everything, and if you synchronize everything, well, everything is synchronized.
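In the Kafka Streams DSL, that decoupling looks roughly like the sketch below: two windowed aggregations over the same stream, each with its own grace period, and neither delays the other. Topic and store names are assumptions; the sizes and grace values are the ones from the example.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;

import java.time.Duration;

public class IndependentGracePeriods {

    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events =
                builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));

        // 10-minute windows, willing to wait one minute for out-of-order data.
        events.groupByKey()
              .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(10), Duration.ofMinutes(1)))
              .count(Materialized.as("ten-minute-counts"));

        // Daily windows over the same stream, willing to wait a full hour.
        events.groupByKey()
              .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(1), Duration.ofHours(1)))
              .count(Materialized.as("daily-counts"));

        // Each operator closes its windows on its own local schedule.
        return builder;
    }
}
```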

Tim Berglund:

Everything is synchronized, exactly. And that's not necessarily what you want; you might very well have different windows. I mean, that sounds like that's even common.

Matthias J. Sax:

Yeah, it's very common. And especially if... I mean, the grace period usually is some fraction of your window size. If you have a 10-minute window, you very unlikely want to wait an hour or two days. If you have a window of a week, waiting for a day, yeah, why not? Sounds reasonable, because it's some percentage of your window size usually.

Tim Berglund:

Exactly. Makes complete sense to do that at that point.

Matthias J. Sax:

So in this regard, our approach gives us much more flexibility to decouple operators. Every operator can make its own local decision about how long it's willing to wait, because in the end, this waiting period is latency in your system, effectively. Because if the window closes today, but I only give you the result in one day, because that is my grace period or my watermark delay, that is effectively latency.

Matthias J. Sax:

Even if the result might not be complete, because there might be some out-of-order data in this one hour. But also from experience, it all depends on how big your outliers are. Because very often you would say, "Well, I might have out-of-order data," and most out-of-order records, let's say, arrive within five seconds. And then once in a while, there might be one that is arriving like a minute later.

Matthias J. Sax:

And then it becomes this question of completeness versus latency. And if you have outliers that are very, very, very large, then you need to have a very large grace period. But then of course, this grace period applies to all your parallel windows, to all your data, all your parallel keys. And then you apply the same latency to all of them, because you don't know which one will be the outlier.

Tim Berglund:

Right. In complex topologies, that's a big cost that synchronization imposes on you: you get sort of a worst-case latency. Well, it's never good to have that latency, but you've taken the hit on it, and it's reasonable for the big giant windows, but it feels grossly disproportionate for the smaller windows, and you're stuck with that same grace period for all of the processors.

Matthias J. Sax:

Exactly, exactly. And so again, as I often say, it's always trade-offs, it depends what you try to optimize for. And for our approach there's also another problem, and that is that stream time, because we don't have those watermarks to exchange, is never really synchronized. Different operators can be at different stream times, and it really depends on the data you've seen.

Matthias J. Sax:

And one very good example is, let's say you have an upstream filter operator, and the filter operator is really aggressive and discards 50% of your data or even more. Your upstream operator could say, "Well, I'm already at timestamp 500, but I filtered out a lot of records," and the downstream operator never received any of those records.

Matthias J. Sax:

The downstream operator's stream time is much lower, because I just filtered out everything between 450 and 500, it just got dropped on the floor. While in a watermark approach that could never happen, because the filter would say, "Well, I just [inaudible 00:40:17] all those records, but I still send you a watermark of 500." The downstream operator can still learn about the time progress, even if there's no data coming. In our case, downstream operators only learn about time progress when there is data.

Tim Berglund:

Right, since there is no metadata other than the timestamp.

Matthias J. Sax:

Exactly. And that can also have this kind of impact. For example, what we see with suppress, especially when [inaudible 00:40:40] write more tests. They write a unit test and they say, "Well, I want the window to close after five minutes or something like that." And they push the data into the window, and then they stop pushing data, and then the test does not return any result, and they're kind of, oh, where is my result? Well, we couldn't emit the result because there was no upstream data pushing stream time forward, so we could not close the window.
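A hedged sketch of that testing pitfall using TopologyTestDriver, assuming a hypothetical five-minute windowed count behind suppress: no final result appears until one extra, later-timestamped record advances stream time past the window close time.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;

public class SuppressTestPitfall {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ZERO))
               .count(Materialized.as("clicks-per-window"))
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               .toStream()
               .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count))
               .to("final-counts", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "clicks", Serdes.String().serializer(), Serdes.String().serializer());
            TestOutputTopic<String, Long> output = driver.createOutputTopic(
                    "final-counts", Serdes.String().deserializer(), Serdes.Long().deserializer());

            // Two records inside the window [00:00, 00:05): nothing is emitted yet,
            // because stream time has not passed the window close time.
            input.pipeInput("tim", "click", Instant.parse("2020-11-12T00:01:00Z"));
            input.pipeInput("tim", "click", Instant.parse("2020-11-12T00:02:00Z"));
            System.out.println(output.readKeyValuesToList()); // []

            // Only this later record advances stream time past 00:05, so suppress can
            // now emit the final count for the first window.
            input.pipeInput("tim", "click", Instant.parse("2020-11-12T00:06:00Z"));
            System.out.println(output.readKeyValuesToList()); // [KeyValue(tim, 2)]
        }
    }
}
```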

Tim Berglund:

Of course. That makes sense. And is that the kind of thing-

Matthias J. Sax:

Can be a little bit confusing.

Tim Berglund:

Time in stream processing, if you're still with us, everybody, you know, can be a little bit confusing. This is actually super difficult stuff to reason through. But is that the kind of thing that is possible to remedy, or is that just an inherent trade-off of slack time, or how does the future look?

Matthias J. Sax:

Not necessarily, there's no official KIP yet, but here at Confluent we're doing some brainstorming, and one idea is to say, well, I mean, the simplest thing to mitigate the problem is, first of all, we add some kind of metadata messages that will propagate stream time forward. It's that simple. Instead of propagating a low watermark, you would just propagate a high watermark.

Matthias J. Sax:

This kind of filter problem goes away, because the filter itself knows I have reached stream time 500, so I can just send a message down that says, "Hey, my stream time was 500," and then the downstream operator can still advance it.

Tim Berglund:

Exactly.

Matthias J. Sax:

So that part is simpler, but that's not good enough for what we have in mind. We're actually thinking about not sending scalar timestamps downstream but using a vector clock approach, and this vector clock approach would also help us to basically cross-synchronize across different parallel instances, which we did not have before.

Matthias J. Sax:

We have this example of the lagging operator: if you send a scalar timestamp, then we still might not know exactly what the time of all those different operators is. But if you send down a vector clock, then we can propagate this information in a much more fine-grained way, because now we have a vector of all those things. And we also especially want to use this vector clock approach in combination with interactive queries.

Matthias J. Sax:

That's a feature in Kafka Streams that allows you, if I have a table, to basically query the table state from outside, and do a lookup and say, what is the current count for Tim, for the time window between five and 10. The problem is, when you do this query into the store, you don't really know what stream time is, it's not really exposed to you, so you query, but, well, if your program lags for two days, you just get an older result.

Matthias J. Sax:

Or maybe it's a window that doesn't even exist, because we are not there yet. And it's hard to reason about for users. And so the idea is, well, we use this vector clock approach, and that allows us to basically say, beginning from the source nodes, we can track the time progress at the source nodes, and we propagate this information through the whole topology, similar to what watermarks do.

Matthias J. Sax:

And then when you ask questions against the store, you can actually say, "Well, give me the result," but only give me the result if you know you have reached this point in time, for example. And then we can basically allow users to reason much more about the staleness of the state, because, well, in stream processing there might always be a little bit of staleness.

Matthias J. Sax:

And it also gives you nice properties, if you say, for example, I push a record to an input topic, and then I want to query the store after the record was processed. With this kind of time-tracking approach, you can actually do those things, because you would say, "Well, I know the timestamp of my record is 500, and if the store did not reach 500, I know it cannot have processed this record yet, because otherwise the time would have advanced to 500." But yeah, vector clocks are quite complicated.

Tim Berglund:

Vector clocks are quite complicated, but they would allow independent nodes in our stream processing topology, in the sense of independent computers running stream processors on the same stream, those independent nodes get to come to consensus about where they are mutually in time. That consensus kind of emerges among those nodes in a systematic and predictable way. Everybody gets to have their own opinion about what time it is but in a correct way.

Matthias J. Sax:

Yeah, exactly.

Tim Berglund:

I did not know that vector clocks were a possibility here. That's good fun to know.

Matthias J. Sax:

We hope we can come up with something, and then of course there would be a KIP about it. And we hope that this vector clock approach might give us a little bit of the best of both worlds, so it can combine the watermark thing, the properties of watermarks, but still use our approach, which still allows us to keep operators decoupled in the waiting time, the grace periods.

Matthias J. Sax:

And besides that, because you said future thinking, I was always a fan of watermarks, and I mean, both approaches at the moment, watermarks as they are implemented in Flink, and the vector clock approach, both have this problem that in the end you guess, you need to guess, because you have no upstream information about any bounds on the out-of-orderness of your input stream.

Matthias J. Sax:

But the point about watermarks is really, if you build it end-to-end, beginning from your upstream producer application, then actually you can get absolutely correct watermarks, because if I'm a producer, and I'm pushing data into a topic, I control the timestamps of the records I send.

Matthias J. Sax:

And if I'm a producer and at some point I send you a watermark with a promise that I won't send you anything below 500, then that's a very strict guarantee. And the producer knows that, the producer doesn't need to guess, because that's what the producer does. If you would have the ability to let Kafka track watermarks natively, and build producers in a way that they send absolutely correct watermarks that are not estimated.

Matthias J. Sax:

Then of course, in a downstream system you don't need to estimate anything, because you get the correct information from upstream. And that's something I could imagine maybe at some point we might build into Kafka, and then you can still keep your operators decoupled as well, because then you can basically say, "Well, I still define a grace period," but I can also say, "Well, I don't care about the grace period, I just use the watermark to basically also close some windows."

Matthias J. Sax:

You can basically mix and match both approaches. And I think that would be super helpful long term. The problem of course is that doing this is a major change in the brokers and the producers, so it's not easy to do. But I think it could be super helpful.

Tim Berglund:

And if all the vector clock management is taking place basically in the window code inside the API, inside Kafka Streams, and that's not pushed to the developer, then it's a huge win. Because I remember 10 years ago, there was this big controversy between Riak and Cassandra, because there was a little time there where it wasn't clear which one of those was going to win the Dynamo-style distributed database, NoSQL cage match.

Tim Berglund:

And it was a selling point of Riak that vector clocks produce more rigorously correct results. Like, you always know when you have an inconsistent update to a key, you know what the history of that key is. And Cassandra was like, "Ah, last write wins, it's good enough, it's fine." And sometimes it's wrong, but Cassandra won because, if you make... developers had to understand vector clocks in order to use the API, and the API in this case was just a key-value store.

Tim Berglund:

But you actually had to process exceptions and, as it were, traverse the vector and make decisions. There's just no way applications are going to be written correctly with that level of complexity pushed outside of the API surface. And so with that inside the API, you've got an exciting piece of mathematics, or piece of computer science, that gives you a lot of rigor.

Tim Berglund:

And if it's framework developers like you, infrastructure developers getting that right once, then the rest of us have some hope of writing correct code. With that, my friend we're at time, my guest today has been Matthias Sax. Matthias, thank you so much for coming back and being a part of Streaming Audio.

Matthias J. Sax:

Thanks for having me, always happy to talk about those things and if you have anything else, I'm happy to come back again.

Tim Berglund:

Hey, you know what you get for listening to the end? Some free Confluent Cloud. Use the promo code 60PDCAST, that's 6-0-P-D-C-A-S-T, to get an additional $60 of free Confluent Cloud usage. Be sure to activate it by December 31st, 2021, and use it within 90 days after activation. Any unused promo value on the expiration date will be forfeited, and there are a limited number of codes available, so don't miss out.

Tim Berglund:

Anyway, as always, I hope this podcast was helpful to you. If you want to discuss it or ask a question, you can always reach out to me @tlberglund on Twitter. That's T-L-B-E-R-G-L-U-N-D. Or you can leave a comment on our YouTube video, or reach out in our community Slack. There's a Slack signup link in the show notes if you'd like to join.

Tim Berglund:

And while you're at it, please subscribe to our YouTube Channel, and to this podcast, wherever Fine podcasts are sold. And if you subscribe through Apple podcast, be sure to leave us a review there, that helps other people discover us, which we think is a good thing. So thanks for your support and we'll see you next time. (music).

Do you ever feel like you’re short on time? Well, good news! Confluent Software Engineer Matthias J. Sax is back to discuss how event streaming has changed the game, making time management more simple yet efficient. 

Matthias explains what watermarks are, the reasons why Kafka Streams doesn't use them, and an alternative approach to watermarking informally called the "slack time approach."

Later, Matthias discusses how you can compare “stream time,” which is the maximum timestamp observed, to the watermark approach as a high-time watermark. Stick around for the end of the episode, where Matthias reveals other new approaches in the pipeline. Learn how to get the most out of your time on today’s episode of Streaming Audio!


