May 13, 2021 | Episode 158

The Truth About ZooKeeper Removal and the KIP-500 Release in Apache Kafka ft. Jason Gustafson and Colin McCabe

  • Transcript
  • Notes

Tim Berglund:

When KIP-500 was new—I want to say August of 2019—I had Jason Gustafson and Colin McCabe on the show to talk about how we thought it was going to go, what the plan was, how long it would take, what the steps would be—all that kind of stuff. Well, now that KIP-500 is merged and released in Apache Kafka 2.8, I had those two guys back on the show to talk about how it's gone. So let's dig into it in some detail on this episode of Streaming Audio, a podcast about Kafka, Confluent, and the cloud.

Tim Berglund:

Hello and welcome to another episode of Streaming Audio. I am, as always your host, Tim Berglund, and I'm joined in the virtual studio today by my colleagues, returning guests, here on Streaming Audio, Jason Gustafson and Colin McCabe.

Tim Berglund:

Jason and Colin, welcome to the show.

Jason Gustafson:

Thanks Tim. I'm happy to be here.

Colin McCabe:

Yeah. It's great to be here.

Tim Berglund:

And back. As I say, the triumph of hope over experience, right? Anyway, the last time you were on, we were talking about KIP-500 because it was this bold new KIP and this project we'd embarked on and that was, I want to say a year and eight months ago, I think that was August of eight, August of '19? We're going to have a link to it in the show notes. So we'll know, that's historical record. And KIP-500 has been merged, so we're back. What does that mean? Where actually are we?

Jason Gustafson:

That's a good question. I was trying to recall when we had our last podcast, I guess it was pre COVID days. So, a year and eight months is probably on target, but it feels like it hasn't been that long. Since then, we have made a ton of progress. So in the last podcast we talked about the overall architecture that KIP-500 brings. We talked about the Raft implementation. We probably talked a little bit about the controller and its mechanics and all of this. So since then, a lot of the big pieces have... We've done the KIPs, they've been approved, and the code has been merged. So let me just give a quick summary and Colin can add some details about the progress.

Jason Gustafson:

So we did a KIP for the Raft Protocol. We merged that last year. We've been testing it quite a lot since then. We did a KIP for the controller, basically the design of it including what are the records and how we reason about the state and the [inaudible 00:02:53] in all of this, so that also has been checked in.

Jason Gustafson:

Colin, do you want to talk a little bit of maybe the 2.8, what we have?

Colin McCabe:

Yeah. That's a good summary. Basically, we had some pretty important foundational KIPs that filled in the details of KIP-500 as promised. You mentioned a few of them, the KIP-595, the Raft implementation. We had another one for snapshots and we had obviously the Controller 1 KIP-631 as well as the one about redirection. I think that, for me, the most exciting one was definitely the Controller 1, just because I really like the idea of redesigning the controller for scalability. But the other KIPs were in their own way, equally important. And especially Raft was super exciting too, because we took Kafka into the world of Raft. At the beginning, we knew that there were lots of similarities between the Kafka protocol as it stood and Raft, but we didn't know exactly how everything would work out. And I was really happy to see everything come together. I think it came together really well there.

Tim Berglund:

Nice.

Colin McCabe:

So anyway, I should also talk a little bit about what you can actually do with 2.8. I mean, our users probably want to know that. So we have this early access mode of KIP-500. And what early access basically means is, "Don't use it in production." Don't do it.

Tim Berglund:

Exactly.

Colin McCabe:

But do take a look, test it out, kick the tires, and see where it's going. It's a preview of what we're doing. It allows us to iterate on where we are. So basically the stuff we have working right now is like we have produced consume working, obviously the most fundamental thing of Kafka. We have a bunch of different APIs related to topics or working, creating topics, deleting topics. You can describe various things about topics. There's a surprising amount of little feature-

Tim Berglund:

Those have been...

Colin McCabe:

Go ahead.

Tim Berglund:

Those have been APIs that have kind of been creeping in. The additional admin APIs for doing things with topics. It seems like they've been...

Colin McCabe:

Yeah, so that's why we [crosstalk 00:05:28] those APIs over time.

Tim Berglund:

[crosstalk 00:05:29] kind of leaking in.

Colin McCabe:

Exactly. I mean, we've added those APIs over time as Kafka has gotten more... more... What's the word? We wanted to make it-

Tim Berglund:

Cloud-native.

Colin McCabe:

More... what?

Tim Berglund:

Cloud-native. I mean, that's they always seem to be like.

Colin McCabe:

Well, cloud-native is definitely a good... I mean, adding APIs absolutely made Kafka more cloud-native. I would say like just in general becoming more battle-tested and more production-ready, having a very stable API that you can depend on is part of that rather than just modifying things in ZooKeeper directly.

Colin McCabe:

When Kafka originally started, you would just modify stuff in ZooKeeper directly. And it was all very tied to a specific format. So when you upgraded, maybe that format was a little different. So you'd have to use the right admin tools. Over time, we've come to realize the importance of APIs to our users and just in general to building the system and we've gotten much better at that. Any new feature in Kafka right now comes with a discussion of compatibility, how does it interact with upgrade, and obviously people are going to be thinking about how this works in the cloud, as well as on-premise.

Colin McCabe:

Obviously when Kafka was first created, the very, very early stage, it was more of like a proof of concept, but that was a long time ago. But anyway, having these stable APIs has allowed us to sort of switch the backend and replace your old ZooKeeper backend with your shiny, new KIP-500 backend. And hopefully, as with the old coffee maker jingle, hopefully you can't tell the difference or it's even better with the new.

Jason Gustafson:

Yeah. Exactly.

Tim Berglund:

Is that for decaf? [crosstalk 00:07:25].

Colin McCabe:

Oh, it's definitely not decaf. I can tell the difference there.

Tim Berglund:

Yeah. I assure you, I'd be able to tell the difference. KIP-500 is not decaf.

Jason Gustafson:

It's been so long since we had this... since we did the last podcast. I think, I recall that a big part of that discussion was how do we take those old APIs that we've had that were kind of reaching into ZooKeeper directly and mutating state and replace them with eight guys that are at a little bit of the Kafka level. And so. I just kind of forgotten, but yeah, most of that work was completed probably a year ago. So basically some stragglers in terms of full support but that was a big part of it. There are a lot of ways that the old architecture kind of ended up getting exposed through APIs in ways that made this migration a little bit difficult.

Jason Gustafson:

Let me just give one example of that. In the old ZooKeeper world, the brokers, they all had access to ZooKeeper. So they all were able to accept any APIs which could mutate state. So for example, a user comes to the brokers. I wanted to change some configuration for some topic. You don't have find any particular node to do that. Any one of the brokers can reach into ZooKeeper, update the state, and everything's fine.

Jason Gustafson:

So with KIP-500, of course, that's no longer exactly the case anymore, or at least not directly because... Well, there's no ZooKeeper, so there's no access. So what we had to do in this case is actually introduce a mechanism for forwarding requests. So we have to be able to take the request from the client and we send it off to the Raft quorum where the change is actually applied. So it's analogous, but you know, all of this stuff requires a considerable amount of work.

Tim Berglund:

Absolutely. And it's been fun over the past year and a half. You know I'm not a committer, but I'd certainly keep up on at least a major KIPs in each release. And I make videos about each release. So like, I have to understand what those things are about. And it's been cool to see just the little bits of connective tissue between brokers and ZooKeeper kind of getting cut away a little piece at a time. And each one's small, it's a little admin API. It's no big thing, but here's a cleaner way to do it. But you know, clearly a part of the master plan.

Jason Gustafson:

Yeah, that's exactly right. What makes a project like this so difficult is that you have to... There are so many places in the code where this expectation of having ZooKeeper there... A lot of people, they get to do something that worked much faster, so people took advantage of that. But every single one of those little things, they actually kind of turned into a project of their own. Like, before, for example, we have some extensions for Jay, Bob, just a bunch of disks where we would rely on ZooKeeper to propagate changes to the controller. But then, in itself, you kind of have to redesign that feature when you no longer have the ZooKeeper anymore. So even though these little pieces, they seem small and sometimes they are small, but every one of them has to be dealt with, which is why this project, it takes such a long time to get to completion.

Tim Berglund:

Right. The new controller in our podcast of 600 years ago, you kind of sketched out what it would look like. Walk us through that again and to whatever extent is necessary. Tell us about how that contrasts with the way it used to work. So what's the...

Colin McCabe:

Yeah. Good question. So the way the old controller works is basically, it uses ZooKeeper as it's backing store. However, it's not the only entity that can access ZooKeeper. So the old controller has to treat all of its modifications to ZooKeeper as kind of provisional or conditional, like it has to check what's there and do sort of a transaction. So it's a little bit like you're accessing the database, but maybe someone else's, too. So it makes it a little bit hard to have the most up-to-date information about what's going on in the cluster, because really what you want is you want that local cache of what's in memory. So you can do things quickly. And so you can also have the information at hand to do whatever you want to do.

Colin McCabe:

In the new controller... So I guess from the 10,000 foot view, I would say from the user's point of view, the biggest difference is you have standby controllers now.

Colin McCabe:

So you typically would have either two or four standby controllers in addition to your primary controller. And those standby controllers are ready to take over if the primary controller fails and they can do that without having this lengthy loading process that the ZooKeeper-based controller did. And that's again, because we've integrated the data store with the controller itself. So all of these standby controllers will be following along with whatever minute data modifications are happening rather than having to load it all in sort of a big bang fashion, if a failover happens. So one thing that's really exciting is [crosstalk 00:13:26] the ability to...

Tim Berglund:

A locally cached copy.

Colin McCabe:

Yeah.

Tim Berglund:

I used to have to go and read a thing out in ZooKeeper and have a locally cached copy. And now the locally cached copy is the metadata. It's the state.

Colin McCabe:

Yeah. One simple way to put it is ZooKeeper doesn't have a way to like follow all the changes. You can follow a particular Znode. You can even follow the children of the Znode, but you can't get all the changes. So there's always going to be something that you miss and that makes it really hard to do a standby with that architecture. I don't think it's impossible, but I do think it would be very hard and you would end up with an architecture that doesn't really feel very nice. But in the case of KIP-500, you actually do have that full in-memory safe. It's just slightly behind, so once you replay the latest updates, then you're fully up-to-date.

Colin McCabe:

And because of Raft, we have this guarantee that when an update has been committed to Raft, that means a majority of the controller nodes have that change. And the next leader must be chosen from one of those notes that has the change.

Tim Berglund:

Got it, got it.

Colin McCabe:

Which is, again, it's pretty much the same guarantee ZooKeeper has with their ZK protocol. But it's just that by integrating the logic of the controller with the ref logic itself, we have something which is sort of more than the sum of its parts.

Tim Berglund:

So for now, you stressed that this is not for production use and that's I think very clearly communicated about 2.8 versus what 2.8. is released like, "No seriously, don't do this." And that's cool because it's new and large and we need more miles on it when you do more testing, but also not every feature is there yet. Thinking specifically of 2.8. We're kind of ringing this bell saying, "Hey, KIP-500 is merged. It's okay. It's all over now." But there's still a phased implementation here. So what can we do now? And what can we not do now in 2.8?

Jason Gustafson:

Yeah. Yeah. That's a good question. So, what I would say we've done so far is the most significant parts I think are probably the Raft implementation and the controller. But there's kind of a long tail here of additional features that we kind of build into the system. So what you can do right now in 2.8, you can start up a cluster without ZooKeeper. You can start it off with, you can run it as a single node. For example, you can run it as a quorum of three nodes or five nodes or whatever. So you can do all of this. What we support today is basically like, you can create topics, you can write to them, you can read from them and you can delete them.

Jason Gustafson:

There's a bunch of administrative stuff which you cannot do. For example, a lot of this stuff around security. So, the ability to change scram credentials through the new controller, we just haven't gotten around to implementing that API. And there are a few things like this. I think, partitioned reassignment is one of the ones. We were hoping to get this into 2.8. We couldn't quite get there. So I think, this is going to come along the way. And there's some small ones. To use EOS for example. We depend on this logic that we use to have unique producer ideas. We just didn't quite get that too in the new controller yet. So basically you can't use EOS yet. Then there was another one that was kind of disappointing. All of these ones are not large in terms of the amount of work, there's not just enough time to get it into 2.8. So what you can do, create topics, write to them, read from them and then delete them.

Colin McCabe:

Yeah. We have a long list of things that you we're going to follow up on. EOS is definitely one of those partitioned reassignment, definitely one. JBOD, the ability to have multiple data directories that span multiple discs. That's something we're going to have soon. We don't have it yet in 2.8 for KIP-500. Definitely security. Yeah. There's a few things we want to follow up on there. We don't have the ZooKeeper free authorizer yet, which we will soon. So yeah, those are just some examples of the things that we don't have yet that we will soon.

Tim Berglund:

Sure. Plenty of work to do yet. But the core of the thing that replaces ZooKeeper is present and that's awesome.

Tim Berglund:

That single node thing that you mentioned, like if you hadn't thought of that as some independent thing, it makes perfect sense now. Like why wouldn't you do that? It's easy to maintain consent, easy to achieve consensus. You're always consistent and you don't need a ZooKeeper hanging around by definitions. That's kind of cool. That sounds like that would be handy for heavier weight integration tests and tutorials. I don't know. We're speculating, so this isn't like backlog or commitments of what the project is going to do or any sort of a Confluent opinion. But what else would you do with a single node?

Jason Gustafson:

Yeah, that's a good question. I think, well, the one you mentioned there is sort of what we've already started doing. Integration testing. So a lot of our integration tests, we do use a single node just for convenience and it's cheaper and faster. I think also, this has been an asking in the community for quite a while to have sort of like a standalone Kafka. I think where I've seen this is sort of like edge use cases. So like IOT, for example, or basically like, you've got a lot of remote sensors and remote things which need to ship data into some centralized place. And oftentimes, these are kind of constrained systems.

Jason Gustafson:

They don't necessarily have enough to be able to build like a three-node quorum or something like this. Basically you're just kind of using it as a temporary staging point, where you've got some expectation of unavailability in that just because of the network. You can kind of stage your changes all in Edge Kafka single node. And then as access is available, whatever, you can push it to the centralized place. I think that's a pretty common place that I've seen it.

Tim Berglund:

Yeah. I personally am going to run that on a Raspberry PI before too long. So don't worry. I'm sure it's been done already, but that's just too good. And yeah, you have like a simplistic sort of, it doesn't have to be Kafka connect, but basically a connector that when you get connectivity, you consume from your local thing and produce to the faraway cluster.

Colin McCabe:

Yeah. I think that in general, scalability is obviously it means scaling up, but sometimes it also means scaling down and the Raspberry PI or Edge router or other example like that is a great example of that. So having like one JVM rather than two, I mean, yeah, sure. When you have a beast of a node, it doesn't seem like a big deal, but it actually can be a big deal with certain applications. And another thing that we have with KIP-500 is we're going to give people the ability to run with three nodes rather than four, and still be able to roll effectively. And this probably requires a little bit of explanation.

Tim Berglund:

Tell me about that.

Colin McCabe:

So definitely a lot of people will run with four nodes rather than three, only for one reason. Which is that when there's a roll going on, there's actually only three nodes available most of the time because one of them is going down.

Tim Berglund:

Like a rolling upgrade.

Colin McCabe:

Right. So if you choose to create all your topics with replication factor three which most people do for safety, you will not be able to create any topics if you only have two brokers out of three with the current system.

Colin McCabe:

But with KIP-500, we actually would like that to be possible. And this is possible because we now retain the memory of that node, which we didn't before. So when the old system... When a node went away, there was kind of no record of it ever being there. You were there in the cluster or you were out of the cluster. We've actually added a third state with KIP-500, which is you're in the cluster, but you're not available currently. And adding that third state makes it possible to even place new topics on that node, which isn't available now, but we expect it to be available soon.

Jason Gustafson:

I think this is a really good example. We could have implemented this on top of ZooKeeper as well. But this is a good example of the ways that as we were designing 500 architecture and kind of breaking down these points, there are a lot of things in Kafka which people take for granted like, well, just the implementation is that way. It's always been that way. Why would we ever want to change it? So I think as this product KIP-500, we're kind of forced to break that down and say, "Okay, what is this doing? Is it doing the right thing? What could we improve there?" I think this is a nice case where, as a part of this redesign, we're able to kind of remove some of these limitations that we had previously.

Tim Berglund:

Absolutely. How about performance? I know the sore spot has always been failover in the presence of a larger number of partitions. That's kind of the thing that's got the spotlight on it. So tell us about that, obviously. How has that worked out and what other performance implications have you been able to measure so far?

Jason Gustafson:

Yeah, that's a good question. I think Colin talked a little bit about the way that now we have these standby controllers. So the big difference is that everyone in the cluster, all the nodes in the cluster, they all see the same stream of events in the same order. So all the nodes in the cluster can basically build up the state incrementally.

Jason Gustafson:

Whereas in the old controller, you got this. After a controller failover, you need to do this state reloading thing. And the big problem is that, the complexity of this operation is big O of the total number of partitions in the system.

Jason Gustafson:

So with the new controller, you don't have that anymore because the changes are being applied incrementally on all of the controllers that may become or standby controllers, which may be elected.

Jason Gustafson:

I think we've done some preliminary testing. I think we've gone up as many as a couple of million petitions. So I think what we've observed is, with the old controller, with the ZooKeeper-based one, that controller failover can take like more than 20 minutes, something like this. So it's just totally infeasible to run a cluster with that many partitions with that architecture.

Jason Gustafson:

With the new one, controller failover is nearly constant time. The only exception to that is you have to wait for the time for the controller itself to detect that the old one has failed. And then the other one, we'll hold an election and it can pick up. But our performance testing shows quite, bring that down to basically the cost of that detection time.

Tim Berglund:

Got it. And that's necessarily... It's not microseconds folks. That's not how this works. That's on the order of tens, small number of tens, of seconds, at least is the data that I've seen to detect that something is broken and you have to failover. But the actual failover, is that much? Is that fast? That's awesome.

Colin McCabe:

I've been looking at different numbers for failover. I think that this is something that will tweak and will just learn by doing to some extent what numbers work best for failover. I've been pushing for low numbers. There are trade-offs, and it may also depend on the application. But my hope is that, the controller failover can be so cheap that we can even be a little aggressive and maybe reduce that. Because if we have a failover, we didn't necessarily need to do... Maybe it's not that big of a deal if failover is cheap. So wherever possible in the system, we've been trying to replace things that are order N, where N is number of partitions or number of topics with things that are ordered one, constant time. And obviously controller failover is one example of that with a hot standby.

Colin McCabe:

Another example is when you first boot up the broker. We would like to actually have that information about a minute on cache on the broker right there. We have that now with KIP-500, because you actually have that metadata topic present when you start up. So this is a first for Kafka. Durably persisting the metadata onto the broker. Definitely not something we used to do and it's something we do too, now. And there's a few things we need to do to optimize that.

Colin McCabe:

Actually, one thing that I guess... I forgot to mention earlier, we didn't manage to get in loading snapshot support for the broker, which is another big omission in 2.8 coming very soon. But as soon as loading snapshot support is there, we expect that actually to be pretty quick for our broker startup.

Tim Berglund:

Last question, and almost a time here. But what's next? We've talked about what isn't happening yet in 2.8. But what should we look forward to in the future in as much as you can speak to that?

Jason Gustafson:

Yeah. I think there are two things that are kind of central in my mind. One of them we've already talked a little about which is just kind of dealing with this long tail of features that we have to get done. I'd say the other big thing is actually getting some production experience and start. Get this deployed, for example, to compliment cloud and start soaking it and do load testing, do all of this stuff. And you'll start getting some experience with the timeouts as Colin was talking about. Basically start turning it into building some working experience with it and tuning it and ensuring its behavior and all of this stuff so that we start having some confidence. So by the time 3.0 comes out, we can say, well, we've been at least soaking it for several months or something like that. So I think these two things are in a foremost of my mind.

Colin McCabe:

Yeah, definitely. I would also add, we're going to do a lot of work on upgrade, making upgrade work smoothly so that you can actually migrate your data from a ZooKeeper-based cluster to a KIP-500 based cluster that isn't there yet, but it will be soon so that little thing is still in the future.

Tim Berglund:

My guest today has been Colin McCabe and Jason Gustafson. Colin and Jason, thanks for being a part of Streaming Audio.

Jason Gustafson:

Thank you Tim.

Colin McCabe:

Well, thanks Tim.

Tim Berglund:

And there you have it. Hey, you know what you get for listening to the end? Some free Confluent Cloud. Use the promo code 60PDCAST—that's 60PDCAST—to get an additional $60 of free Confluent Cloud usage. Be sure to activate it by December 31st, 2021, and use it within 90 days after activation. Any unused promo value after the expiration date is forfeit and there are a limited number of codes available. So don't miss out. Anyway, as always, I hope this podcast was useful to you. If you want to discuss it or ask a question, you can always reach out to me on Twitter @tlberglund, that's T-L-B-E-R-G-L-U-N-D. Or you can leave a comment on a YouTube video or reach out on Community Slack or on the Community Forum. There are sign-up links for those things in the show notes. If you'd like to sign up and while you're at it, please subscribe to our YouTube channel and to this podcast, wherever fine podcasts are sold. And if you subscribe through Apple podcasts, be sure to leave us a review there that helps other people discover it, especially if it's a five-star review. And we think that's a good thing. So thanks for your support, and we'll see you next time.

Jason Gustafson and Colin McCabe, Apache Kafka® developers, discuss the project to remove ZooKeeper—now known as the KRaft (Kafka on Raft) project. A previous episode of Streaming Audio featured both developers on the podcast before the release of Apache Kafka 2.8. Now they’re back to share their progress.

The KRraft code has been merged (and continues to be merged) in phases. Both developers talk about the foundational Kafka Improvement Proposals (KIPs), such as KIP-595: a Raft protocol for Kafka, and KIP-631: the quorum-based Kafka controller. The idea going into this new release was to give users a chance to try out no-ZooKeeper mode for themselves. 

There are a lot of exciting milestones on the way for KRaft. The next release will feature Raft snapshot support, as well as support for running with security authorizers enabled.  

Continue Listening

Episode 159May 20, 2021 | 42 min

Engaging Database Partials with Apache Kafka for Distributed System Consistency ft. Pat Helland

When compiling database reports using a variety of data from different systems, obtaining the right data when you need it in real time can be difficult. With cloud connectivity and distributed data pipelines, Pat Helland (Principal Architect, Salesforce) explains how to make educated partial answers when you need to use the Apache Kafka® platform. After all, you can’t get guarantees across a distance, making it critical to consider partial results.

Episode 160May 25, 2021 | 38 min

Running Apache Kafka Efficiently on the Cloud ft. Adithya Chandra

Focused on optimizing Kafka performance with maximized efficiency, Confluent’s Product Infrastructure team has been actively exploring opportunities for scaling out Kafka clusters. They are able to run Kafka workloads with half the typical memory usage while saving infrastructure costs, which they have tested and now safely rolled out across Confluent Cloud. In this episode, Adithya Chandra explains how.

Episode 161June 8, 2021 | 32 min

Adopting OpenTelemetry in Confluent and Beyond ft. Xavier Léauté

Collecting internal, operational telemetry from Confluent Cloud services and thousands of clusters is no small feat. Traditionally, this data needs to be collected in multiple ways to satisfy all the different requirements. However, this sometimes leads to discrepancies between various systems. With OpenTelemetry, we can collect data in a vendor-agnostic way. Many vendors already integrate with OpenTelemetry, which gives us the flexibility to try out different observability solutions with minimal effort, without the need to rewrite applications or deploy new agents.

Got questions?

If there's something you want to know about Apache Kafka, Confluent or event streaming, please send us an email with your question and we'll hope to answer it on the next episode of Ask Confluent.

Email Us

Never miss an episode!

Confluent Cloud is a fully managed Apache Kafka service available on all three major clouds. Try it for free today.

Try it for free