How To Use Streaming Joins with Apache Flink

Streaming data brings with it some changes in how to perform joins. In this video, David Anderson and Dan Weston talk about how and when to use temporal joins to combine your data.

16 min

Dan Weston

Senior Curriculum Developer

David Anderson

Software Practice Lead

To learn more, check out our documentation on Apache Flink® SQL joins.

Resources

Flink 101—Event Time and Watermarks

How To Use Streaming Joins with Apache Flink

Intro to streaming joins

Dan: Hi David!

David: Hey Dan! Thanks for joining me to talk about how Flink SQL handles JOINs.

Dan: Yeah, happy to do that.

David: So I think the reason we wanna talk about this is that this topic can be confusing. Many developers are familiar with what it's like to do JOINs in a batch-oriented database, but with streaming JOINs the situation changes a bit, and this can leave you confused about what to do with the results. What really makes this different is that in a batch database, when you do a JOIN, you get a result that corresponds to the state of those tables at the time you do the JOIN. But with streaming SQL, the tables are constantly changing as new events arrive, and this means that the results of the JOIN also need to be changing. So we call this a continuous query on dynamic tables, and to make this work, the SQL engine has to keep some state around, and in the case of a JOIN, that's potentially a lot of state.

Dan: Okay, so when you say state, what exactly do you mean by state?

David: Well, in the case of a JOIN, you'd be keeping around the rows from both tables, all of the events you've seen, you know, each event is gonna be a row in the resulting table. You might have to keep all those rows from both tables forever.
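To make the state problem concrete, here is a minimal Python sketch of an unbounded regular JOIN. It is a toy model, not Flink's implementation, and `UnboundedJoin` and its methods are hypothetical names. Because a row arriving later on either side might match any row already seen on the other side, nothing can ever be evicted.

```python
from collections import defaultdict


class UnboundedJoin:
    """Toy model of a regular (unbounded) inner equi-join over two streams.

    Hypothetical illustration only: every row from both sides is kept forever,
    because a row arriving later on the other side might still match it.
    """

    def __init__(self):
        self.left = defaultdict(list)   # join key -> every left row seen so far
        self.right = defaultdict(list)  # join key -> every right row seen so far

    def on_left(self, key, row):
        self.left[key].append(row)
        # Match the new row against everything already stored on the right.
        return [(row, other) for other in self.right[key]]

    def on_right(self, key, row):
        self.right[key].append(row)
        # Match the new row against everything already stored on the left.
        return [(other, row) for other in self.left[key]]

    def rows_in_state(self):
        return sum(map(len, self.left.values())) + sum(map(len, self.right.values()))


join = UnboundedJoin()
join.on_right("george", {"account_type": "guest"})
first = join.on_left("george", {"amount": 50})
print(first)  # [({'amount': 50}, {'account_type': 'guest'})]

# Hypothetical load: 1,000 more purchases, every one of which stays in state.
for amount in range(1000):
    join.on_left("george", {"amount": amount})
print(join.rows_in_state())  # 1002
```

State grows with every event on both sides, which is exactly the "adds up pretty quickly" problem.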

Dan: Seems like it's gonna add up pretty quickly.

David: Yeah, absolutely.

Stateless, materializing, and temporal operations

David: So let's establish a little context for what we're talking about. There are really three different families of operators in Flink SQL. There are the ones where the SQL engine itself doesn't have to keep any state around at all. For example, if you wanna filter a stream of information about customers so that you only keep the events for customers who aren't guests, you can do that by just inspecting the contents of each event to see whether it's about a guest or not. So you don't need any state. On the other hand, if you wanted to compute some analytics, like counting your customers according to what kind of account they have, you're gonna have to keep these counters around. Maybe that's not very expensive, 'cause maybe you only have a few different kinds of customers, so you've just got a handful of counters you're updating as you get events about them. But in other contexts you might have a really large state space, and the same thing applies to JOINs. We'll look at an example of that. Flink SQL also has specialized versions of aggregations and JOINs so that queries involving these things can be executed more efficiently, but only if certain constraints are satisfied: only if the data and the query are organized a certain way can we apply these optimizations.
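The difference between the first two families can be sketched in a few lines of Python. This is an illustration of the idea, not Flink code; the function names and sample events are hypothetical. The filter looks at one event at a time, while the count has to carry a counter per account type from one event to the next.

```python
from collections import Counter
from typing import Iterable, Iterator


def non_guests(events: Iterable[dict]) -> Iterator[dict]:
    """Stateless: each event is kept or dropped by looking at that event alone."""
    for event in events:
        if event["account_type"] != "guest":
            yield event


def counts_by_account_type(events: Iterable[dict]) -> Iterator[tuple]:
    """Materializing: a counter per account type must be retained across events.

    Simplification: this counts events, not distinct customers, so a customer
    who changes account type is not retracted from the old count.
    """
    counts = Counter()
    for event in events:
        counts[event["account_type"]] += 1
        yield event["account_type"], counts[event["account_type"]]


# Hypothetical sample events.
events = [
    {"customer": "george", "account_type": "guest"},
    {"customer": "sheryl", "account_type": "retail"},
    {"customer": "ann", "account_type": "retail"},
]
kept = [e["customer"] for e in non_guests(events)]
updates = list(counts_by_account_type(events))
print(kept)     # ['sheryl', 'ann']
print(updates)  # [('guest', 1), ('retail', 1), ('retail', 2)]
```

The state for the count is one integer per account type, which is cheap here; the same pattern applied to a high-cardinality key is where state gets expensive.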

Streaming joins are continuous queries

David: So what we're gonna look at right now is the difference between regular JOINs and what we call temporal JOINs.

Dan: Okay, so let me see if I've got this straight in my head. Let's say I have a Customers table. Right now I have one customer, George, and he's currently listed as a guest. On the left we have the purchases that George is making, and on the right we have the result of joining the Customers table with the Purchases table. As George makes purchases, we'd see each of them show up in the JOIN result, right?

David: Yeah, yeah, that sounds right.

Dan: Yeah, so my question though is, what happens if George changes his status and is no longer a guest customer but is now a retail customer? How would that work and how would you JOIN those two records to give you useful information?

David: Yeah, so in the scenario you've just described, we have what we might call a fact table. These purchases that are happening, these are facts. Then we have this slowly changing dimension table describing the customers, which is more of an updating table: customers have some information that's true about them now, and some of it changes over time as the customer, you know, changes their relationship to the store.

Demo: join with an updating table

David: Let's make this real and try this out in Flink SQL. This is a table we might use for the customers, and it's gonna be an updating table, 'cause, as I was just saying, the customer record might change over time. So we're gonna know who the customer is, what kind of account they have, and when they last changed their customer information. And then we're gonna have this simpler fact table about the purchases, where we know who made the purchase, how much they spent, and when that happened. These records are immutable: the purchase happened and it's done. Let me just mention briefly that we set up these tables to have watermarks. The watermark is a signal about how complete the table is with respect to time. We're not gonna dive into watermarks in detail in this video; if you want to learn more about that, look for the link in the description below. But if you're trying to reproduce this demo, the easiest way to avoid problems is to configure the Kafka topics you're using so they have just one partition. This will help you avoid any problems with idle partitions. If that didn't make sense, don't worry. It's really a detail. So, let's get started and put some data in these tables. Here we're creating the Purchases table and the Customers table, and now we're doing the JOIN. We're gonna pick out everything from the purchase, and then we're gonna enrich each purchase with information about the customer making it: we grab the account type of the customer who made the purchase. Let's get that query going. The query is running, but we don't have any data yet, so it can't show us any results. Now we need to insert some records for the JOIN to do something. Here's the first purchase: George spending $50. And then, ultimately, we see a result. We have George, he's a guest, he spent $50.

Dan: That's great.

David: Now let's change George so that he becomes a retail customer. As we watch, the JOIN result gets updated: we now see George listed as a retail customer having spent $50. We can drill in here using the Flink CLI by pressing M, which shows us the changelog. Here we're seeing the history of what happened. Initially the JOIN showed George as a guest. That's indicated with the +I, telling us that the record was inserted into the JOIN result table. Later we see that result being updated: we retracted it, which is shown as -U, and updated it to show George now being a retail customer.
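Here is a simplified Python model of what just happened, under stated assumptions: `UpdatingJoin` is a hypothetical class, not Flink's operator, and its output mirrors the changelog from the demo, using Flink's row-kind markers (+I for an insert, -U for the retracted old value, +U for the updated new value). The key point is that the JOIN must keep every purchase, because any later customer update may force it to revise results it already emitted.

```python
class UpdatingJoin:
    """Simplified model of the demo's regular JOIN: append-only purchases joined
    to a customers table that is updated in place by customer_id.

    Hypothetical illustration, not Flink's operator. Output is a changelog of
    (kind, row) pairs using the +I / -U / +U row-kind markers.
    """

    def __init__(self):
        self.customers = {}  # customer_id -> current account_type only
        self.purchases = []  # all purchases: any later customer update may revise them

    @staticmethod
    def _row(purchase, account_type):
        return (purchase["customer_id"], account_type, purchase["amount"])

    def on_purchase(self, purchase):
        self.purchases.append(purchase)
        account_type = self.customers.get(purchase["customer_id"])
        if account_type is None:
            return []  # inner join: nothing to emit until the customer exists
        return [("+I", self._row(purchase, account_type))]

    def on_customer(self, customer_id, account_type):
        old = self.customers.get(customer_id)
        self.customers[customer_id] = account_type
        changes = []
        for purchase in self.purchases:
            if purchase["customer_id"] != customer_id:
                continue
            if old is None:
                changes.append(("+I", self._row(purchase, account_type)))
            else:
                # Retract the result emitted earlier, then emit its replacement.
                changes.append(("-U", self._row(purchase, old)))
                changes.append(("+U", self._row(purchase, account_type)))
        return changes


join = UpdatingJoin()
changelog = join.on_customer("george", "guest")
changelog += join.on_purchase({"customer_id": "george", "amount": 50})
changelog += join.on_customer("george", "retail")
for kind, row in changelog:
    print(kind, row)
# +I ('george', 'guest', 50)
# -U ('george', 'guest', 50)
# +U ('george', 'retail', 50)
```

The -U/+U pair is why the original "guest" fact disappears from the materialized result: it was retracted, not kept alongside the new value.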

Dan: Okay, let me see if I can summarize what happened. This was the original purchase, where George bought $50 worth of gear as a guest. Then we updated George to be a retail customer, and now the JOIN result shows him as a retail customer. While that's correct, it's not exactly what I was hoping for: he is currently a retail customer, but the update changed a previous entry, and we've lost the information that he actually made this purchase when he was a guest. Now it shows that he made the purchase as a retail customer. That doesn't seem like quite what we want, even though it is technically correct information.

Demo: join with an appending table

David: Right. Now, we saw earlier that there are two different kinds of tables, updating and appending. Here we were modeling the Customers table as an updating table, and that led to the JOIN result also being an updating table. And I agree with you, that doesn't quite seem like what we want if our goal was to enrich the purchase.
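One way to see the difference between the two kinds of tables: the same stream of change events can be read either way. In this minimal Python sketch (function names hypothetical), an appending table keeps every row, while an updating table upserts by primary key, the way the customers table above did.

```python
def as_appending(changes):
    """Appending table: every change event becomes a new, immutable row."""
    return list(changes)


def as_updating(changes, key):
    """Updating table: a later row with the same primary key replaces the earlier one."""
    table = {}
    for row in changes:
        table[row[key]] = row  # upsert by primary key
    return list(table.values())


# The same two change events for George, read both ways.
changes = [
    {"customer_id": "george", "account_type": "guest"},
    {"customer_id": "george", "account_type": "retail"},
]
print(len(as_appending(changes)))               # 2
print(as_updating(changes, key="customer_id"))  # [{'customer_id': 'george', 'account_type': 'retail'}]
```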

Dan: Yeah, for sure. So what if we tried an append-only table? What would that look like?

David: Yeah, let's give that a try. The table's gonna be very similar to the one we had before, but let's call it customers_appending instead of customers_updating so we can keep them straight. It has the same fields; we're just not gonna have a primary key this time. And the JOIN is gonna look very similar to the one we had before. So we'll set up the table and, again, start the JOIN before we put any data in it, so we can watch things as they come along. First we need a customer. Here I've put in a different customer; let's call her Sheryl. She starts off as a guest, just like George did, and she makes a purchase while she's still a guest. Boom, we see a result: Sheryl making a purchase as a guest. Then we update Sheryl's record to say that she's become a retail customer, and we get a new result: Sheryl making a purchase as a retail customer. But wait, she only made one purchase.
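The duplicate result falls straight out of regular JOIN semantics when neither side has a primary key. In this toy Python model (the `AppendingJoin` class is hypothetical, not Flink's implementation; the $10 second purchase is made-up data), each customer row is an independent fact, so a purchase matches every row ever stored for that customer.

```python
class AppendingJoin:
    """Toy regular JOIN where both inputs are append-only (no primary key).

    Hypothetical illustration: every stored customer row is a separate fact,
    so a purchase matches every row ever seen for that customer_id.
    """

    def __init__(self):
        self.customers = []
        self.purchases = []

    def on_customer(self, customer):
        self.customers.append(customer)
        # A new customer row joins with every matching purchase seen so far.
        return [(p["customer_id"], customer["account_type"], p["amount"])
                for p in self.purchases if p["customer_id"] == customer["customer_id"]]

    def on_purchase(self, purchase):
        self.purchases.append(purchase)
        # A new purchase joins with every matching customer row seen so far.
        return [(purchase["customer_id"], c["account_type"], purchase["amount"])
                for c in self.customers if c["customer_id"] == purchase["customer_id"]]


join = AppendingJoin()
results = join.on_customer({"customer_id": "sheryl", "account_type": "guest"})
results += join.on_purchase({"customer_id": "sheryl", "amount": 30})
results += join.on_customer({"customer_id": "sheryl", "account_type": "retail"})
print(results)  # [('sheryl', 'guest', 30), ('sheryl', 'retail', 30)]

# A second (hypothetical) purchase now matches both customer rows.
second = join.on_purchase({"customer_id": "sheryl", "amount": 10})
print(second)  # [('sheryl', 'guest', 10), ('sheryl', 'retail', 10)]
```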

Dan: Yeah, so let me summarize this again. Same type of scenario, only with Sheryl instead of George, but this time, when she made one purchase, we got two results, right? We see that she was a guest and she made a $30 purchase. Then, with the append-only table, updating her record didn't replace that result; it added a second one. So now we're getting two entries for every purchase she makes. And I imagine if she continued to make purchases, we would continue to see two entries for each, because we're keeping the information that she was a guest and also the information that she's now a retail customer, which doesn't seem like it's gonna lead us down the right path either. It seems like, I dunno, what's the line?

Demo: temporal join with a versioned table

Dan: It seems like there's gotta be a better way.

David: Right? I mean, this is the way that regular JOINs work. You get every combination of matching records, and in many scenarios that's really not what you want, especially in a streaming application, where often the goal is to take an incoming record, like a purchase, and just enrich it with additional information so that we know how to process that event. There is something designed for exactly that purpose in Flink SQL, and in some other systems, and we call it a temporal JOIN. The JOIN is gonna look very much like the one we were doing before; we're just gonna add one more line to it: FOR SYSTEM_TIME AS OF purchases.purchased_at. What that's saying is, "Please JOIN the purchases to the customers_updating table, but use the version of the customer information that was in effect at the time of the purchase." It's like doing this bit of time travel and picking out the right version of the customer record, which sounds a bit magical, but (chuckles) that's what it does. And when we execute that query, we get exactly the results I think we were hoping for: we see George as a guest spending $50, even though we ran this query with the same setup as before, where we've already seen George updated to being a retail customer.
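Here is a minimal Python sketch of what FOR SYSTEM_TIME AS OF asks for, under the simplifying assumption that all customer versions are already known when the lookup happens. The class and function names, the integer timestamps, and the second purchase are hypothetical. Flink's event-time temporal join does not get to see the future like this; it relies on watermarks to decide when it has seen enough customer versions to emit a result.

```python
import bisect
from collections import defaultdict


class VersionedCustomers:
    """Per-customer version history, ordered by the time each version took effect."""

    def __init__(self):
        self._times = defaultdict(list)   # customer_id -> sorted valid-from times
        self._values = defaultdict(list)  # customer_id -> account_type per version

    def upsert(self, customer_id, updated_at, account_type):
        i = bisect.bisect_right(self._times[customer_id], updated_at)
        self._times[customer_id].insert(i, updated_at)
        self._values[customer_id].insert(i, account_type)

    def as_of(self, customer_id, ts):
        """Version in effect at ts: the latest one with valid-from <= ts, else None."""
        times = self._times.get(customer_id, [])
        i = bisect.bisect_right(times, ts)
        return self._values[customer_id][i - 1] if i > 0 else None


def temporal_join(purchases, customers):
    results = []
    for p in purchases:
        account_type = customers.as_of(p["customer_id"], p["purchased_at"])
        if account_type is not None:  # inner join: skip purchases with no version yet
            results.append((p["customer_id"], account_type, p["amount"]))
    return results


customers = VersionedCustomers()
customers.upsert("george", 1, "guest")   # hypothetical timestamps
customers.upsert("george", 5, "retail")
purchases = [
    {"customer_id": "george", "amount": 50, "purchased_at": 3},
    {"customer_id": "george", "amount": 20, "purchased_at": 7},  # hypothetical later purchase
]
print(temporal_join(purchases, customers))
# [('george', 'guest', 50), ('george', 'retail', 20)]
```

Each purchase produces exactly one result, and the update to "retail" never rewrites the $50 guest purchase.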

Dan: Yeah, so in summary, each purchase is joined with the customer information as of when the event happened, correct? When George was a guest, we can see that he purchased $50 worth of product, and now that he's changed, if he were to make a purchase at this point, I imagine the JOIN result would show both: the guest entry for the first purchase and, of course, a retail entry for the new one. Which is exactly what we were hoping for, what we wanted to see, right?

David: Exactly! So each purchase is going to be joined with the version of the customer record that was in effect at the time of the purchase. And what's extra special about this is that because both of these tables are being processed roughly in order (we're getting purchases advancing through time and customer updates advancing through time), we don't actually have to keep all of the purchases and all of the customer records around in state.

Conclusion

David: We really only need to keep the more recent information, 'cause that's all that's actually gonna be relevant.
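A simplified Python sketch of why only recent versions matter, assuming rows that fall behind the watermark are treated as late and not joined. `prune_versions` is a hypothetical helper, not Flink's internal cleanup logic. Any on-time purchase needs either the last version that took effect at or before the watermark, or a later one, so everything older can be dropped; a purchase can likewise be dropped once it has been joined.

```python
def prune_versions(versions, watermark):
    """Drop versions that no on-time row can ever be joined with.

    versions: list of (valid_from, account_type) sorted by valid_from.
    A row stamped at or after the watermark can only need the last version
    with valid_from <= watermark, or a later one; earlier versions are dead.
    """
    keep_from = 0
    for i, (valid_from, _) in enumerate(versions):
        if valid_from <= watermark:
            keep_from = i
        else:
            break  # sorted, so no later version can be <= watermark
    return versions[keep_from:]


history = [(1, "guest"), (5, "retail"), (9, "guest")]  # hypothetical versions
print(prune_versions(history, watermark=3))  # [(1, 'guest'), (5, 'retail'), (9, 'guest')]
print(prune_versions(history, watermark=6))  # [(5, 'retail'), (9, 'guest')]
print(prune_versions(history, watermark=9))  # [(9, 'guest')]
```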

Dan: That's really cool. So the question left is, when would I want to use a regular JOIN? It seems like these temporal JOINs are, like you said, magic, so if I'm using Flink SQL, I should just be using a temporal JOIN all the time. When do I use a regular one?

David: Yeah, that's a good question. Temporal JOINs are what you want when there's exactly one matching record on the right side of the JOIN to combine with each incoming event from the left side. So, you know, you have a purchase, and you wanna find the one customer record to JOIN it with. But there are times when you need to find a whole bunch of records from the other side of the JOIN. For example, let's say you're building a fraud detection pipeline. You've got a purchase event coming in and you wanna decide if it's legitimate or not. One of the things you might look at is all of that customer's past purchases, to see if this one is consistent with the pattern of their past purchases. That's gonna be a JOIN across all of space and time, as it were, and you're gonna need to keep a lot of state to satisfy it. You should try to avoid that whenever possible. One thing that could help here is, instead of finding all of the customer's past purchases, to limit yourself to recent ones: construct the JOIN so it has some temporal boundaries on it, so that you only need the purchases from the past week or month or something like that. Keep it finite.
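Here is a minimal Python sketch of the "keep it finite" idea: a bounded self-join that, for each purchase, returns only the same customer's purchases from the preceding week. In Flink SQL, a JOIN with this kind of time bound in its condition is called an interval join. The class, the one-week bound, and the sample purchases are hypothetical, and the sketch assumes each customer's purchases arrive in timestamp order so that anything outside the window can be evicted for good.

```python
from collections import defaultdict, deque

DAY = 24 * 60 * 60
WEEK = 7 * DAY  # the one-week bound is an arbitrary illustrative choice


class RecentPurchases:
    """Toy bounded self-join: for each purchase, find the same customer's
    purchases from the preceding `bound` seconds.

    Assumes each customer's purchases arrive in timestamp order, so anything
    older than (current timestamp - bound) can never match again and is evicted.
    In this sketch eviction happens lazily, only when that customer buys again.
    """

    def __init__(self, bound=WEEK):
        self.bound = bound
        self.history = defaultdict(deque)  # customer_id -> deque of (ts, amount)

    def on_purchase(self, customer_id, ts, amount):
        past = self.history[customer_id]
        while past and past[0][0] < ts - self.bound:
            past.popleft()  # outside the window: can never match again
        matches = list(past)
        past.append((ts, amount))
        return matches


recent = RecentPurchases()
recent.on_purchase("george", 0, 50)        # hypothetical purchase history
recent.on_purchase("george", 3 * DAY, 20)
matches = recent.on_purchase("george", 9 * DAY, 80)
print(matches)                        # [(259200, 20)]
print(len(recent.history["george"]))  # 2
```

Unlike the unbounded JOIN, state here is limited to roughly one window's worth of purchases per active customer.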

Dan: Well, that's awesome. Thanks, David, for taking the time to walk us through how temporal JOINs work in Flink. It was really cool and really helpful for me. If you enjoyed this video, be sure to like, subscribe, and hit the bell icon to be notified of additional content. You can also check out developer.confluent.io to see the courses, videos, and other tutorials we have to help you on your Apache Kafka or Apache Flink journey. We'd also like you to leave a comment below letting us know how we did. Did you like this video? Would you like to see others like it? What specific content would you like to see us cover? Thanks again, and until next time, have a great day. (uplifting music)