How does leader election work in Apache Kafka? Well, hang on, back up a second. What even is leader election? Why do we need it? How does it work? What's it doing for us? Well, it's actually at the heart of how Kafka stays highly available. And in this episode of Streaming Audio we're going to go for a deep dive into the mechanics of it, why we need it, how it works, what can go wrong and how it's currently being improved to achieve even higher uptimes.
Guiding us through is Adithya Chandra, who's a staff software engineer here at Confluent, and he works on Kafka's scalability and performance. And this turns out to be his story of an elegant design with some interesting trade-offs along the way, and just a little bit of a Pokémon hunt as we go looking for those one in 10,000 cases that get in the way of four-nines availability. This podcast is brought to you, as ever, by Confluent Developer. More about that at the end, but for now I'm your host, Kris Jenkins. This is Streaming Audio. Let's get into it.
So I'm joined today by Adithya Chandra. Adithya, how are you doing?
I'm doing great. Yeah, thanks for inviting me to the podcast.
Thanks for joining us. We're going to get really technical on this one, we're going to have a deep dive into leadership election, right?
Yeah.
'Cause you work on, sorry, you're a staff software engineer here at Confluent and you work on scalability and performance, and particularly at the moment on leadership election stuff.
That's right. So I've been at Confluent for two and a half years and I've primarily focused on the Kafka performance team. And as part of performance we've looked at running Kafka cost effectively, which is primarily getting the right price performance ratio in terms of hardware and our cloud configurations. And then I've also focused on scalability, which is like, we now support more than 10 gigabytes per second on our Kafka clusters, which is basically running larger clusters with a lot of instances and also running larger instances with a lot more memory and a lot more CPU.
And finally we've been focusing and looking very closely to make sure our tail latencies in Confluent Cloud... We bring them down in a systematic manner. So we've been able to build dashboards and probes to measure our latencies accurately, and then we've been constantly iterating on finding the bottlenecks and fixing them over time. And that is one of the places where we've had to enhance leader election that's currently there in Apache Kafka. And that was how I started working with leader election and looking at it.
Okay, cool. I have to ask, is that how you prioritize work? You just see where the pain points are and just look at the latest burn?
Yes, exactly. For the performance team it's all about finding the bottlenecks. It's kind of like peeling the onion. If we find one thing and fix it we may think, "Oh wow, it'll come down by a lot," but you just hit the next bottleneck. It comes down by a bit, we find the next bottleneck. And the super unique thing about the cloud is we look at all these customers running all these different kind of workloads. So the bottlenecks are very different and they change from workload to workload.
For example, it might be compression and the type of compression scheme that you're using. Then it interacts with, say, whether you're also using compaction, or you're using an older version of the Kafka protocol and we are doing up-conversion or down-conversion on the brokers. So there are all these slightly different workloads which are interacting in weird ways and taking latencies up and down. On average the latency that we see for Kafka is incredible; 10, 20 milliseconds is kind of what we see. But the edge is where the interesting thing is. The worst clusters, you might say, the worst latencies. So that's kind of what we're looking at and that's where all these edge cases come in. And then we've been slowly improving them one by one.
Those random, surprising edge cases, the way you find all your fun.
It's super interesting. We find them one by one, the fixes are small but it takes a lot of monitoring, a lot of understanding of the entire system, the whole stack, the life cycle of a request. And then you find the small thing that, "Hey, why are we doing it like this? Why not do it like that?" And you tweak a little bit and you can get pretty big wins, which is surprising. But we're also working on taking a longer term view and thinking about the architecture, that's also happening in the side.
Yeah, yeah. One of the things I love about performance tuning is I wish the same rules applied to other parts of my life. I wish I could really study the cost of a car for a few weeks and then finally get the price down to a 10th of what it was before. That would be nice, wouldn't it? But those kind of numbers only work in performance tuning, they don't work in everyday life.
Yeah. I think there was a saying somewhere that if you can bring your latencies down by 20%, then you've done some great engineering work. If you've brought them down by two or three X, then someone else did a lousy job. So...
Yeah, yeah. Okay. So without pointing fingers of blame in any direction we best talk about leadership election. But let's take this from the start, 'cause not everyone will have the depth of knowledge that you do. So let's see, I have this logical model in my head that Kafka is a bunch of topics, essentially log files, split into partitions, usually partitioned by key. And each partition is replicated three or more times for resilience. So with that basic model in my head, where do leaders come in, first?
Yes. So that's a fantastic question. So you have three replicas, which are exact copies of data, and the client is writing to these three replicas. You have a bunch of different ways that you can do it. The way Kafka does it is that it elects one of those three replicas as the leader. The advantage of this architecture is that the client, either a producer or a consumer, only has to write once to that particular replica. This allows us to reduce the amount of traffic that we write over the internet, assuming the producers or consumers are in a separate place compared to where you're running your Kafka broker.
And the other advantage is that it's a reasonably simple system. The client doesn't have to worry about failovers, it doesn't have to worry about whether the replica's caught up and if there are holes in the log and all those things. All that is handled behind the scenes on the server side in Kafka. The clients just have to figure out who's the latest leader, which they do by refreshing their metadata. And that's where leader election comes in. So the leader election is basically, you have three replicas, so you have a choice of which one of the three should be leader. And we have a bunch of different factors that we consider when electing them leader.
So just to check, so in our case of three replicas we've got three brokers that are all writing to their topics, but one of them's receiving from the client and the others are receiving from the leader?
That's right. So let me go through the life cycle in a little more detail. So what's happening is the producers are writing records to the leader, and even consumers are consuming records directly from the leader. This also has a good performance benefit, that your data... Typically most Kafka use cases are reading data that was just written. They're not reading historical data. There's new, real-time traffic coming in and you're consuming it pretty quickly. So this is typically hot data and it's in the cache because you've just written it. So if someone else reads from the leader we already have it in memory and we send it to them. So that's another advantage of this leader model.
And the followers, what they're doing is they're using the same protocol, a similar RPC message as the consumers to fetch data from the leader. So the leader gets a message, it writes it to its local log and then it waits for these followers to send the fetch requests and they catch up to this data. But by default what we do is we don't respond back to the client. So let's say you got a batch of records, the leader wrote it locally and then it waits for the followers to get the same data, write it locally and then send an acknowledgement back that they have written data locally. And only then it responds back to the client saying, "Hey, all the data has been committed and persisted." What this allows us to do is, as long as a client has got an acknowledgement, then we are sure that all replicas have that data, and any of them at any point can be safely elected leader without any data loss. I'll take a pause if you have any questions.
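To make that acknowledgement flow concrete, here's a minimal Java producer sketch using the standard Kafka client with acks=all, which is the producer setting behind the behaviour Adithya describes. The broker address and topic name are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AcksAllProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1:9092"); // placeholder address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // acks=all: the leader only acknowledges once every replica in the ISR
        // has written the batch to its local log, as described above.
        props.put("acks", "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() returns a future; get() blocks until the leader has
            // received acknowledgements from all in-sync followers.
            producer.send(new ProducerRecord<>("orders", "key-1", "value-1")).get();
        }
    }
}
```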
Right, yeah. So the one question I have there is, so each replica is responsible for pulling?
Yes.
They're not pushed to the replica, the replica pulls.
Yes.
Okay. Why?
This is something that we've been discussing, push versus pull. There are advantages of push. Pull makes a lot of things simpler. Data is on the leader, we are using the same mechanism that followers are using. It's simpler to implement and you can get a lot of... The biggest advantage, I would say, of push is that you can send it immediately and you don't have any, for example, delay when you're pushing data, which is fantastic. But push is also slightly more complicated. What if the follower can't take the data that you want to send? Then you'll have to wait. You'll have to retry and you have to... And also performance throughput versus latency is a dance that we constantly play in core performance.
I could imagine, yeah.
Let's say you're getting small records. If you send them all as soon as you get them, you may not get the best throughput and you may burn a lot of CPU in the process. So we want to batch them together and then decide what would be the ideal batch size that we want to send to the follower. So in the pull model it's easier: the follower controls this, it knows the rate at which it wants to pull and then it'll pull that data.
One thing that I didn't talk about was, what if somebody is slow or what if somebody is down? If we keep waiting for this particular follower which is down then our latencies would be terrible and we would never respond to the customer. To deal with this what Kafka does is it keeps a list of replicas, what it calls in-sync replicas, or the ISR. And the ISR tracks all the replicas that successfully have been copying data from the leader. And this is persisted in the metadata. And these are the replicas that are eligible for leader election. If a replica is not in the ISR we don't consider it for leader election unless there is an unclean election flag where you say, "Hey, I want to trade off durability or correctness guarantees for availability." Which we don't recommend, but if you do that that's the time where anyone can be elected.
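The unclean-election trade-off mentioned here is exposed through the unclean.leader.election.enable config, which can be set per topic. A minimal sketch using the Kafka AdminClient; the topic name and broker address are placeholders.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicNoUnclean {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions, replication factor 3; only ISR members are ever
            // eligible for election because unclean election stays disabled.
            NewTopic topic = new NewTopic("payments", 6, (short) 3)
                .configs(Map.of("unclean.leader.election.enable", "false"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```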
I'm just trying to get all the details here. When you say it's tracking which clients are in sync, is that they're acknowledging that they've got up to offset number X?
Yes.
And only the people who are in the same offset as you, the leader, are considered in sync?
Yes. So basically what the leader is tracking is, "Hey, these are my followers and they're constantly sending me these fetch requests." And for the fetch requests that you have served, you served all the data that you had, so they're not lagging a lot. We also keep track of the lag, how far behind they are, like you said, in terms of offsets. But typically, because it's a batch call, when the follower comes back and says, "Hey, give me all the data that I haven't yet received since the last time," it'll send a fetch offset which tracks the last point up to which it successfully replicated.
So we know all the new data from that fetch offset is what we should send back to the follower, and that's what we send back. So as long as we are sending it back we presume that it's in the ISR. There's a setting which can control how far behind the followers can be. You can set it to different numbers like 30 seconds, 40 seconds, and that's kind of how long you wait. And if a follower is behind that you will kick it out of the ISR and you'll shrink the ISR, basically.
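The "how far behind" window is the broker setting replica.lag.time.max.ms (30 seconds by default in recent Kafka versions). A small sketch that just reads the current value with the AdminClient; the broker ID and address are placeholders.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class ShowReplicaLagSetting {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "1");
            Config config = admin.describeConfigs(Collections.singleton(broker))
                                 .all().get().get(broker);
            // Followers that haven't caught up within this window are kicked
            // out of the ISR until they catch up again.
            System.out.println(config.get("replica.lag.time.max.ms").value());
        }
    }
}
```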
Right, right. Okay. I think I've got that in my head how it works when it's supposed to be working. And yeah, it does seem a lot like the normal producer and consumer considerations you have. The producer's backing things up and the other guy's consuming.
Yes, exactly. So that's the great thing about it. And yeah, it's exactly a similar code path as the consumer. There's one interesting thing about fetch sessions, which is an optimization where if you have a lot of partitions the follower doesn't have to constantly tell the leader because it's a pull based model. If you have no state maintained on the leader then each time the follower will have to tell, "Hey, these are all the partitions and these are all the fetch offsets that I want to get data from." And the request can become more expensive than the responses that you're sending for these fetch requests.
To speed that up there's something called fetch sessions which tracks on the leader side, "Hey, what are all the partitions that are currently in the session?" And if there are partitions for which you're not sending any data then the follower doesn't necessarily have to send it in its next record. So that's kind of an optimization that we can do.
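For readers who want the shape of that optimization (incremental fetch sessions, KIP-227), here's a deliberately simplified sketch. The class and method names are invented for illustration; the real broker-side session handling is considerably more involved.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * A highly simplified sketch of the idea behind incremental fetch sessions.
 * Names here are invented for illustration only.
 */
class FetchSessionSketch {
    // partition -> last fetch offset the follower asked for
    private final Map<String, Long> sessionPartitions = new HashMap<>();

    /** Full fetch: the follower lists every partition it wants, once. */
    void establish(Map<String, Long> partitionsAndOffsets) {
        sessionPartitions.putAll(partitionsAndOffsets);
    }

    /** Incremental fetch: only partitions whose state changed are re-sent. */
    void incrementalUpdate(Map<String, Long> changedPartitions) {
        sessionPartitions.putAll(changedPartitions);
    }

    /** The leader answers from the cached session, not from the request. */
    Map<String, Long> partitionsToServe() {
        return Map.copyOf(sessionPartitions);
    }
}
```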
Oh, right. So you are essentially keeping a list of things the followers subscribe to?
Yes.
Yeah, okay. That makes sense to me. Okay, right. Let's move on to the disaster. We've got that model all working just fine, what can go wrong?
Yeah. So a bunch of things can go wrong. The most obvious thing is you're doing an upgrade and you're shutting a broker down. And let's say you have three replicas, one of them is leader. Actually, let's start from even before that. Let's say you created a new topic and you have three replicas assigned to it, and which one of them is elected leader?
So when a new topic is created the controller decides which brokers the replicas will be assigned to. So let's say there's a new topic and a partition where the replicas are one, two, three. The order of this replica list is important. The very first replica in that one, two, three, we call it the preferred leader. And I'll come back to preferred leader, but the most important thing is that this is the broker that will be given preference during leader election. And this allows you, during placement, to decide how leaders will be placed across the board.
So we have these interesting models which track cluster performance over time. We have SBC in Confluent Cloud, which is tracking what's happening with CPU, with network ingress, et cetera, on each of the brokers. And then we are taking decisions on where leaders should be. And the best thing that you can do at that point is think about where the preferred leader should be, and that's typically the first broker in the replica assignment. And whenever there's a disaster someone else will be elected leader other than the preferred leader. But Kafka also gives you a setting for automatic leader rebalancing. What you can do is you can run a preferred leader election periodically.
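Because the first replica in the assignment list is the preferred leader, you can pin preferred leaders at topic-creation time by passing an explicit assignment. A minimal AdminClient sketch; the broker IDs, topic name, and address are placeholders.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicWithPreferredLeaders {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // Partition -> replica list. The first broker ID in each list is
            // that partition's preferred leader.
            Map<Integer, List<Integer>> assignment = Map.of(
                0, List.of(1, 2, 3),   // partition 0: preferred leader is broker 1
                1, List.of(2, 3, 1),   // partition 1: preferred leader is broker 2
                2, List.of(3, 1, 2));  // partition 2: preferred leader is broker 3
            NewTopic topic = new NewTopic("clicks", assignment);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```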
So let's take the example I gave you: one, two, three, where one was the preferred leader. All the brokers are up, there's no data, and one was elected leader, with two and three in the ISR. Nothing funny is going on. And after some time you decided to upgrade broker one, so you took it down. So you issued something that we call a controlled shutdown, at which point broker one will say, "Hey, I want to give up leadership." So this is another trigger for leader election. And then when it says, "I want to give up leadership," the controller will go and trigger a special leader election where it'll ignore broker one in the set of replicas that are eligible to be elected leader, because it knows, "Hey, this guy's giving up leadership and is going to go down." So one of two or three will be elected leader and then broker one can now... Yeah, if you had a question.
I'm wondering how it chooses between two and three. Who makes that decision and how?
So right now the controller only cares about... Conceptually all we care about is who's in the ISR. And then it does not really matter whether it's two or three. But right now I think it's just taking the first one in that list, so it'll be two, but that can change. So it doesn't matter whether it chooses two or three in this particular example.
So logically it chooses one at random. In practice the code probably just picks the first element of the array.
Right. Yeah, that's exactly right. So now, yeah, two became leader. And what happens next is you upgraded it and one came back. So if you don't do anything two will continue to be leader, but your load may now no longer be balanced, because when you initially created it, let's say you gave one leader to each of those, there's some other partition's leader which is on two. And now even for this partition the leader has moved to two, and two is kind of doing more work than it's supposed to.
Yeah. 'Cause you could have deliberately created your partitions with the first one as your largest, most expensive, fastest machine and the other two are kind of [inaudible 00:20:33]. So now you've failed over to two as your cheaper redundancy machine.
Yes.
Yeah, okay.
And that's something that I've also seen people do and ask for, basically exactly what you said. The first one is this expensive machine in the main data center with your client applications, and then you have these redundant brokers which are in a different data center and are mainly for disaster recovery, they're copying data. And you just fail over. And now you want to fail back into your primary data center when it comes back. And that's where you use this preferred leader election strategy, which is you... It allows you to set this periodic interval at which we will run preferred leader election. So let's say we set it for five minutes. So every five minutes there is an event in the controller and it goes and tries to trigger a preferred leader election for all partitions where the leader is not the preferred leader, and then...
But the leader is now available again.
Yes. And then if the preferred leader is in the ISR and available again then it will fail back into the preferred leader and your load will move back to this instance.
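The periodic fail-back described here is governed by the broker settings auto.leader.rebalance.enable and leader.imbalance.check.interval.seconds, and you can also trigger a preferred election on demand. A minimal sketch using the AdminClient's electLeaders call; the topic and address are placeholders.

```java
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

public class TriggerPreferredElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // Ask the controller to move leadership back to the preferred
            // (first-listed) replica for these partitions, if it is in the ISR.
            admin.electLeaders(ElectionType.PREFERRED,
                               Set.of(new TopicPartition("clicks", 0),
                                      new TopicPartition("clicks", 1)))
                 .partitions().get();
        }
    }
}
```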
Oh, okay. 'Cause I can imagine people, at least if they don't have that, I can see people deliberately shutting down two just so they can push the leader back to one and then bringing two back up, right? I bet that happens.
Yes, that would be... Yeah. Yeah, that's another way to do it, you're right.
But now we have an automated mechanism for it.
Yeah. Yeah, you're totally right.
Okay. Is that it for graceful failover? Is there anything else we need to know or should we move on to disastrous failures?
Yeah, so the main thing for graceful failover that you have to think about is there's another setting which controls how many of these in-sync replicas should respond before you acknowledge back to the client. So the default is acks equal to all, where all the replicas in the in-sync replica list have to acknowledge a particular write before you respond back to the client. And this model is very clear. Everybody's replicated the data and any of them can become leader.
There's also cases where people don't really care that much about data loss and they set acks equal to one, where it's just written to the local log of the leader and none of the followers have necessarily replicated it. They'll replicate over time, but you don't wait for them to acknowledge to the client. So in this world, if there is a failover we still have in-sync replicas and everything, and consumers don't see the data until it's replicated. But it's just that the producers can move fast. They just write, they just fire and forget about it. So if your producers have thought, "Hey, I've successfully written something," that may disappear during a failover if you had acks equal to one.
Right. I think what I want is somewhere in the middle, right? Because I don't ever want to lose any data if I lose my leader, personally, but I don't want a single follower to go down and take my cluster down. [inaudible 00:24:08]
Yes, you're totally right. That is exactly what we want. So one way people do something like that is with quorums, where they will wait for some subset of followers to have replicated data before they move forward. But in Kafka, with the in-sync replica replication model, we can't do something like that. So that is why we wait for all the in-sync replicas to replicate, and we depend on the ISR shrinking. So basically if one of the replicas is not able to catch up or is slow, we want to explicitly kick it out of the ISR, and then we no longer wait for it during this hot path of new requests coming in and replicating.
So let's say you have three brokers, like you said, and one of them is slow. It's not replicating, it's, let's say, 40 seconds behind since it last replicated from your log. Then the leader will explicitly kick it out of the ISR, which means you no longer wait for that guy. So you still have performance, but it's not done in a more real-time manner. At any time it's not like your writes can go to some subset of the three. It always has to go to all the replicas in the ISR.
So instead of saying, "I want to make sure I've acknowledged four out of five," I wait for everything, but that number five could shrink dynamically to four?
Yes, yes.
Right, gotcha.
And the biggest advantage of this model is you can potentially run with two and we would still be correct and we would replicate to two and then things are fine, and then one of the two can go down and you can go down to even one, and that would still work. So that's the advantage of this flexible model of shrinking the ISR and growing it back.
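How far the ISR is allowed to shrink before writes are refused is bounded by min.insync.replicas when producers use acks=all. A hedged sketch that raises it to two on an existing topic; the topic name and broker address are placeholders.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RaiseMinIsr {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // With acks=all, producers get NotEnoughReplicas errors once the
            // ISR for a partition shrinks below 2, so a write can never be
            // acknowledged by the leader alone.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "payments");
            AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("min.insync.replicas", "2"),
                AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> update = Map.of(topic, List.of(op));
            admin.incrementalAlterConfigs(update).all().get();
        }
    }
}
```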
Okay, I see that. Okay. So are we going to move on to when the leader, someone accidentally pulls the plug out of the leader?
Yes. So if someone accidentally pulls the plug out of the leader, what happens is, with ZooKeeper and KRaft things work slightly differently, but in both cases there's a way to detect it. So in ZooKeeper the broker keeps connections and heartbeats, and we know if the broker is alive and reachable. So what we care about is whether this broker is part of the cluster, and as long as it's part of the cluster the controller will presume that it's the leader and things are working correctly. But let's say something went wrong, we lost the ZooKeeper connection, or the heartbeats in KRaft aren't working. Then in that world that broker is no longer eligible to be the leader. And that's the case you are talking about. The controller maintains this list of alive brokers; it'll take it out of the alive broker list and it will trigger a leader election.
This is slightly different from the controlled shutdown where the broker's giving up leadership, in the sense that there it's flushing everything to disk, making sure its local logs are written, and then giving up gracefully. In this unclean world data was not flushed. There was something that you might have written, for example, that you've not acknowledged back to the client; you've written a whole bunch of offsets but were waiting for the followers to replicate. So this uncommitted data will be lost. So one of the followers, similar to what happened in the graceful shutdown, will be elected leader. So that part is pretty similar. And then the point up to which it has replicated is what the new... Once it becomes leader the new followers will say, "Hey, this is what the log of my new leader is. I should truncate my log and start from that point." Because all the acknowledged data is supposed to be replicated everywhere, you will not lose any acknowledged data, because you're waiting for acks equal to all and it's guaranteed that the follower has caught up. So that's kind of-
So you really do have to wait on the producer until you've got confirmation that your record has been written?
Yes. Otherwise you can potentially lose that. There's no guarantee that it'll be persistent if there's a failover.
Right. Gotcha. How long does that take? I mean, in both cases, what's the time to get the new leader in a clean and an unclean shutdown scenario? Ish.
Yes. It depends on a bunch of things. So instead of giving you time I can talk about all the different things that need to happen in the ZooKeeper world and in the KRaft world. In the ZooKeeper world what needs to happen when, let's say, a broker goes down, is that first the ZooKeeper connection has to break and then the controller gets to know, "Hey, this broker is not alive," however long that takes. And then the controller decides, "Hey, this needs to..." It triggers a leader election, which is typically very fast, let's say microseconds. And then it decides who the new leaders should be. So there's also a caveat here. It depends on the number of partitions that were hosted on a particular broker. So for our examples we've been considering one partition, but people typically run thousands of partitions. So it has to literally do leader election for all 1000 of them. So that's another factor that goes into how long things could potentially take.
And then what happens is it needs to send metadata changes to all the other brokers saying, "Hey, you know what? These are the new leaders," because some of them would be elected leaders for a different set of partitions depending on how they were distributed across the board. So it potentially has to send this to all the other brokers in the cluster. So if it's a large cluster you might have to send these requests to a large number of brokers, and each of them would then process it and decide, "Hey, I am the new leader." We use something called an epoch, et cetera. But basically what it boils down to is each of these will make a state change and say, "Hey, I am the new leader and this is what is happening." It'll say, "Okay, I'll start taking writes."
And at that point the followers also have to realize that the old leader has failed. So they'll get a metadata change message and they will go and start replicating from the new leader. And the clients themselves, they might be writing to the old leader. If the old leader is down they'll have a connection failure, they'll have to refresh their metadata. So that's another call that goes in. They refresh their metadata and they see, "Oh, you know what? This is no longer the leader. I have to connect to this other broker, which is the leader." If you already have a connection then that's great, you don't have to create a new connection. But over that connection you will go and start making requests to the new leader. And as long as the followers also have a connection to that new leader and everything is set up, they will probably immediately send fetch requests, fetch that data, replicate internally and then ack back, the normal replication process. And that's when it'll start moving on. So those are all the steps that need to happen in failover, but it's pretty fast typically.
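On the client side, the failover Adithya walks through is handled by the normal metadata-refresh-and-retry machinery, tuned by a few standard producer settings. A minimal sketch; the addresses are placeholders and the values are illustrative, not recommendations.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class FailoverAwareProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1:9092,broker-2:9092"); // placeholders
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");
        // On a "not leader" error the client refreshes its metadata and
        // retries against the newly elected leader; these settings bound how
        // stale metadata can get and how long retries may continue.
        props.put("metadata.max.age.ms", "60000");
        props.put("delivery.timeout.ms", "120000");
        props.put("retries", Integer.toString(Integer.MAX_VALUE));

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ... produce as usual; leader failover is handled transparently.
        }
    }
}
```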
It sounds like a lot of steps, but are we talking seconds? Milliseconds?
Yes. Typically seconds and faster than that. That's kind of how long it takes. It depends on how fast each of these RPC calls are. If they're typically in tens of milliseconds then it's pretty fast. It's just a bunch of communication that has to happen. And if each of them is fast then you will see failover is pretty fast. And like I said, it depends on the load. If you have thousands of partitions that's kind of when it starts taking longer and longer for all these messages to go out from the controller and...
I suppose if you're not replicating events anymore you've freed up a bit of network bandwidth for this chatter.
Yes.
Right. So I hadn't quite realized this, but there's potentially a different leader for every partition.
Right, right.
Is there some mechanism to spread the leaders evenly around all the different brokers?
So by default Kafka just round-robins the assignment. So when you create a topic it starts saying, "Hey, I'll put the first leader on the first broker, the second leader on the second one," and then it goes along. And it also tries to, I think, be smart about, "Where did I last start my round-robin?" So that you don't always start from the... Let's say you have 10 brokers and you just have a replication factor of two. You put in one and two; you don't want to start again from one and two and keep putting things on one and two. So it keeps track and then it tries to round-robin. But that's pretty much what it does. The tricky part is... I think this changed a bit in KRaft. I haven't looked at it recently. I'm not sure if it still round-robins or picks at random, but that's kind of the strategy to spread it uniformly, like you said, across all the brokers when you create a topic.
But the problem is this is not sufficient, 'cause you don't really know what the workload of the partition and the leader is. So for workloads where you have a single use case, and you create this one topic whose partitions are spread evenly across all brokers and you have exactly as many partitions as brokers, life is good because producers are probably producing uniformly. But there are use cases where there are different teams in a company which are reusing the same Kafka cluster. They have a bunch of topics, they all have different kinds of workloads. Then it becomes important to dynamically change this. And that's when we want to model the workload and then dynamically change which leaders are on which brokers. And that's where self-balancing comes in, which is not there by default in Apache Kafka; you have to use either SBC or open source tools like Cruise Control, which do some of these things for you.
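For intuition, here's a deliberately simplified sketch of round-robin replica assignment. Kafka's real algorithm also randomizes the starting broker, shifts follower placement, and can be rack-aware; the class here is invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * A simplified sketch of round-robin replica assignment, illustration only.
 */
class RoundRobinAssignmentSketch {
    static List<List<Integer>> assign(int numBrokers, int numPartitions, int replicationFactor) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                // First replica (r == 0) is the preferred leader; leaders
                // therefore rotate evenly across the brokers.
                replicas.add((p + r) % numBrokers);
            }
            assignment.add(replicas);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 10 brokers, 4 partitions, replication factor 2 ->
        // [[0, 1], [1, 2], [2, 3], [3, 4]]
        System.out.println(assign(10, 4, 2));
    }
}
```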
Okay. So I guess that leads into a bit more about the work that you've been doing to optimize leadership elections for Confluent Cloud, right?
Right.
Tell us about some of that.
Yeah. So this is the work that I'm pretty excited about, what we've done over the past year or two. I spoke about all the cases where leader election was triggered, and it was always when the broker died completely. But in practice this is not the only way things fail, or the only failure mode. What we've talked about is the broker's cluster membership failing, which boiled down to its connection with ZooKeeper failing or its connection with the controllers in the KRaft world failing.
But we have all these interesting cases, like we discussed earlier where you would run your leaders in this important data center and then you might have followers in another data center for failover. Even in the cloud we distribute our replicas across three different availability zones, which are potentially three different data centers. And the reason we do it is even if an entire data center blows up you'll have other data centers that we can fail over to.
But the problem here is there are different connections. So, let's say, the external connection to one data center can potentially be broken, but the data centers can talk to each other internally. So this is kind of what we say, a particular network line was broken, like a cable was cut. And in this weird scenario we would have leaders that your clients can't connect to, but that can still talk to the controller, so the controller thinks the leader's healthy. So you have this weird case which we had to solve to improve our availability.
So that was actually happening in the field?
Yeah, it happens very frequently. And the more obvious one is we have load balancers in front of these brokers. And again, the load balancers for availability purposes are per data center, right? And let's say one of the load balancers had issues and went down. They're also replicated, et cetera, but you can imagine cases where something went wrong with that service, similar to how things go wrong with Kafka. Then again, you're not externally reachable, and external reachability is super important. Your leader, if you can't receive requests or respond to events, then you're pretty much unavailable.
Yeah, if you can't talk to it then it doesn't really matter that the cluster thinks it's happy, right?
Exactly. So that was a very frequent thing and we were thinking about, how do we solve this? The first part was detection. There was a lot of work done around network probes, and we have these other things that are constantly trying to send traffic through the load balancer and trying to figure out, "Hey, is the broker reachable?" We've also added new things where the broker itself tries to see, is it externally reachable? Is it getting requests from our health check probes? Is it receiving no traffic, et cetera? And this information is what we wanted to feed into leader election.
But at the same time we wanted to keep it pretty extensible. This is one use case that I gave, but there are other cases like performance problems, that's what my team is mostly focused on. And performance problems are frequent, and typically we know why it's happened if we have fine-grained alerts, but a lot of times we may not know what's happened and we've just got a monitor that says, "Hey, the latencies are high for this cluster." And then the on-call goes in.
And the first thing that you typically want to do is to just fail over leadership, because leadership failover is super cheap. Like we already discussed, that all you have to do is send these metadata requests and then the replicas are already caught up. And then the client just has to... And it's already probably connected to the replicas because some other leader is on them, the other brokers, and it just has to send requests to that guy instead of the current guy that it's sending requests to.
Right. So we don't know why the leader's slow, but let's just fail over to someone that isn't slow while we figure it out.
Exactly. And that's super cheap compared to, say, shutting down the broker, where suddenly you'll have less capacity overall. But the main problem of that is now you're no longer replicating three ways. So that guy won't be caught up when it comes back up; it has to catch up all the data. And you're also exposed: if one of the other two goes down then you pretty much have unavailability, because we set our minimum ISR size to be two.
So that's the reason we piggyback on leader election. And the way we've extended this to capture all these cases is we've added something called leadership priority. What it allows you to do, and this is primarily just in Confluent Cloud, this is not in Apache Kafka, is to go and set, "Hey, the leadership priority for this particular broker is low. I think something is wrong." And we typically call it a demoted broker. So you can demote that broker and the priority will be low.
And when we do a leader election we do something smart. What we do is, first we do a leader election without these low priority brokers. And if you are able to elect a leader then that's great, everything's good. So we have leaders that we can elect without taking these guys into account, and those are the guys that'll be elected. But let's say there are cases where somebody's not in the ISR, something else has happened and we cannot pick a healthy broker as the leader. In that case we favor availability over performance, basically. So we will just say, "Hey, nobody else can be elected leader. It's okay, we'll just elect the demoted guy as the leader." And...
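As an illustration of that two-pass idea only, and not the actual Confluent Cloud implementation, here's a small sketch of priority-aware leader selection; all names are invented.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
 * Illustration only: a two-pass leader selection that prefers healthy
 * (non-demoted) ISR members and falls back to a demoted one rather than
 * leaving the partition leaderless.
 */
class PriorityAwareElectionSketch {
    static Optional<Integer> electLeader(List<Integer> replicas,
                                         Set<Integer> isr,
                                         Set<Integer> demoted) {
        // Pass 1: first ISR member that is not demoted, in replica-list order
        // (so the preferred leader wins if it is healthy and in sync).
        Optional<Integer> healthy = replicas.stream()
            .filter(isr::contains)
            .filter(b -> !demoted.contains(b))
            .findFirst();
        if (healthy.isPresent()) {
            return healthy;
        }
        // Pass 2: availability over performance - accept a demoted ISR member
        // rather than having no leader at all.
        return replicas.stream().filter(isr::contains).findFirst();
    }

    public static void main(String[] args) {
        // Broker 1 is demoted, so broker 2 is elected even though 1 is preferred.
        System.out.println(electLeader(List.of(1, 2, 3), Set.of(1, 2, 3), Set.of(1))); // Optional[2]
        // Only the demoted broker is in the ISR: it still becomes leader.
        System.out.println(electLeader(List.of(1, 2, 3), Set.of(1), Set.of(1)));       // Optional[1]
    }
}
```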
This is dangerously reminding me of British politics where you don't elect the best leader, you just elect one you can elect. But let me try and stay off that.
Yeah, okay.
So you're electing the best leader available. How is that triggered? You can say just in Confluent Cloud, "This is a low priority leader," as the maintenance team behind the scenes? And then that automatically triggers an election, or what is it?
Yes, that's exactly right. So that's one path to do it. It's an API, an RPC call. So one path is we have these command line tools and all our operational tools, which will make that API call. And an operator can just... It's almost like clicking a button, and you can say, "Hey, this broker has lower priority," and it'll send this request to the controller and the controller will trigger an entire leader election for all the partitions that are hosted on that broker, and it moves things off. And then when you promote it back it'll again trigger it and move leaders back on. So that's kind of what happens.
But it's not just through that. We also use it, for example, I spoke about our network probes and our network monitoring. So we have a network health manager which is looking at the network health across the board. And when it sees that, "Hey, the external connectivity is not working for a particular broker," then it triggers this leader election and then that broker is given lower priority and it won't be leader.
And we are also extending it to storage performance. So we have storage probes, where we're measuring how fast storage is for a particular broker, and if it's slower than everyone else... That's when our tail latencies go up. Let's say the storage for most brokers is taking a millisecond or less, but you have this one broker which, we see this very frequently in the cloud, that's why we've built these things, is taking 50 milliseconds, 100 milliseconds, and the overall latencies are in hundreds of milliseconds. In that world what we will do is we move leadership off this guy; we say, "Hey, this guy is slow." We detect it and, through the same mechanism, it makes the RPC call which will trigger the leader election we've talked about. And leadership moves away.
And then the interesting part about storage is that these guys are still followers. So they're still writing, and as they're writing we will still get to see if the disk is bad. And when the disk gets better you can move leadership back to that particular broker.
If it's the preferred one.
If it's the preferred one. Yeah.
Yeah, that is smart. 'Cause the thing that always worries me about these automated failure detection things is what happens if it's a false positive, right?
Right.
But you're not actually taking them out of the cluster, you're just saying, "Lower their priority." So they're still available.
Yeah, that's a great point. So that was a very conscious design choice, that we won't take it out of the cluster, we're just choosing the best leader. But you still have an interesting point, and it's something that we've been baking into all parts of our software. It's pretty interesting actually, because what do you do if you're expanding the cluster and you have one of these brokers that's kind of degraded? Or what do you do if you are shrinking your cluster and you have one of these degraded guys? Do you pause all your maintenance activities?
And then even though we are saying we don't take it completely down, you're moving a majority of the workload off of it. So your cluster is no longer at full capacity, 'cause we kind of have these brokers which don't have any leaders. Leadership typically is way more expensive because it's doing both reads and writes, and some customers can potentially have thousands of connections with a leader. That's another thing. And also you'll have to keep in mind that the followers are also replicating from the leaders. They're also sending requests constantly to it. So a leader does a lot more work than the followers. So if you have a broker without leaders then its utilization is actually pretty low compared to... On average we see it almost at least 50% less utilization when it's not taking on leadership.
And that makes it all the more important to make sure you've got the leader on something that's very healthy.
Yes. Yeah.
How long did this bunch of changes take you to put in, out of curiosity?
It's been going on for quite a bit of time. I think we started sometime last year and it's already been a year and a half, but there's so many pieces that it touches, right? There's just the leader election part, which is just one part of the puzzle. Then you want all this detection, and like you said, you don't want false positives and you want it to work. I've not even spoken about it, I've just been talking about leader election, but you also want to be smart about the followers. Because we talked about the replication model, and the followers can potentially be slow and hold back your latencies and you can have slightly higher latencies. So that's also another thing, you can be smarter about when you shrink the ISR, when you expand the ISR.
So we've been thinking about it and we've been making slow progress, and now Confluent Cloud has four-nines of availability, and all of these were super important to get there. These networking issues where suddenly the leader is not available were a big deal. And we are seeing all these weird things in the cloud. When you bring up new instances suddenly the storage might be slow, the network might be flaky. If it's network-attached storage you'll see flaky stuff there. So there are lots of these small cases where we start seeing performance problems and availability problems. We want to detect them one by one and improve confidence. And it's going to take a lot more time, I guess, for the monitoring. And that's where I think a lot of the algorithms and the intelligence lie: the detection, and figuring out when the broker is completely healthy versus when it's not. The leader election bit is also interesting, but it's not as hard as getting the rest of the things right.
Yeah. But it's this thing, isn't it, where if you want four-nines of availability then you have to face the cases that only happen 100th of a percent of the time.
Right, exactly. Exactly right. And also the other way to think about it is we are sitting on top of the cloud providers, and let's say you're sitting on top of EC2 in Amazon, and EC2 does not guarantee four-nines. But we are guaranteeing four-nines with Apache Kafka. So what you need to do is Kafka has to be resilient to failures of your underlying infrastructure, which is running at a lower availability than what you are guaranteeing. So that's why it's so important to failover, to have all these replicas, to automatically detect these cases where something's going wrong with this underlying thing which is flakier than what you want to achieve so that you can give much better...
Yeah, 'cause normally your availability goes down the more components you add. That's the default reality. And to get past that threshold, that takes a lot of work.
Right. Yeah, and then we have so many different components and each of them has some availability. I don't think anyone gives us four-nines. And when you put all the parts together it actually gets worse, because any component failure will take you out. So that's why we have to be super resilient, so that any of those guys can fail and we don't fail, we continue. And that's the only way to get to those four-nines of availability.
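To see why replication flips that arithmetic, here's a back-of-the-envelope sketch; the 99.9% figure is an assumed illustrative number, not a real SLA.

```java
/**
 * Back-of-the-envelope availability arithmetic. Numbers are illustrative.
 */
class AvailabilityMath {
    public static void main(String[] args) {
        double broker = 0.999; // assume each underlying instance gives ~three nines

        // If you depended on all three replicas being up at once,
        // availability would be the product and get worse with each part:
        double allMustBeUp = Math.pow(broker, 3); // ~0.997

        // With leader failover, the partition is unavailable only when all
        // three replicas are down at the same time (independence assumed):
        double anyOneIsEnough = 1 - Math.pow(1 - broker, 3); // ~0.999999999

        System.out.printf("all must be up: %.5f, failover: %.9f%n",
                          allMustBeUp, anyOneIsEnough);
    }
}
```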
Well Adithya, keep fighting the good fight against the numbers. Thanks very much for joining us.
Thank you. It was great talking to you.
Cheers.
Cheers, bye.
And there we leave it. Now if Adithya has left you hankering for more technical details on leadership election then we do actually have an in-depth course on Confluent Developer that goes right into it and has some diagrams and walkthroughs and that kind of thing. We'll put a direct link to it in the show notes, or you can just go to developer.confluent.io and look for the course about Kafka internals with Jun Rao. It's part of our ever-increasing library of courses about Kafka and event systems. So it's a great place to go generally if you want to learn more.
Meanwhile, if the quest to find and fix those curious performance edge cases at huge cloud scale sounds interesting, now's probably a good time to remind you that we're always hiring at Confluent. And I found it a great place to work, so come and join us. Take a look at careers.confluent.io for more. And with that it remains for me to thank Adithya Chandra for joining us and you for listening. I've been your host, Kris Jenkins, and I will catch you next time.
How does leader election work in Apache Kafka®? For the past 2 ½ years, Adithya Chandra, Staff Software Engineer at Confluent, has been working on Kafka scalability and performance, specifically partition leader election. In this episode, he gives Kris Jenkins a deep dive into the power of leader election in Kafka replication, why we need it, how it works, what can go wrong, and how it's being improved.
Adithya explains that you can configure a certain number of replicas to be distributed across Kafka brokers and then set one of them as the elected leader - the others become followers. This leader-based model proves efficient because clients only have to write to the leader, who handles the replication process internally.
But what happens when a broker goes offline, when a replica reassignment occurs, or when a broker shuts down? Adithya explains that when these triggers occur, one of the followers becomes the elected leader, and all the other replicas take their cue from the new leader. This failover reassignment ensures that messages are replicated effectively and efficiently with multiple copies across different brokers.
Adithya explains how you can select a broker as the preferred election leader. The preferred leader then becomes the new leader in failure events. This reduces latency and ensures messages are consistently written to the same broker for easier tracking and debugging.
Leader failover cannot cover all failures, Adithya says. If a broker can’t be reached externally but can talk to other brokers in the cluster, leader failover won’t be triggered. If a broker experiences transient disk or network issues, the leader election process might fail, and the broker will not be elected as a leader. In both cases, manual intervention is required.
Leadership priority is an important feature of Confluent Cloud that allows you to prioritize certain brokers over others and specify which broker is most likely to become the leader in case of a failover. This way, we can prioritize certain brokers to ensure that the most reliable broker handles more important and sensitive replication tasks. Additionally, this feature ensures that replication remains consistent and available even in an unexpected failure event.
Improvements to this component of Kafka will enable it to be applied to a wide variety of scenarios. On-call engineers can use it to mitigate single-broker performance issues while debugging. Network and storage health solutions can use it to prioritize brokers. Adithya explains that preferred leader election and leadership failover ensure data is available and consistent during failure scenarios so that Kafka replication can run smoothly and efficiently.