VP Developer Relations
We saw previously the outline of the data sources that we're going to use in our pipeline. We set up a data generator to create the ratings events. Now let's see about accessing our database to get the reference information about the customers who are leaving these ratings.
We'll use something called change data capture (often abbreviated CDC), plus snapshots. This approach enables us to capture everything already in the database, along with new changes made to the data. There are two flavors of CDC:
Query-based CDC
Log-based CDC
Which one you should use depends on several factors. First off, we need to understand a little bit about how each style of CDC works.
Query-based CDC uses a database query to pull new data from the database. The query will include a predicate to identify what has changed. This will be based on a timestamp field or an incrementing identifier column (or both).
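To make the polling idea concrete, here is a minimal sketch using Python's built-in `sqlite3` module. The `customers` table, its `updated_at` column, and the `poll_changes` helper are all hypothetical names chosen for illustration; a real connector tracks its offset durably and handles many more edge cases. The sketch assumes every insert and update faithfully sets `updated_at`.

```python
import sqlite3

def poll_changes(conn, last_seen_ts):
    """Return rows changed since last_seen_ts, plus the new high-water mark."""
    rows = conn.execute(
        "SELECT id, name, updated_at FROM customers "
        "WHERE updated_at > ? ORDER BY updated_at, id",  # the CDC predicate
        (last_seen_ts,),
    ).fetchall()
    new_ts = rows[-1][2] if rows else last_seen_ts
    return rows, new_ts

# Hypothetical table standing in for the customer reference data.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, updated_at INTEGER)"
)
conn.executemany(
    "INSERT INTO customers VALUES (?, ?, ?)",
    [(1, "Ada", 100), (2, "Grace", 101)],
)

# First poll starts from timestamp 0, so it picks up every existing row.
first_batch, offset = poll_changes(conn, 0)

# Between polls: one new row, and two successive updates to row 1.
conn.execute("INSERT INTO customers VALUES (3, 'Linus', 102)")
conn.execute("UPDATE customers SET name = 'Ada L.', updated_at = 103 WHERE id = 1")
conn.execute("UPDATE customers SET name = 'Ada Lovelace', updated_at = 104 WHERE id = 1")

# Second poll only asks for rows newer than the stored offset.
second_batch, offset = poll_changes(conn, offset)
print(first_batch)
print(second_batch)
```

Note that the second poll returns row 1 only once, in its final state: the intermediate value written between polls is never observed, and a deleted row would simply stop appearing in results.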
Query-based CDC is provided by the JDBC connector for Kafka Connect, available as a fully managed service in Confluent, or as a self-managed connector.
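As a rough illustration, a self-managed JDBC source connector configured for query-based CDC might look something like the following, written here as a Python dictionary that is serialized to the JSON shape the Kafka Connect REST API accepts. The connector name, connection URL, credentials, table, and column names are placeholders assumed for this example, and the fully managed connector uses its own configuration format.

```python
import json

# Sketch of a self-managed JDBC source connector config.
# Host, database, user, table, and column names are hypothetical.
jdbc_source_config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://db.example.internal:3306/demo",
    "connection.user": "connect_user",
    "connection.password": "********",
    "table.whitelist": "customers",
    # Use both a timestamp column and an incrementing ID to detect changes.
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id",
    # How often the connector polls the database, in milliseconds.
    "poll.interval.ms": "10000",
    # Rows from the customers table land in the topic "mysql-customers".
    "topic.prefix": "mysql-",
}

# Body you could submit to the Kafka Connect REST API to create the connector.
payload = json.dumps(
    {"name": "customers-jdbc-source", "config": jdbc_source_config}, indent=2
)
print(payload)
```

The `mode` setting is where the choice of predicate described above is made: `timestamp`, `incrementing`, or `timestamp+incrementing`.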
Log-based CDC uses the database's transaction log to extract details of every change made. The particular transaction log implementation and specifics will vary by database, but all are based on the same principles. Every change made in the database is written to its transaction log (known, depending on the implementation, as the redo log, binlog, and so on).
The changes written to the transaction log include inserts, updates, and even deletes. So when two rows are written to the database, two entries are added to the transaction log. Those two entries from the transaction log are decoded and the actual data from the database row is written to two new events in Apache Kafka.
One of the several benefits of log-based CDC is that it can capture not just what the table rows look like now, but also what they looked like before they were changed.
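The following toy model sketches the principle in Python. It is not how any real database or connector is implemented: `TinyDatabase`, `ChangeEvent`, and `read_log` are hypothetical names, and the single-letter operation codes are loosely modelled on the `c`/`u`/`d` codes that Debezium uses in its change events. Every mutation appends an entry to an append-only log, and a reader that remembers its position turns each entry into one event carrying the row's before and after state.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class ChangeEvent:
    op: str                  # "c" = create, "u" = update, "d" = delete
    key: int
    before: Optional[dict]   # row state before the change (None for inserts)
    after: Optional[dict]    # row state after the change (None for deletes)

class TinyDatabase:
    """Toy table plus an append-only transaction log (illustrative only)."""

    def __init__(self):
        self.rows = {}
        self.log = []

    def insert(self, key, row):
        self.rows[key] = dict(row)
        self.log.append(("INSERT", key, None, dict(row)))

    def update(self, key, **changes):
        before = dict(self.rows[key])
        self.rows[key].update(changes)
        self.log.append(("UPDATE", key, before, dict(self.rows[key])))

    def delete(self, key):
        before = self.rows.pop(key)
        self.log.append(("DELETE", key, before, None))

OPS = {"INSERT": "c", "UPDATE": "u", "DELETE": "d"}

def read_log(db, position):
    """Decode log entries from `position` onward into change events."""
    entries = db.log[position:]
    events = [ChangeEvent(OPS[kind], key, before, after)
              for kind, key, before, after in entries]
    return events, position + len(entries)

db = TinyDatabase()
db.insert(1, {"name": "Ada"})
db.insert(2, {"name": "Grace"})
events, pos = read_log(db, 0)        # two rows written, two events

db.update(1, name="Ada L.")
db.update(1, name="Ada Lovelace")
db.delete(2)
more_events, pos = read_log(db, pos)  # both updates and the delete appear

for event in events + more_events:
    print(event)
```

Unlike a polling query, the reader here sees both intermediate updates to row 1 as separate events, and the delete of row 2 arrives with the row's last known contents in `before`.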
Popular implementations of log-based CDC include the Confluent Oracle CDC Source Connector and the connectors from the Debezium project, which are available fully managed on Confluent and support several databases, including PostgreSQL, MySQL, and SQL Server.
Now, in the previous exercise, if you saw that, we saw the outline of the data sources that we're going to use in our pipeline. We set up a data generator to create the ratings events. Now let's see about getting the reference information about the customers who are leaving these ratings from our database. So, we're going to use something called Change Data Capture, sometimes abbreviated CDC. If you're in the US, that's not the Centers for Disease Control, which throughout 2020 and early 2021 seemed to be in the news a little bit more than usual. I can't remember why. But Change Data Capture, CDC, is going to enable us to capture everything that's already in the database, along with changes that are made to the data after that initial capture. Let's talk about how it works.
There are two kinds of CDC, query-based and log-based. You need to understand their strengths, weaknesses, limitations, and differences so you can make an informed decision about which one to use when it's your time to make a call on this. So let's dig into how each one works.
Now, query-based CDC uses a regular database query to pull new stuff out of the database. So if the only interface you have to the database is just queries you can run, it doesn't seem like it's asking too much. You can usually submit a select and get things back. That's kind of minimal. If that's the only interface you have, and you don't have any view into the internals, then you can do this. The query will always include a predicate to identify what has changed. This will be based on a timestamp field or a monotonically incrementing identifier, or both. It's really one, the other, or both of those two things. I'm showing you a timestamp example here. So here's how that works. There are these two records in the database right now, those two bars inside, and we want to get them to Kafka.
And so if we run that query and we say, "Hey, give me everything that's greater than the previous timestamp," which, by the way, starts at zero, then yes, all of the timestamps are greater than that. We're going to capture those two, and we see those two messages show up in Kafka: those two blocks underneath the Kafka logo there. If another row shows up, we run that query again, now with the timestamp of the last time we ran that query, and we'll get that extra insert. And if there are timestamp columns and they're being faithfully updated, we'll capture inserts and updates this way with query-based CDC.
But the query is polling the database. That's the thing you have to do every so often, maybe every second, maybe every 10 seconds. Maybe you'll get in a fight with your production DBA about what that's going to be, but it has to keep running with that updated timestamp to get those new records, to get those new events produced into your Kafka topic. The Kafka Connect JDBC Source Connector from Confluent is an example of this. This is what you'd usually use for query-based CDC. It just uses the regular JDBC interface. Pretty much any database you're going to use is going to be JDBC compatible. And it does that polling and uses an ID or a timestamp or both to get new and changed records.
Log-based CDC is a little bit different. There's no nice, friendly JDBC interface where I give you some SQL and you give me a result set back. No, we're actually looking at the database's transaction log. This is a binary file somewhere on the database, and the connector has to, in some way, get access to that data. Now, the way that works is entirely implementation dependent. Okay? If you look at how it works on MySQL, it's absolutely not the same as PostgreSQL, and it's a totally different thing in Oracle. It's a new and fun adventure every time. But the principle is always the same.
When a database receives a mutation, when someone says, "I would like to insert a record or update a record," or now, with log-based CDC, "delete a record," this transaction log gets written to. It's called different things. It might be called the redo log or the binlog. Again, even the terminology is implementation dependent. It depends on the database, but it gets appended to for inserts, updates, and deletes. Every time you do one of those things, that gets logged.
Now, if you're thinking, "Wait, isn't Kafka a log? Doesn't that seem kind of similar? It feels like there's this idea that always emerges, which is that you start with a log and you build things on top of it," you're right, and we're not going to get into that, but it's an interesting observation that there's a log inside the database.
And these two rows that we started with before, remember these two initial rows: when they're written, they get written to that transaction log, and in some sense the connector is subscribed to that log. Whatever that means, again, depends on the database and the implementation. It might mean there's some network connection. It might mean I'm looking at a file. There are different things that could mean, but I'm subscribed to those changes and I see them. And now I can decode them from their internal format in that binary file and write them as formatted messages in Avro or Protobuf or JSON or something inside the Kafka topic.
The cool thing about log-based CDC is that it can capture not just updates and inserts, not just what the table rows look like now, but it can also tell you about the history, how they got to where they are right now, if you've still got that stuff in the binlog. And it can tell you about deletes. So rather than showing you a new insert, which we looked at before, let's look at a delete.
Now, if I delete that second row, well, that delete gets appended to the binlog, or to the transaction log in the database. I'm subscribed to that log, and so now I produce that delete record to my Kafka topic. So I can see you deleting things. With query-based CDC you can't see deletes happen, but with log-based CDC you can.
Also, with query-based CDC, you will miss things. If there are multiple changes to a row in between polls, you just don't see them. Say I'm polling every 10 seconds, because my DBA got mad if I did it more often than that, and trust me, it was ugly, and we don't want to revisit that. We're just going to live with 10 seconds. But if somebody updated that row five times since my last ten-second query, I won't see those. I'll just see the result of the last one. With log-based CDC I see every mutation. So it's more precise. And in general, it's a good thing to do if you can, but it's not always necessary.
Examples of log-based CDC connectors would be the Confluent Oracle CDC Connector and all the connectors from the Debezium project. Debezium is an open source project that does CDC really well. It's basically a bunch of Kafka Connect connectors that'll talk to PostgreSQL, MySQL, SQL Server, and some other things. There's potentially support for Cassandra being worked on in the open there. So there are all kinds of neat things going on, and you can get good use out of Debezium for log-based CDC.
And that wraps it up. That is a quick overview of Change Data Capture. Now let's apply this in an exercise.