Senior Software Engineer (Presenter)
Integration Architect (Author)
Timestamps are a critical component of Apache Kafka®, and they similarly drive the behavior of Kafka Streams. You can configure timestamps to follow either event time (the default) or log-append time.
With event time, producers automatically create a timestamp with the current time of the producer's environment if you don't add your own.
With log-append time, when the record arrives at the broker, the broker will override the timestamp of the producer record with its own timestamp (the current time of the broker environment) as it appends the record to the log.
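The two modes can be summarized as a small decision rule. The following is a minimal plain-Java model of that rule, not Kafka client code: the class `TimestampResolution`, the method `resolve`, and its parameters are hypothetical names chosen for illustration, and the enum simply mirrors the two values (`CreateTime` and `LogAppendTime`) of the `message.timestamp.type` setting.

```java
import java.util.OptionalLong;

// Illustrative model only: shows which clock ends up in the record's timestamp field.
final class TimestampResolution {

    // Mirrors the two values of the message.timestamp.type setting.
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    /**
     * @param userTimestamp timestamp set explicitly on the record, if any
     * @param producerClock current time in the producer's environment (ms)
     * @param brokerClock   current time in the broker's environment (ms)
     */
    static long resolve(OptionalLong userTimestamp, long producerClock,
                        long brokerClock, TimestampType type) {
        // Producer side: fall back to the producer's clock when no timestamp was supplied.
        long producerTimestamp = userTimestamp.orElse(producerClock);
        // Broker side: with log-append time the broker overrides whatever the producer sent.
        return type == TimestampType.LOG_APPEND_TIME ? brokerClock : producerTimestamp;
    }
}
```

For instance, `resolve(OptionalLong.empty(), 2000L, 3000L, TimestampType.CREATE_TIME)` yields the producer's clock value, while the same call with `LOG_APPEND_TIME` yields the broker's.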
The windowing operations that you learned about in the Windowing module are driven by record timestamps, not by wall-clock time. In Kafka Streams, the record with the earliest timestamp across all partitions is chosen first for processing, and Kafka Streams uses the TimestampExtractor interface to get the timestamp from the current record.
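To make the "earliest timestamp first" rule concrete, here is a simplified plain-Java sketch. It is not how Kafka Streams is implemented internally; it only assumes that each partition is a queue of timestamps in arrival order and that the next record is always taken from the partition whose head timestamp is smallest. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative model only: picks the next record by comparing partition heads.
final class TimestampOrderedMerge {

    /** Merges per-partition timestamp queues, always taking the smallest head timestamp next. */
    static List<Long> merge(List<List<Long>> partitions) {
        // Order the partition queues by the timestamp of their first (oldest unprocessed) record.
        PriorityQueue<Deque<Long>> byHeadTimestamp =
                new PriorityQueue<>(Comparator.comparingLong((Deque<Long> q) -> q.peekFirst()));
        for (List<Long> partition : partitions) {
            if (!partition.isEmpty()) {
                byHeadTimestamp.add(new ArrayDeque<>(partition));
            }
        }
        List<Long> processed = new ArrayList<>();
        while (!byHeadTimestamp.isEmpty()) {
            Deque<Long> next = byHeadTimestamp.poll();
            processed.add(next.pollFirst());
            if (!next.isEmpty()) {
                byHeadTimestamp.add(next); // re-insert so it is ordered by its new head
            }
        }
        return processed;
    }
}
```

Only the head of each partition is compared, so order within a partition is preserved: merging `[5, 1]` with `[3]` processes 3, then 5, then 1, and the final record is simply out of order.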
The default behavior is to use the timestamp from a ConsumerRecord, which has a timestamp set by either the producer or the broker. The default implementation of TimestampExtractor is FailOnInvalidTimestamp, which throws an exception if it encounters a timestamp less than zero. If you want to use a timestamp that is embedded in the record key or value itself, you can provide a custom TimestampExtractor.
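The sketch below illustrates both behaviors using stand-in types, so that it runs without any Kafka dependency. `SimpleRecord` and `Extractor` are hypothetical substitutes: the real interface is `org.apache.kafka.streams.processor.TimestampExtractor`, whose `extract` method receives a `ConsumerRecord` and a partition time. The payload format (`"<epochMillis>,<reading>"`) and the exception type are also assumptions made for the example.

```java
// Illustrative model only: stand-ins for ConsumerRecord and TimestampExtractor.
final class ExtractorSketch {

    /** Hypothetical, minimal substitute for a consumed record. */
    static final class SimpleRecord {
        final long timestamp; // timestamp field set by the producer or the broker
        final String value;   // payload, assumed to look like "<epochMillis>,<reading>"

        SimpleRecord(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Same shape as the real extract(record, partitionTime) method. */
    interface Extractor {
        long extract(SimpleRecord record, long partitionTime);
    }

    /** Default-style behavior: use the record's own timestamp and fail if it is negative. */
    static final Extractor FAIL_ON_INVALID = (record, partitionTime) -> {
        if (record.timestamp < 0) {
            throw new IllegalStateException("Invalid timestamp: " + record.timestamp);
        }
        return record.timestamp;
    };

    /** Custom behavior: read the timestamp embedded at the start of the value. */
    static final Extractor FROM_PAYLOAD = (record, partitionTime) ->
            Long.parseLong(record.value.substring(0, record.value.indexOf(',')));
}
```

In a real application, a class implementing `TimestampExtractor` would typically be registered through the `default.timestamp.extractor` configuration or per source with `Consumed#withTimestampExtractor`.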
Kafka Streams uses the concept of stream time:
Stream time, by definition, is the largest timestamp seen so far, and it only moves forward, not backward. If an out-of-order record arrives (meaning a record that is earlier than the current stream time, but still within the window plus the grace period), stream time stays where it is.
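Stream time as defined here is a running maximum, which a few lines of plain Java can model. This is a sketch of the concept rather than Kafka Streams code: `StreamTimeTracker` is a hypothetical name, and the `Long.MIN_VALUE` starting value is an assumption standing for "no record seen yet".

```java
// Illustrative model only: stream time is the largest timestamp observed so far.
final class StreamTimeTracker {

    // Assumed sentinel meaning that no record has been observed yet.
    private long streamTime = Long.MIN_VALUE;

    /** Observes a record timestamp and returns the resulting stream time. */
    long observe(long recordTimestamp) {
        // Stream time only moves forward; an earlier timestamp leaves it unchanged.
        streamTime = Math.max(streamTime, recordTimestamp);
        return streamTime;
    }

    long current() {
        return streamTime;
    }
}
```

Feeding it the timestamps 14:01, 14:03, and then 14:01 again (expressed in any consistent unit) leaves stream time at 14:03 after the third record.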
Late records have timestamps outside of the combined window time and grace period. The delay of a record is determined by taking the stream time minus the event timestamp.
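The distinction between out-of-order and late records can be written down as a small classification. The following plain-Java sketch is a simplified model under stated assumptions: all values are in milliseconds, `windowEnd` is the exclusive end of the window the record belongs to, and a window is assumed to stop accepting records once stream time reaches the window end plus the grace period. The names `Lateness`, `delay`, and `classify` are hypothetical.

```java
// Illustrative model only: classifies a record relative to stream time, window end, and grace.
final class Lateness {

    enum Status { IN_ORDER, OUT_OF_ORDER, LATE }

    /** Delay as defined above: stream time minus the record's event timestamp. */
    static long delay(long streamTime, long eventTimestamp) {
        return streamTime - eventTimestamp;
    }

    /** Classifies a record for the window that ends (exclusively) at windowEnd. */
    static Status classify(long streamTime, long eventTimestamp, long windowEnd, long graceMs) {
        if (eventTimestamp >= streamTime) {
            return Status.IN_ORDER; // the record matches or advances stream time
        }
        // Assumption: the window is closed once stream time reaches windowEnd + grace.
        if (streamTime >= windowEnd + graceMs) {
            return Status.LATE;
        }
        return Status.OUT_OF_ORDER;
    }
}
```

With a 10-minute window ending at 600,000 ms and a 60,000 ms grace period, a record stamped 590,000 ms is out of order when stream time is 620,000 ms (delay of 30,000 ms) and late once stream time has reached 700,000 ms.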
Hi, I'm Sophie Blee-Goldman with Confluent. In this module, we're going to talk about time concepts in Kafka Streams. Timestamps are really what drives the behavior of Kafka Streams as we saw in the windowing module, and timestamps themselves are a critical component of Kafka. Messages in Kafka have a dedicated timestamp field, which can be set by the producer when it sends a record or by the brokers themselves.
Now, typically the producer will set the timestamp and this time will correspond to the actual current time in the producer's environment or the time of the event itself. So for example, if you have a temperature sensor on a widget line in a factory, you wanna know what time each temperature reading is coming in. Now, this is called event-time processing 'cause it's the time of the event itself that drives the processing semantics.
But the producer doesn't have to set the timestamp, it can just forward plain records to the broker and the broker will set the timestamp field itself based on the current wall-clock time of the broker. So since this is the time that the event is appended to the end of the log, it's called log-append time. So in Kafka Streams, this kind of processing is called ingestion-time processing because it has to do with when the message itself was ingested into the system.
So in both of these processing types, it's the timestamps that drive all the action in Kafka Streams. So in those windowing operations we just discussed, the timestamps are always the timestamps in the records themselves, and that has nothing to do with the wall-clock time or the time that your application happens to be running. So if you define a window of 10 minutes, any event that comes in with a timestamp within that 10-minute timeframe will fall within that window. But as soon as you see a record with a large enough timestamp, larger than the end of the 10-minute window, this will officially close the window.
So a window closing has nothing to do with the passage of real time, it's entirely driven by the events themselves. So in the case of multiple topics being processed together, for example a join, Kafka Streams will always choose the next event to process based on the timestamp. So it always chooses the event with the smallest timestamp since this represents what is happening next.
So how do you actually get these timestamps out of the records? Kafka Streams has a TimestampExtractor interface, which it uses to get the timestamp from the record and which can be defined either by the user or by using one of the out-of-the-box default timestamp extractors. The default behavior is to use the event timestamp, which is literally the timestamp on the record, for example, whatever was set by the producer or by the broker. Or sometimes users might want to embed the timestamp to be used inside the value, in which case a TimestampExtractor can be used to extract that timestamp.
So the default extractor is the FailOnInvalidTimestamp extractor. And what that does is, as the name suggests, if it comes across an invalid timestamp, for example, a -1 or something that is not really a real time, it will throw an exception. So if you ever want to define a custom timestamp extractor, all you have to do is implement this TimestampExtractor interface, and you can embed the timestamp in the payload or the record value of these events and extract it or set it however you want.
So why do the timestamps really matter? Well, in Kafka Streams time is advanced by these timestamps. So for example, in the windowing operations, the windows only ever close because new records have come in with timestamps that are greater and greater, and this dictates the passage of time as seen by Kafka Streams. So how long a window remains open depends entirely on the timestamps of these events.
Again, it has nothing to do with wall-clock time or the passage of time within the processor itself, and this is what we call stream-time. So stream-time is really just the largest timestamp seen so far. It's just a value, it dictates the current time as seen by Kafka Streams. So as new events come in, if their timestamp is greater than the current stream-time, then stream-time will be advanced. And if new events come in and they have a smaller timestamp than stream-time, it's not advanced 'cause time never goes backwards. So the stream-time is always monotonically increasing. And again, it just tells Kafka Streams that some time has elapsed since we have new events.
So you can imagine in the real world, new events with a future timestamp generally imply that time has passed from the view of this processor, new widgets have gone down the line. Whereas if you get an event with a previous timestamp, that generally is just, probably, a distributed-systems type issue where a record came in out of order, or the producer was stalled for a while and another record was sent before it with a greater timestamp. The important thing here is that stream-time only ever goes forward and it's driven entirely by the records themselves.
So we keep mentioning this thing called out-of-order input. Out-of-order really just means that you have events coming in with an earlier timestamp than the events seen so far, or using our new stream-time lingo, it means that the stream-time is actually greater than the timestamp seen for this new event. And in that case, that event is out-of-order. So for windowed operations, this means that the event timestamp is actually less than the current stream-time, but still within the window time plus the grace period.
Records that fall outside of the grace period are known as late, but if a record is still within the window size plus the grace period, then it's just considered to be out-of-order, and it just means that instead of the timestamp monotonically increasing in this event stream, you had a few records where the timestamp was a bit earlier. So in this example, the first timestamp comes in at 14:01, then 14:03, and then we have another record that goes back to 14:01. So this can happen because a record was retried. It might be something like the same record from 14:01, or because you have multiple producers all sending records to the same broker from different places. And because it's a distributed system, this means that those records might arrive at the broker at different times. Now, note that this is only possible for this event-time processing. If the broker itself is setting the timestamp, then you're not ever gonna be having out-of-order data because the broker itself is setting that timestamp as the record is appended to the log.
So we touched on this briefly earlier, but in addition to out-of-order data, there's this concept of late data. And late data is distinct from out-of-order data because it means the event has arrived so far past the time period of records that you care about that it just gets dropped on the floor completely, it gets ignored. And this is where the grace period comes in. This is a per-window setting, and it just defines a cut-off period for out-of-order events. So the grace period is really just a metric of how out-of-order an event can be before it's considered to be irrelevant and therefore dropped. Now, sometimes you might actually care about records that are very, very, very far out of order and coming in very late, but you might not wanna wait around for long enough to send the results and wait for them.
And in that case, the trade-off there is getting your results sooner versus making sure that all possible records are given a chance to update and be reflected in the final results. And again, the delay of an event is determined by the stream-time. It's actually the stream-time minus the event timestamp. So the delay of an event is really just how far after the highest timestamp that has been seen so far this record arrived.
Now, it's important to note that the default grace period in Kafka Streams is 24 hours. So in general, that means that you have a pretty large margin for out-of-order records to still be considered in your results. 24 hours is a lot of time, especially for things that are generally only off by a few seconds or milliseconds, but sometimes you might have things that are coming in days or even farther out, and in that case, you would want to set a higher grace period than the default.
Now, note that also by default, you will still see records even when the grace period has not yet elapsed, but sometimes you only wanna see a single final record, and for that you need to use the suppression operator, which will suppress any output until the grace period has elapsed. So, now that we know a little bit about how time works in Kafka Streams and what out-of-order and late records are, let's take a look at an exercise.