For architectural or legacy reasons, data-centric applications may write directly to a database. Event Processing Applications then need to reliably consume data from these systems as Events on Event Streams.
How do we update a value in a database and create an associated Event with at-least-once guarantees?
Some applications write directly to a database table, which makes the database table the Event Source. We can deploy a Change Data Capture (CDC) solution to continuously capture writes (inserts, updates, deletes) to that database table and produce them as Events onto an Event Stream. The Events in the stream can then be consumed by Event Processing Applications. Additionally, the Event Stream can be read into a Projection Table, for example with ksqlDB, so that it can be queried by other applications.
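To make the relationship between captured writes and a Projection Table concrete, the following is a minimal sketch in plain Python. It is not how a CDC connector or ksqlDB is implemented; the ChangeEvent class, its field names, and the sample department rows are all hypothetical and chosen only for illustration. It assumes events for a given key arrive in order, possibly with duplicates, as at-least-once delivery allows.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Optional


@dataclass(frozen=True)
class ChangeEvent:
    """One captured write. Field names are illustrative, not a real CDC wire format."""
    op: str                               # "insert", "update", or "delete"
    key: Any                              # primary key of the affected row
    row: Optional[Dict[str, Any]] = None  # new row state; None for deletes


def project(events: Iterable[ChangeEvent]) -> Dict[Any, Dict[str, Any]]:
    """Fold a stream of change events into the latest row per key."""
    table: Dict[Any, Dict[str, Any]] = {}
    for event in events:
        if event.op == "delete":
            table.pop(event.key, None)                # tolerate a redelivered delete
        elif event.op in ("insert", "update"):
            table[event.key] = dict(event.row or {})  # upsert: latest state wins
        else:
            raise ValueError(f"unknown op: {event.op!r}")
    return table


stream = [
    ChangeEvent("insert", "d001", {"dept_name": "Marketing"}),
    ChangeEvent("insert", "d002", {"dept_name": "Finance"}),
    ChangeEvent("update", "d001", {"dept_name": "Growth"}),
    ChangeEvent("update", "d001", {"dept_name": "Growth"}),  # duplicate delivery
    ChangeEvent("delete", "d002"),
]
projection = project(stream)
print(projection)  # {'d001': {'dept_name': 'Growth'}}
```

Because the upsert and the delete are both idempotent, an event that is delivered twice in a row leaves the projection unchanged, which is why at-least-once delivery is sufficient for this kind of table.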
Kafka connectors move data between Apache Kafka® and other data systems scalably and reliably. There are many types of connectors, and they can connect to many types of databases. For example, the JDBC Source connector for Kafka Connect streams changes from a source database to Kafka topics.
{
  "connector.class": "MySqlCdcSource",
  "name": "MySqlCdcSourceConnector_0",
  "kafka.api.key": "****************",
  "kafka.api.secret": "******************************",
  "database.hostname": "database-2.<host-ID>.us-west-2.rds.amazonaws.com",
  "database.port": "3306",
  "database.user": "admin",
  "database.password": "**********",
  "database.server.name": "mysql",
  "database.whitelist": "employee",
  "table.includelist": "employees.departments",
  "snapshot.mode": "initial",
  "output.data.format": "AVRO",
  "tasks.max": "1"
}
The configuration above deploys a MySQL Source CDC Connector (Debezium) that streams data from a MySQL database to Kafka topics. It specifies the database to connect to (database.hostname) and the database elements to source into Kafka (table.includelist). The output.data.format setting tells Kafka Connect which format to use when writing records, in this case Apache Avro.
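Before deploying, it can help to sanity-check the configuration file locally. The sketch below is one possible approach, not part of any Confluent tooling: the check_config function and the EXPECTED_KEYS tuple are hypothetical, and treating these particular keys as mandatory is an assumption based only on the settings discussed above. The embedded JSON is a subset of the example configuration.

```python
import json

# Keys the surrounding text calls out, plus the connector class. Treating them
# as mandatory is an assumption of this sketch, not the connector's own rules.
EXPECTED_KEYS = (
    "connector.class",
    "database.hostname",
    "table.includelist",
    "output.data.format",
)


def check_config(config):
    """Return a list of problems found; an empty list means none were found."""
    problems = [f"missing or empty: {key}" for key in EXPECTED_KEYS if not config.get(key)]
    # The example config writes every value, including ports and counts, as a JSON string.
    problems += [
        f"not a string: {key}" for key, value in config.items() if not isinstance(value, str)
    ]
    return problems


raw = """
{
  "connector.class": "MySqlCdcSource",
  "database.hostname": "database-2.<host-ID>.us-west-2.rds.amazonaws.com",
  "database.port": "3306",
  "table.includelist": "employees.departments",
  "output.data.format": "AVRO",
  "tasks.max": "1"
}
"""
config = json.loads(raw)
problems = check_config(config)
print(problems or "no problems found")
```

A check like this only catches structural slips such as a dropped key or an unquoted number; whether the connector accepts the values is still decided at deployment time.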
The Confluent CLI can deploy a connector from a configuration file on the command line, for example:
confluent connect create --config <connector-config-file.json>