Do you feel like you maintain balance in your life? No? Well, how about in your consumer groups? Kafka committer Sophie Blee-Goldman wants you to, which is why she's here today to talk about her work in KIP-429, that's cooperative rebalancing, and KIP-441, the so-called smooth scaling KIP, both of which help Kafka Streams applications scale a little bit easier. It's all on today's episode of Streaming Audio a podcast about Kafka, Confluent, and the cloud. Hello, and welcome to another episode of Streaming Audio. I am, as always, your host, Tim Berglund. And I'm joined here in the virtual studio today by my coworker and Kafka committer and Kafka Streams person, Sophie Blee-Goldman. Sophie, welcome to the show.
Thanks, yeah. Thanks for having me.
You got it. Well, hey, we're going to talk about, I think, what I would call some special topics in Kafka Streams or some, I don't know, they probably don't seem advanced to you. But they're moderately advanced Kafka Streams topics today. And there's a lot of details I'm excited to dig into, but first, as always, I would love it if you could tell us a little bit about yourself. I mean, I already said you're an engineer, you're a committer. You work on the Kafka Streams, the part of Kafka known as Kafka Streams. But how did you get there?
That's a good question. I probably came here a slightly different route than others. I actually came to Confluent as my first job out of school. I actually studied physics instead of computer science. I spent some time learning, taking some CS courses and just learning on my own and on the side. But at first, I wanted to go into physics, and I liked being in the lab. But I realized that what I was always really excited to do was the part where we were programming, which of course is a big part of any physics lab in the modern world. It's like half coding at best.
Yeah, so I kind of realized... I'd been a Google intern a while back in the infrastructure team, and I really enjoyed that. So I thought, "Well, why not try to dig into the coolest infrastructure that's currently in the hot seat, which is Kafka?" So that's how I ended up at Confluent.
Could not agree more. I like that path. I've known, I think of the extraordinary developers I've known. There's a lot of musicians and a lot of physicists. That's just weird, those two patterns. That happen to-
Something about that drives you to go to computer science, I guess.
Right. I mean, the musicians, they find out, "Oh, wait, I can't make a living doing this," which of course, is not always the case, but is the case for some. But no, something... That'd be a neat bit of psychology research to figure out why that is. Of course, for that research to work, someone would have to have an account of what it is that makes you good at programming. And I'm not really sure that's really knowledge that we have yet.
It's tricky.
So... Yeah, it is. Anyway, a couple of things, if we have time for them, that I want to talk about today. One is rebalancing in Kafka Streams. I want to talk about kind of the bad old days, and historically how that has worked, and what has been unfortunate about that. And then cooperative rebalancing, which I think is KIP-429-ish.
429 plus or minus three. Sorry, that would be engineering, not physics. And if possible, scaling out and then dealing with the state store and moving state around when, I guess, scaling out is really the problem there. So let's see what we can do. And as always, Streaming Audio is about me learning things. Ideally, it's also about our listeners learning things. I hope everybody else is learning. But these are two things that I personally would like to dig into a little bit more for my own benefit, so I'm excited. All right, rebalancing. Now could you maybe lay some groundwork, if there's anybody who doesn't immediately understand the problem, maybe talk about just how consumer groups work, even before there's Kafka Streams. What's rebalancing? And why is it a problem? And all that.
Yeah. Sure. Kind of thinking back to when I was first new at Kafka, that was one of the things that I think was kind of trickiest to understand, the rebalancing, but to take a step back, like you said, consumer group, it's what it sounds like, you have a group of consumers and they each are assigned some set of partitions from a set of subscribed topics. So instead of bringing up a consumer and manually assigning it, you read topic A, you read topic B, you just bring up a group of consumers, whatever resources you have. And Kafka figures out how to assign partitions to these consumers such that it evenly distributes the workload. Each consumer is responsible for one or more partitions, but each partition is only read by exactly one consumer. So each consumer's kind of standalone and has its own isolated view of its partitions. So that's the-
Right. So if you have a topic with 10 partitions and you have two consumers, then they're probably each going to have five.
Yeah.
They'll be consuming five of those partitions.
Exactly. And technically this is pluggable for more advanced users. For the most part you really just want to kind of distribute them in a round robin or sticky fashion, which we can get into more later.
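For reference, here is a minimal sketch of the plain consumer group setup being described: subscribe by topic with a shared group.id and let Kafka hand out the partitions. The topic name "words", group id "word-readers", and broker address are placeholders, not anything from the episode.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "word-readers");   // every consumer with this group.id joins the same group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // No manual partition assignment: subscribing by topic lets the group
            // coordinator and the assignor hand partitions to this member.
            consumer.subscribe(List.of("words"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition %d offset %d: %s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```

With a 10-partition topic and two such consumers running, each would typically end up owning five partitions, as in the example above.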
Yes. If you know how the pluggability works you don't need this episode.
That's a fair point, yeah.
Yeah.
Yeah, so the rebalancing. Rebalancing is kind of, as the name suggests, I guess, it does this work of distributing resources, or partitions in this case, to the consumers. Obviously you want to make sure that each consumer has enough work to do, or some work to do if there are partitions available to consume. And you want to make sure that it's evenly balanced. Hence the rebalance. So a rebalance is actually-
Yeah. As fair as possible.
Yeah, as fair as possible. It can't always be totally fair. It's life. But a rebalance is just an event that occurs when you have a change in topic metadata, a new topic is created, or group membership. And group membership just meaning you have a new consumer that joined, one of your consumers, the broker noticed that it died or it's no longer responding. Whatever might happen. Kafka just wants to make sure that all the partitions are actively being consumed.
That no longer responding is a heartbeat that consumers are responsible for answering, right?
Yes. So in the background, the consumer has a heartbeat thread, which every three seconds, by default, is just flashing, "Hey, I'm still alive. I'm still here, I'm still working. Don't kick me out of the group."
Right.
So if a member does stop responding we would trigger a rebalance, which just means, "Okay, whatever partitions were previously assigned to this member who seems to now be dead, we need to give them to someone else or else they're just going to be neglected." No one's going to be working on them. It's going to become unbalanced.
They'll be sad.
They'll be sad. So hence the rebalance, aptly named. Now, unfortunately, kind of back in the day, in the bad old days, a rebalance actually meant all the current group members need to rejoin the group, as we say, where they basically just, they send what's called a subscription to the brokers and say, "Here's the topics that I'm interested in, here's the group that I'm a part of." And the broker kind of does a little checklist of who are all the members that are currently in this group that we need to distribute partitions to. So that's kind of how the rebalance begins. We start, everyone sends their subscription. Now at this point, in the bad old days, sending this subscription actually had to be preceded by each consumer completely stopping work on all of its partitions and revoking ownership of those partitions. So every rebalance ultimately meant every single consumer had to stop work on all partitions across the entire rebalance, and then only at the end would they get their partitions back, or their new assignment. So kind of a downtime for the entire duration of the rebalance. It's especially a bummer because half the time it... I don't know what the actual percentage is, but a lot of the time they're just going to get the same partitions back. Maybe you're in a cloud environment and we're going to just bounce the pod. Nothing really changed, but you still had a rebalance, but you had to stop all work for pretty much no reason.
How long, in practice, is that actually? And what does that depend on?
That's a good question. I think, anecdotally, it can probably be on the order of 10 seconds or so. I think it kind of depends on... I mean, what really needs to happen is for all group members to join. So it can be a very long time because the way that a consumer is notified of a rebalance event is through its poll. It literally calls the poll method and in that poll method it will join the group. But of course, there's this other config called the max poll interval which just determines how long this consumer can go without calling poll. And I think this is five minutes by default. So in theory you might have a five-minute rebalance while you're waiting for somebody to join the group.
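For reference, these are the consumer timing configs being discussed, set explicitly. The values shown are the defaults as best I know them (heartbeat every 3 seconds, a 5-minute max poll interval); they can differ across Kafka versions, so treat the exact numbers as illustrative.

```java
import java.util.Properties;

public class ConsumerTimingConfig {
    // The timing knobs from the conversation, set explicitly so they have names.
    static Properties timingProps() {
        Properties props = new Properties();
        props.put("heartbeat.interval.ms", "3000");    // heartbeat thread: "I'm still alive", every 3 seconds
        props.put("session.timeout.ms", "10000");      // how long the broker waits for heartbeats before
                                                       //   declaring the member dead and rebalancing
        props.put("max.poll.interval.ms", "300000");   // 5 minutes: longest gap allowed between poll() calls
                                                       //   before the consumer is kicked from the group
        return props;
    }
}
```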
Gotcha. In the bad old days, the pre-KIP-429 days that would be, I'd say, was unfortunate. Maybe. It just depends on what your SLAs are. I mean, maybe five minutes is fine. But probably bad.
Sure. In the scheme of physics and the universe-
You know, I'm-
... five minutes is not that long, but in computer science and-
It's not that long.
... distributed systems.
In getting things done in most business applications that might be too long these days. It's not an allow four to six weeks for delivery sort of world anymore. But this is not completely off topic, but a little bit off topic, it's just a story about me. But I'm remembering a talk I did almost a year ago, so this is late February when Sophie and I are recording this. This would've been in the last few days of February or first couple days of March, I don't remember exactly which, last year. So this was the very beginning of the pandemic when everybody was starting to say, "Oh wait, do we have to shut down life?" And so there was actually a conference that I attended in London that was in person. And this is going somewhere, don't worry. It was weird, because there were these bouncers at the front with hand sanitizer. Like, "Hey, nice hands you got there. Be a shame if anything happened to them. Sanitize them." Muscle there to make sure you sanitize your hands, because nobody knew anything. But, point is, at that conference I did this talk that I didn't have a chance to do any other time in the whole year because it was really an in person thing where I got people on stage and we actually acted out the consumer group rebalance protocols. So there was a person with a little sign that said poll. There was a consumer, they'd walk over and exchange it. "Okay, now you get this partition." It was kind of fun. At least I thought it was fun. Someday we'll do it again and maybe you can help me. Anyway.
Oh, I hope to be a part of that.
And the upper bound on how long the rebalance takes is that poll interval, which in pathological cases can be configured to be very long. But it's probably seconds in the normal case, because you don't want that big a gap between polls anyway. Okay. So that's a significant amount of pause. If you have a major GC that lasts that long I feel like you're not on your A game.
Yeah, throwing out Java at that point.
Yeah, right? Which you don't get to do in this case.
Yeah.
So that's the bad times.
Yes. So this is how things were when I first joined, started working on Kafka, and I remember thinking, "That seems ridiculous. Why not just allow consumers to hold on to their partitions? Especially if they're going to just get back the same partitions after the rebalance." And I kind of assumed that that was the way it worked, and was a little bit shocked to find out that that's not the way it worked. And of course, the reason is not that the original creators of Kafka were fools, but because-
They were just dumb. They didn't know it could've been better. Yeah.
They were busy. There's a lot of things that-
Turns out not. That's right. Benefit of the doubt. They were going through a lot then and it was a tough time. But it's also a distributed state problem so what-
Exactly. Yeah, yeah.
... how did that go?
Things are hard. And the problem really here, the reason that we did this, what we call the eager rebalancing protocol, because we eagerly give up all the partitions, is because we need to satisfy this safety condition where we don't want two consumers to claim ownership of a single partition, potentially at the same time. So if you imagine, we had a rebalance, nobody revoked anything, partition A is being moved from consumer one to consumer two, that's fine. But it's distributed systems, you can't rely on the ordering of events. So it's possible that consumer two would get its new assignment, start working on partition A, but over back on consumer one it still hasn't been notified, or hasn't gotten around to revoking this partition. So unfortunately at that point it might end up revoking the partition, which typically involves committing offsets. And it might overwrite the offsets of the new owner of this partition. So that's really kind of the crux of the problem here, is we just need to enforce a barrier. We need to synchronize and make sure that all partitions are no longer owned by anyone before we actively assign them to a new owner.
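For reference, the revoke-and-commit step described here surfaces in the consumer API as a ConsumerRebalanceListener. This is only a sketch: the currentOffsets map and recordProgress helper are hypothetical application bookkeeping, not part of the Kafka client.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitOnRevokeListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<?, ?> consumer;
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    public CommitOnRevokeListener(KafkaConsumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    // Hypothetical helper: the application records how far it has processed each partition.
    void recordProgress(TopicPartition tp, long nextOffset) {
        currentOffsets.put(tp, new OffsetAndMetadata(nextOffset));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
        // Commit what we've processed for the partitions we're losing, before
        // anyone else can be assigned them and start writing their own offsets.
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        for (TopicPartition tp : revoked) {
            OffsetAndMetadata offset = currentOffsets.remove(tp);
            if (offset != null) {
                toCommit.put(tp, offset);
            }
        }
        consumer.commitSync(toCommit);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> assigned) {
        // Under the eager protocol this is everything we now own (old and new);
        // under the cooperative protocol it's only the newly added partitions.
    }
}
```

You'd register this with consumer.subscribe(topics, listener) so the callbacks fire as part of a rebalance.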
Right. Because, I guess, the case there is that you can have one partition assigned to two owners, as you explained. And you're still going to have a way of reconciling that to just one owner, but you've kind of got a split brain partition ownership state there, which means offsets are not going to be consistent across the resolution of that split brain thing.
Right, yeah.
Just making sure I'm tracking. So I think I'm tracking.
All right, perfect. Yeah, yeah. Yeah, the problem is you don't... Each consumer then gets a partial view of the topic and they might potentially overwrite each other's offsets. Yada, yada, yada. So-
Yes, which is bad.
Exactly. So that was the problem. That was why we adopted this Eager rebalancing protocol, it's just kind of the easiest thing to do. And for a first pass, it worked. It works, it's not ideal, but there's a million other things to work on. So eventually the time came to kind of reconsider this. I think probably plenty of users are complaining because suddenly their apps are experiencing complete downtime for seconds or potentially even minutes, as we discussed. You might be hitting many, many rebalances, which especially in a cloud environment where pods are coming and going and you don't really have a perfectly stable environment, you might be getting rebalances all the time. And there might be no-op rebalances where nothing actually gets reassigned to any new owner. So there was no reason to stop working.
Yeah. But you paused, you stopped the world so that you could have a no-op rebalance because Kubernetes had a bad morning or something.
Yeah, exactly. Which doesn't seem quite fair.
It doesn't, it doesn't. Why should I have to stop my work because you had to move pods around?
Exactly, it's kind of the cloud native way. Things are happening, they're in the background, your apps should not react to it. Your apps should be stable.
Yeah. And, yeah, and your stateless microservice is fine with that. And, I mean, I think we all know this, but that whole method of application delivery, sort of... well just container orchestration, begins with the assumption that you have stateless or not particularly stateful microservices, with just a little bit of state to move around. Small programs. But this is stateful infrastructure and... Yeah. So not the case.
Yeah. Exactly. So yeah, so there's some work on our end to kind of make Kafka more resilient and available in the face of such events. So yeah, so I think the question there is obviously, why not just allow all consumers to keep holding onto their partitions to keep working during the rebalance? The answer then is that we need to enforce this synchronization barrier where everyone has given up the partition. But if we're going to have a rebalance where everybody gets reassigned pretty much the same set of partitions that it had before, then maybe instead of revoking everything we can just allow the consumers to hold onto their partitions throughout the rebalance and then, crazy idea, only revoke the partitions that actually need to move to a different consumer or be reassigned.
Ah. Ah.
So, but of course, then we don't have the synchronization barrier. So the question there is, how can we still enforce this? How can we make sure that it's safe to reassign these partitions? Well, the point of this KIP-429, as it was, was really to make rebalances cheap. And if a rebalance is cheap, why not just have another one? It's two rebalances for the price of one. Originally that would've been the worst deal in history, but if rebalances aren't that bad-
Double the pain.
Yeah. We might as well. If we can make them low cost. So-
So why the second rebalance?
Yeah. So the first rebalance then takes care of the safety net of revoking partitions from consumers whose partitions are being assigned to someone else. So say partition one was originally assigned to consumer A, now we want to reassign this partition to consumer B who just came up. So this first rebalance, we would revoke the partition at the end of the rebalance, the first consumer revokes this partition, but we don't yet reassign it. Because we don't yet know, we haven't guaranteed that it's been revoked at this point. So this is the end of the first rebalance. And now we do have this synchronization barrier. But we don't have this topic partition assigned to anyone. So we just trigger a second follow-up rebalance at this point, by now we know, we have this promise that this partition's been revoked, and so now we can assign it to a new consumer or move it wherever we want, and it should be completely safe.
And that's faster. So you spend less time inside the synchronization barrier because you've got an unassigned partition and you just need to give it to somebody.
Yeah.
Is that the situation?
So the rebalance on the whole should be pretty cheap. More importantly, all the consumers can continue processing during the rebalance. So you're not sitting around waiting for some other member to join the group who's stuck wherever, stuck on some files, some IO.
Sure. So if you make it through the first pass and your partition wasn't, well it wasn't revoked, then you're fine, you should keep going.
Exactly.
You only stop if revoked and you stop because, well, your partition got revoked. So that in itself is not new consumer functionality, consumers already know, when I don't have a partition don't consume those messages.
Exactly.
Now, how do you predict... And if you're about to explain this or something else and I'm jumping ahead of you, just keep going, tell me to stop. But at the end of that pass, how do you predict which partitions you need to revoke? That seems magical to me at this point, I'm sure it's not, but how do you know that?
It is. Or at least it is to the broker, actually. So each consumer has what's called a group leader. It's nothing really special, it's just one member of the-
The consumer group has a group leader, right?
Yes. Sorry. The consumer group leader, which is a consumer, not a broker. And basically this is just, one of the consumers get picked, the broker points to you and says, "Hey, you're special, and your job as a group leader is to figure out the assignment of partitions." So in the middle of this rebalance, after everyone has sent their subscription in, the subscription, which includes the topics that are being subscribed to, all this information is collated by the broker and they send all the subscription info from all the consumers back to the group leader. The group leader then uses this subscription info to figure out how to assign partitions to new owners, how to distribute it evenly, try to give back partitions to their former owner, make sure that everyone has a balanced load. So the group leader is busy doing this during the rebalance. Everyone else is just doing whatever. And then the group leader sends this, what we call, assignment, sends the assignment back to the broker, and the broker then kind of does the reverse. And it distributes the assignment to all of the individual members. So then each member basically just contrasts. Here is my current set of partitions, here is my new set of partitions. Whatever overlap there is I got to keep, anything else, if I don't own it anymore, well, it's revoked. I got to clean up the state or [crosstalk 00:23:14]-
Yeah. I don't own it, it's revoked. And, yeah.
Yeah. So it's just a [crosstalk 00:23:18]-
Commit offsets is the thing that you do then.
Yeah, I think in general, yeah.
Okay.
But-
So this is just a consumer thing. I mean, we started, kind of, with a Kafka Streams framing. Is there any special Kafka Streams behavior around this? So Kafka Streams would inherit this, of course. But does it get stream-y at any point?
Kind of, yeah. Technically there's a slightly different implementation. The way that you-
Oh, right. Wait. Streams doesn't use the default group assignment logic, right?
Bingo. Yep. That is exactly it. Yeah, there's this thing called a partition assigner which, again, as the name indicates, figures out the assignment of partitions. And this [crosstalk 00:24:01]-
Naming things isn't always hard.
You're right. Phew. This assign method obviously is responsible for doing this assignment. That's what the group leader is doing. So this is something that Kafka Streams has its own special version of, this assigner. So when we implemented cooperative rebalancing we had one version for the plain consumer client and another version for Kafka Streams. So the plain consumer client, if you're sitting at home thinking, "Hey, I want to turn on cooperative rebalancing, but I just have a plain consumer app." You just need to use this, I think it's called the cooperative sticky assigner, basically you just plug in this specific kind of assigner, sticky, because it returns partitions to their previous owner to the best extent it can while maintaining a balance. And you plug that in and then cooperative rebalancing is enabled. For Kafka Streams you don't have to do anything to plug in the assigner, we plug it in for you and we just went into Kafka Streams and tinkered with the bits of the assigner to make sure that it stays sticky and it knows not to assign a partition until it's been revoked. So that's kind of the fundamental change there in the assigner, is that it just has to... It computes its intended assignment and then it looks. And if I want to assign partition one to a new consumer, well, I need to first revoke it from its previous owner, so I can't actually assign it during this first rebalance, I have to wait for a second one.
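For a plain consumer app, the switch being described looks roughly like this: swap in the CooperativeStickyAssignor as the partition assignment strategy (Kafka Streams wires up its own assignor for you, so no change is needed there). A minimal sketch:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

public class CooperativeConfig {
    static Properties withCooperativeRebalancing(Properties props) {
        // Swap in the cooperative version of the sticky assignor; everything
        // else about the consumer stays the same.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  CooperativeStickyAssignor.class.getName());
        return props;
    }
}
```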
Right, right. Okay. So you basically had to do half of that work twice from the default assigner to the Kafka Streams implementation. Ah, so hey, on something entirely Streams specific, scaling out. The so-called smooth scaling KIP, or 441. Let me tee it up and then you can tell me, again, about the bad old days and about how life is much better now.
But we were just talking about consumers, which of course, Kafka Streams applications are a type of, and partition assignment. We were not talking about state. And any time I'm explaining Kafka to a brand new person and I explain consumer groups and scaling and everything, it's this wonderful world, and it's great. And, oh, well that's only really true if you're stateless, sorry. If you have any state it's on you.
I remember I was giving a talk like that once to a group and somebody said, "Well can't you just use an in-memory shared grid for that?" And I'm like, "Well, yes, if you have one. That's great. But you shouldn't have to have one." So anyway, Kafka Streams has the internal state store, which is currently and by default a RocksDB instance. And so it's keeping state in a durable way on a disk locally and flushing it back to Kafka topics.
And as instances of a Kafka Streams application fail, or as you add them and you get these group membership events that would cause partition reassignment to happen you also have the problem of moving state around. Now, I say problem, it's a problem for you because you're a Streams developer. It's a glorious solution for everybody who uses the API, because that horrible amount of distributed systems engineering, that as an application developer I don't have time for, or maybe expertise for, it has been done for me.
So this is super cool. But again, there are problems with an early implementation and a KIP that made things more better. Tell us about what the problem was.
Yeah. Sure. Yeah, no, the fascinating and frustrating thing with state is, obviously if your source of truth for this state is in the changelog topic in Kafka, that means you need to rebuild your state on whatever your local instance is any time you're trying to actually actively process something. So it might be something simple like just a count, maybe you're counting the number of times you saw the word 'the'. And so you just store that in RocksDB.
You say, "Okay, I've see the word the five times, that's my current count. The next time I see it I'll increment it and move on." The tricky thing there is that you need to always have a completely up to date view of the state if you want to actually have consistency. Which means that if you-
Right. And this is, if I can say, this is per partition, right? So when we're talking about keys like a count of the word the, that word is going to have appeared in a partition. And that table is going to be associated with a partition, right?
Yeah. Exactly. Kafka Streams shards things according to partition, where each partition is kind of a self-contained, isolated thing that has its own state stores. It corresponds to its own partition in a changelog, for example. And each of these partitions, or yeah, in the Streams lingo we sometimes call them tasks. But consider a task basically equivalent to a partition. That's just kind of the fundamental unit of work in Kafka Streams, is what we call a task. So-
Gotcha. So you have your own little universe of state inside a task because its associated with a partition.
Exactly. We don't care about any other partition, we just want to know for this partition, which includes all the things that start with the letter a, or whatever, whatever you're doing, here's your partition, here's your task. That's all that matters. So the problem here, the bad old days, is certain events, I think scaling out is the particularly bad example, although this can happen at any time. When you scale out you add a new instance of your Kafka Streams application. Presumably, you're doing this because there's some increased workload or there's some reason probably that you're trying to scale out your application. The real bummer is that, when you add this new instance it obviously doesn't have any copy of the local state, it's completely new so it doesn't have anything at all. So let's say you assign a partition or a task to this new instance, it really can't do anything until it's replayed the entire changelog and built up its local state store. You have to get it to the point where it remembers that it's seen the word 'the' five times, or so on, so that the next time it sees the word 'the' it comes up to six rather than seeing-
So then it's six, and not five again. Yeah.
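For reference, here is a minimal sketch of the kind of stateful count being described, written with the Kafka Streams DSL. The topic name "words", the application id, and the store name "word-counts" are all made up for the example; the materialized store is what ends up in RocksDB locally, backed by a changelog topic in Kafka.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class WordCountApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        // Assume each record value is already a single word; re-key by the word and count.
        // One task per input partition; each task owns the slice of the store for its keys.
        KTable<String, Long> counts = builder
                .stream("words", Consumed.with(Serdes.String(), Serdes.String()))
                .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-counts"));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```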
Yeah. Exactly. Or one, or whatever. And this process of replaying the changelog is super long. If you have a lot of state, in the triple digits of gigabytes, which is not uncommon, it can take hours. Users have complained, they have complained plenty. It's been painful. It's been bad. Restoration is a hard problem. For many users it's probably fast enough for them, but scaling out-
Not really.
... in particular is so bad because everything... You have to just replay everything, on every task, on every partition, completely from scratch. So you add this new instance, you're trying to handle this new load, and now this new instance is just sitting there. It can't actively consume its partitions, because it's still replaying a state. The other thing is, in Kafka Streams we have-
It's IO bound on the network interface, basically.
Yeah, it's... It takes a while to stuff things into RocksDB. Or if you have an in memory store, it's a lot of data, a lot of stuff moves through Kafka. So it's not trivial to rebuild state. And a lot of the time applications will have, what are known as, interactive queries, which really just allow you to serve a view of that local state. So let's say you have your application, you just want to know what is the value for the count of the word the at this moment of time, you issue this query to Kafka Streams and it just queries down through layers to hit the underlying RocksDB store or whatever your storage is. And it tells you, "Okay, we've seen this five times now." So a lot of applications are built around that.
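For reference, a local interactive query against the store from the word-count sketch above might look like this. It only covers the lookup on the instance you're on; as comes up next in the conversation, finding and reaching the instance that actually hosts the key is up to the application.

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public final class WordCountLookup {
    // Look up the current count for a word in the locally hosted "word-counts" store
    // (the store name is the one we made up in the word-count sketch above).
    public static Long countFor(KafkaStreams streams, String word) {
        ReadOnlyKeyValueStore<String, Long> store = streams.store(
                StoreQueryParameters.fromNameAndType("word-counts",
                        QueryableStoreTypes.<String, Long>keyValueStore()));
        // Behaves like a read-only hash table lookup: key in, latest count out.
        return store.get(word);   // e.g. countFor(streams, "the") might return 5
    }
}
```

I believe KafkaStreams.queryMetadataForKey can tell you which instance hosts a given key, but the actual network hop between instances is something you build yourself, as Sophie notes a bit later.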
And it-
And the problem-
Right. And just real briefly there, if you don't know what interactive queries are... Like we said about the partition assigner before, naming things isn't always hard. That's a sensible name. Interactive queries doesn't... It's like, "Yeah, I get it." It doesn't quite get the notion across. But if you've got a Kafka Streams topology and at some point you have created a table like Sophie's been talking about, counting things, aggregating, grouping by words, and counting occurrences of words.
Well, that word is going to be a key in a table, and the value's going to be the current count, right? So you have created an aggregation of some kind, you have made a table, and you want to look a thing up in the table by its key. It's a very sensible thing. There's nothing... You're like, "Interactive query? What's that?" No, it's like a hash table, and you want to get a thing out of it, that's it. It's just that the thing you want to get may not be where you are, right? And so there's a layer where there may be another... If it's in another partition in another instance of the Streams application elsewhere in the group, this layer goes and gets it for you. So you have this transparent sort of key-lookup hash table thing, you might be going over the network to get it. So it's not trivial, there's a lot of framework there that you are glad that someone who's not you wrote. But the idea is, it's a hash table and you're getting.
Yeah, it should be-
Anyway, sorry.
... easy enough to use.
Wanted to make sure everybody-
Although I'll say-
Totally, and it-
... we have not implemented the actual network routing in Kafka Streams. I think it is a feature that would be heavily requested. I think we have an example that does. So it's not the end of the world. But it would be a cool project if anyone is interested in getting their feet wet in really intense and detailed way. Feel free to-
In that way I have just suggested.
Yeah, feel free to really jump in the deep end there.
Right.
But yeah, fundamentally, at the end of the day, you're issuing queries, you're doing a lookup against this table. And Kafka Streams will serve whatever data it has for that partition. And of course, in Kafka Streams we value consistency, clearly, over availability. So if you bring up a new instance, for example, and you issue a query to it, well if it's only halfway through replaying the changelog it just can't respond. It doesn't have the latest up to date value for that key or for that word. It has to wait until it's completely restored from the changelog, otherwise you might be getting an answer that was days old or hours old, you might just be kind of bouncing around through time. So for true consistency we really want to make sure that it's completely up to date, hitting the head of the changelog, whatever that corresponds to once you've rebuilt it in RocksDB. That's kind of the other downside to scaling out events and other partition transfer things in Kafka Streams, is that you're not actually consuming from the input topics, and your query-serving layer is down. Unless you want to completely sacrifice consistency for that availability, it's just down.
While you're scaling. And if you have one of those no-op reassignments, that won't take long, because there's not a lot of state to move around, so you just have the much shorter pause during the repartition. But if it is an actual scale out scenario or a fail over scenario or something like that, it's potentially, well in terms of SLAs, catastrophic.
Yes. I was going to say, devastating. But I think, yeah, that gets to the heart of it.
Yeah, yeah. Devastating, eviscerating. Some extreme sort of word. So, how does this get better?
Yeah. Well, so the... Again, kind of the problem there is that we've really been aggressive and eager, once again, in assigning these partitions or these tasks to the new instance. Kind of if you think about before you scaled out, what was happening, well all these Streams instances were completely, obviously, caught up on the changelog, they were actively processing, serving queries, everything is happy. And the problem, really, is not that you added a new instance, that's totally fair game. The problem is that we immediately reassigned a partition from its current owner, who was processing it, to this new guy who now has to spend however many minutes or hours replaying state from the changelog for it. So once again, we've been to [inaudible 00:37:39] and it's a process that worked for a while, but eventually you need to dig down and kind of get into the hard problems. And the time came to address this. So how do we do that? Well-
Yeah. And as I like to say, when you're talking about the old world, it's not like it was bad, or stupid, or anything, it was really hard to build that thing and it was the thing that we were able to build. It's like your handwriting was kind of messy when you were eight, I mean mine actually is still sort of borderline not even really handwriting, but for most adults, when you were a little kid it was super messy, and then you grow up and it's not as messy anymore. You weren't bad when you were little, it's just what you did. So this is the development of this framework.
Mm-hmm (affirmative). Yeah. I think if you try to do everything perfectly from scratch your project is just not going to get anywhere. And [crosstalk 00:38:29]-
You're never going to get anywhere. And you're not smart enough to do it. You literally aren't smart enough to know what those problems are.
Yes. That's the other thing, yeah. And sometimes it takes a few users complaining and filing tickets to even realize what those problems were in the first place. And the severity of these problems is not always obvious up front. So whatever we were doing, it worked, but we did have this kind of trade-off of availability that for some applications, it's too much. They can't be down for hours at a time. I think probably any application would prefer not to, although some can handle it, some literally just cannot.
Yeah. Right, you'd rather not.
So, yeah. Personally I wouldn't want to waste hours of my time. But some people do. No, I'm just kidding, I waste plenty of my own time. So yeah, so the problem here is that we're too eager, once again, and the solution, obvious in hindsight, not obvious in the original design, but obvious in hindsight, is that we just need to not reassign these tasks to the new owners right away. If we reassign them to the new owner, that owner suddenly pauses all work on that partition or that task until it's done restoring. Well, instead why not just leave it assigned to its original owner who has a totally caught up version of that local state, can serve queries, can process, all that jazz. Leave it assigned to its original owner. And then when a new instance joins the group, instead of moving it to that new instance we just tell the new instance, "Hey, here are these partitions that we want to assign to you, they have state. So you're going to need to sit there in the background and read from the changelog like you're in timeout in middle school or something. You need to sit there, read from the changelog quietly in the background [crosstalk 00:40:31]-"
Mm-hmm (affirmative). I mean, not that I'd know that I was like, but yeah.
[inaudible 00:40:34]. Yeah. Yeah. And you just warm up the state, as we say. You just sit there and read the state until you are caught up enough to the head of the changelog.
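For reference, the warming-up behavior (and its close cousin, standby replicas) is controlled by Kafka Streams configs along these lines. The config names are the real keys as I understand them; the values shown are just illustrative, not recommendations.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class WarmupConfig {
    static Properties warmupProps() {
        Properties props = new Properties();
        // How many extra "warming up" copies of state may be restoring at once,
        // on top of the configured standbys, during a scale-out or migration.
        props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);
        // Standbys keep shadow copies of state on other instances all the time,
        // so there's much less changelog left to replay when a task has to move.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        return props;
    }
}
```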
And because you're reading from a log, that works. Okay. So while you were talking... And this is not contrived folks, I mean I knew that this is 40... Terrible with KIP numbers. This is KIP-441. I knew that 441 was a thing and that it helped with Kafka Streams scaling, but I'd never thought through the process before. So, Sophie, as you were talking I was thinking, "Okay, so I start with a copy of the state, but then the original one is still modifying the state. And so how am I going to make sure I pick up all those modifications?" Well, because it's a log and I don't have to merkle tree the whole thing or do some bizarre thing. I just play the log and it's fine.
Yeah, exactly.
This is why we love logs.
The beautiful thing of Kafka. Sometimes it's frustrating, but usually it comes together in a beautiful way.
Exactly. Because it's built on the right abstractions, and so problems like this... I mean, there's all kinds of hard problems everywhere, you wouldn't have a job if there weren't. We wouldn't have this podcast episode if there weren't. But the solutions to those things become elegant. Because I was literally considering strange data structures in my mind, just while you were talking, thinking, "How would you build that?"
And then I thought, "Oh, no, hm, hm, no. You don't need that. You have a list, that's your data structure. Tim, it's a list of things."
Here we are. It's nice when the answer is obvious, even if it makes you feel silly for not having seen it before.
Exactly, yeah.
I think that's how I spend half of my time these days, just [inaudible 00:42:21] and then thinking, "Oh, damn, how did I not see that earlier?"
Oh, oh, that was easy.
But, yeah, I think a good solution, ultimately, is one that is simple and that makes you feel foolish for not having seen it earlier. We don't want to solve problems by just adding complexity on complexity on complexity. If you do that you might end up with an inefficient system, but you're going to reach a point where you can't add any new features, you can't do anything, you're just introducing bugs. So we always go for the simplest solution. And in this case, it's one that just kind of falls naturally out of Kafka and Kafka Streams. Yeah, we warm up from the changelog.
Yeah, yeah. No, that makes total sense.
Yeah. So then, once again, there's a lot of analogs between this and the cooperative rebalancing, KIP-429. Once again, we have this situation where we're kind of doing something in the background, in the cooperative rebalancing it was revoking the partition. In this case we're reading from the changelog, trying to warm up this state store, trying to get close enough to the head. But of course that means that we haven't yet assigned anything to this new instance. So it's catching up, catching up fast we hope. But it's not actually doing any work yet. So what do we do in this case? Well again-
Right. And that work is still being done by the previous.
Yes, someone else, yeah. So yeah, so everything is still live. It's available. They just... We haven't finished the scaling out event because we haven't redistributed the load to the new guy yet. He's going through his initiation, he doesn't yet get his topic. So, luckily, we had KIP-429, before this, which made rebalances cheap and low overhead, because we can continue to process during these rebalances. So, again, the question comes up, well, maybe we can just trade off a second rebalance to kind of solve this problem. So we do this in a slightly different way in KIP-441 than in KIP-429. In KIP-429 we rebalance immediately after the first rebalance. That is, the second rebalance comes immediately after so that these partitions which are revoked can be reassigned right away. With KIP-441, we might have a large group of new instances, which are all trying to catch up on different numbers of tasks, different levels of state, which could all reach their caught up state at any point. So we don't want to necessarily have a rebalance for every single one that catches up, and instead we issue what's called a probing rebalance where every, I think it's 10 minutes by default, we trigger a rebalance and we check to see if this new instance or instances are caught up. And how do we measure caught up? It's kind of a vague term, I feel we should've picked a better name there, but can't think of any. If you have any that come to mind I am all ears.
In sync was already taken, so, right.
Yeah. And so unfortunately all the good names end up being overloaded. We try to avoid confusion rather than just piling on the same few terms, in general, not always.
Exactly.
But yeah, we need to figure out, is this instance caught up? And that's kind of not a trivial answer, in a way. But it's not hard either. What is this task doing? What does this partition need to do? It's just reading from the changelog. It's done when it's caught up enough to the changelog. So the measure that we can use of how caught up it is is just, what is the total lag for this task across all of the changelog partitions that are associated with this task? So we read... What is the current offset into the changelog that this instance has, in its local state. Subtract that from the log end offset for the changelog, add all that up, you have multiple changelogs, if you have multiple stores per task, and that's basically how much this instance needs to catch up in order to be considered totally in sync for this task.
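To make that arithmetic concrete, here is a rough sketch of the lag calculation being described. This is not the actual Streams internals, just an illustration: total lag for a task is the sum, over its changelog partitions, of how far the locally restored offset is behind the end of that changelog.

```java
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

final class TaskLag {
    // changelogEndOffsets: log end offset per changelog partition of this task
    // locallyRestoredOffsets: how far this instance has restored each of them
    static long totalLag(Map<TopicPartition, Long> changelogEndOffsets,
                         Map<TopicPartition, Long> locallyRestoredOffsets) {
        long lag = 0;
        for (Map.Entry<TopicPartition, Long> entry : changelogEndOffsets.entrySet()) {
            long restored = locallyRestoredOffsets.getOrDefault(entry.getKey(), 0L);
            lag += Math.max(0, entry.getValue() - restored);
        }
        return lag;   // compare against acceptable.recovery.lag to decide "caught up"
    }
}
```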
Is that within some absolute threshold? Number of offsets? Or percentage? Or-
Yeah, we have a number of offsets. Yeah, kind of as you were hinting at earlier, since this is actively being processed by someone else in the background, you're never really going to be perfectly caught up, at least if you are you're not going to be caught up for long, because this other guy is actively writing new things to the changelog. So there's a config, I think it's called the acceptable recovery lag, which is just in terms of number of records, so the number of changelog offsets that you still need to catch up on. By default this is set to 10,000, which seems super huge, but in our own benchmarks we found that it takes just a few seconds for Kafka Streams to catch up. Yeah.
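For reference, these are the two KIP-441 knobs mentioned here, set explicitly with the defaults discussed in the episode (10,000 records and 10 minutes). A sketch, not a recommendation:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ProbingRebalanceConfig {
    static Properties probingProps() {
        Properties props = new Properties();
        // How many changelog records an instance may still be behind and yet be
        // considered "caught up" enough to take over active processing of a task.
        props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10_000L);
        // How often to trigger a probing rebalance to check whether warming-up
        // instances have caught up.
        props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 10 * 60 * 1000L);
        return props;
    }
}
```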
Okay. That does sound like more than I'd expect. I expect a few tens.
That was my exact thought.
But-
One of my teammates proposed that, and I thought perhaps it was a typo and he had added an extra zero. [crosstalk 00:47:46]-
Right. You're like, "You're obviously not good at this."
Yeah. So this is why we work together.
Exactly. Yeah.
Which just goes to show, if we can do 10,000 records in a few seconds, there's a crazy number of records that it must have been processing before that if it was taking hours to rebuild, which kind of gives you some sense of the full scale of what goes through Kafka. [crosstalk 00:48:11]-
Mm-hmm (affirmative). It's the scale of the problem.
Yeah.
Yeah, yeah.
Sometimes it's hard to really wrap your head around and think about how many records we're dealing with and imagine how quickly this is possibly going. But it turns out, pretty quickly. But when you have a huge... When you're getting up into the, whatever it is, billion... Whatever numbers of records, things can take a while. So yeah, so-
It is a system capable of size.
Yes.
Capable of scale. Yeah.
And, yeah. And if it's not enough then you scale out. And luckily, now that should not be a totally painful event.
That's easier. Exactly. Well, my guest today has been Sophie Blee-Goldman. Sophie, thank you very much for being a part of Streaming Audio.
Yeah, thanks for having me. It's been good.
Hey, you know what you get for listening to the end? Some free Confluent Cloud. Use the promo code 60PDCAST, that's 60PDCAST, to get an additional $60 of free Confluent Cloud usage. Be sure to activate it by December 31st, 2021, and use it within 90 days after activation. Any unused promo value after the expiration date is forfeit. And there are a limited number of codes available, so don't miss out. Anyway, as always, I hope this podcast was useful to you. If you want to discuss it or ask a question, you can always reach out to me on Twitter @tlberglund, that's T-L-B-E-R-G-L-U-N-D, or you can leave a comment on a YouTube video or reach out on Community Slack or on the Community Forum. There are sign up links for those things in the show notes if you'd like to sign up. And while you're at it, please subscribe to our YouTube channel and to this podcast wherever fine podcasts are sold. And if you subscribe through Apple Podcasts, be sure to leave us a review there. That helps other people discover it, especially if it's a five star review, and we think that's a good thing. So thanks for your support, and we'll see you next time.
Availability in Kafka Streams is hard, especially in the face of any changes. Any change to topic metadata or group membership triggers a rebalance. But Kafka Streams struggles even after this stop-the-world rebalance has finished. According to Apache Kafka® Committer and Confluent Software Engineer Sophie Blee-Goldman, this is because a Streams app will generally have some state associated with a given partition, and to move this state from one consumer instance to another requires rebuilding this state from a special backing topic called a changelog, the source of truth for a partition’s state.
Restoring from the changelog can take hours, and until the state is ready, Streams can’t do any further processing on that partition. Furthermore, it can’t serve any requests for local state until the local state is “caught up” with the changelog. So scaling out your Streams application results in pretty significant downtime—which is a bummer, especially if the reason for scaling out in the first place was to handle a particularly heavy workload.
To solve the stop-the-world rebalance, we have to find a way to safely assign partitions so we can be confident that they’ve been revoked from their previous owner before being given to a new consumer. To solve the scaling out problem in Kafka Streams, we go a step further. When you add a new instance to your Streams application, we won’t immediately assign any stateful partitions to it. Instead, we’ll leave them assigned to their current owner to continue processing and serving queries as usual. During this time, the new instance will start to “warm up” the local state in the background; it starts consuming from the changelog and building up the local state. We then follow a similar pattern as in cooperative rebalancing, and issue a follow-up rebalance.
In KIP-441, we call these probing rebalances. Every so often (i.e., 10 minutes by default), we trigger a rebalance. In the member’s subscription metadata that it sends to the group leader, each member encodes the current status of its local state. We use the changelog lag as a measure of how “caught up” a partition is. During a rebalance, only instances that are completely caught up are allowed to own stateful tasks; everything else must first warm up the state. So long as there is some task still warming up on a node, we will “probe” with rebalances until it’s ready.