Change Data Capture (CDC) is a technique you can use to track row-level changes in database tables in response to create, update, and delete operations. Debezium is a distributed platform that builds on top of CDC features available in different databases (for example, binlog replication with MySQL). Debezium provides a set of Kafka Connect connectors that tap into row-level changes in database table(s), convert those changes into event streams, and then send those streams to Apache Kafka.
Most commonly, Debezium is deployed by using Apache Kafka Connect. Kafka Connect is a framework and runtime for implementing and operating source connectors (such as Debezium), which send records into Kafka, and sink connectors, which propagate records from Kafka topics to other systems.
This tutorial walks you through setting up a CDC-based system on Azure by using Azure Event Hubs (for Kafka), Azure Database for MySQL - Flexible Server, and Debezium. In this case, the Debezium MySQL connector will stream database modifications from MySQL to Kafka topics in Azure Event Hubs.
The architecture of a CDC pipeline based on Debezium, Apache Kafka, and Azure Event Hubs is shown in the following diagram:
In this post, you’ll learn how to:
Before you begin completing the process outlined in this post, ensure that you have:
This section covers the following topics:
curl https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.6.2.Final/debezium-connector-mysql-1.6.2.Final-plugin.tar.gz --output debezium-connector-mysql.tar.gz
2. To extract the tar file contents, run the following command:
tar -xvzf debezium-connector-mysql.tar.gz
You should now see a new folder named debezium-connector-mysql.
3. To copy the connector JAR files to your Kafka installation, run the following command:
export KAFKA_HOME=[path to kafka installation, e.g. /home/kafka/kafka]
cp debezium-connector-mysql/*.jar $KAFKA_HOME/libs
Note: To confirm that the binaries have been copied, run the following command:
ls -lrt $KAFKA_HOME/libs | grep mysql
Note: For more information on installation and the latest builds, see the Debezium documentation.
When you redirect Kafka Connect throughput from Kafka to Event Hubs, minimal reconfiguration is necessary. The following sample connect-distributed.properties configuration file shows how to configure a Connect worker for the Kafka endpoint on Event Hubs:
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=dbz-mysql-cdc-connect-cluster-configs
offset.storage.topic=dbz-mysql-cdc-connect-cluster-offsets
status.storage.topic=dbz-mysql-cdc-connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
plugin.path={KAFKA.DIRECTORY}/libs
2. After you replace the values, save the updated connect-distributed.properties configuration file.
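For example, for a hypothetical namespace named mynamespace, the Event Hubs-specific values would look similar to the following (the shared access key is a placeholder):
bootstrap.servers=mynamespace.servicebus.windows.net:9093
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
plugin.path=/home/kafka/kafka/libs
Then start the Kafka Connect worker in distributed mode, pointing it at the updated configuration file: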
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
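Once the worker is up, a quick way to confirm that the Kafka Connect REST API is reachable is to query its root endpoint (this assumes the default REST port, 8083); the response contains the worker's version information:
curl http://localhost:8083/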
Kafka Connect uses the Kafka AdminClient API to automatically create topics with recommended configurations, including compaction. After the Connect worker has started, a quick check of the Event Hubs namespace in the Azure portal confirms that the worker's internal topics have been created automatically. Kafka Connect internal topics must use compaction.
3. In the Azure portal, navigate to your Event Hubs namespace --> Event Hubs.
You should see that the internal topics specified in the connect-distributed.properties configuration file have been auto-created.
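Alternatively, if you have the Azure CLI installed, you can list the event hubs (topics) in the namespace from the command line; the resource group and namespace names below are placeholders:
az eventhubs eventhub list --resource-group <resource-group> --namespace-name <eventhubs-namespace> --query "[].name" --output tsv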
{
"name": "classicmodel-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "{SERVER-NAME}.mysql.database.azure.com",
"database.user": "{USER-NAME}",
"database.port": "3306",
"database.password": "{PASSWORD}",
"database.server.id": "1",
"database.server.name": "mysql-connector-demo",
"database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
"database.history.kafka.topic": "dbhistory.classicmodels",
"include.schema.changes": "true"
}
}
2. Edit the mysql-source-connector.json configuration file and replace the placeholder values to match your setup.
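For example, for a hypothetical Flexible Server named mydemoserver with an admin user named mysqladmin, the connection-related properties would look similar to the following (the server and user names here are hypothetical):
"database.hostname": "mydemoserver.mysql.database.azure.com",
"database.user": "mysqladmin",
"database.port": "3306",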
Note: For more details on the configuration properties, see Debezium Connector for MySQL.
After you replace the values, save the updated mysql-source-connector.json configuration file.
3. To create an instance of the connector, use the Kafka Connect REST API endpoint by running the following command:
curl -X POST -H "Content-Type: application/json" --data @mysql-source-connector.json http://localhost:8083/connectors
4. To check the status of the connector, run the following command:
curl -s http://localhost:8083/connectors/classicmodel-connector/status
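If the connector started successfully, the status response should look similar to the following; the worker ID reflects the host and port of your Connect worker:
{
  "name": "classicmodel-connector",
  "connector": { "state": "RUNNING", "worker_id": "127.0.0.1:8083" },
  "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083" } ],
  "type": "source"
}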
Note that new topics for the connector have been created, which you can verify in the Event Hubs namespace in the Azure portal.
To see CDC in action, you’ll need to create/update/delete records in the Azure Database for MySQL database.
mysql -h {SERVER-NAME}.mysql.database.azure.com -u mysqladmin -p
2. Create a table and insert records by running the following command:
CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));
INSERT INTO todos (description, todo_status) VALUES ('setup mysql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');
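To generate update and delete change events as well, modify and remove some of the rows you just inserted, for example:
UPDATE todos SET todo_status = 'complete' WHERE description = 'configure and install connector';
DELETE FROM todos WHERE description = 'start connector';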
If you navigate to the event hub in the Azure portal, you should see the counts of requests and messages sent to the event hub increasing.
We can now look at the contents of the topic to make sure everything is working as expected. The example below uses kafkacat, but you can also create a consumer by using any of the options listed in the article Apache Kafka developer guide for Azure Event Hubs.
For the purposes of this blog post, we’re using version 1.5, which you can check by running the command:
kafkacat -V
metadata.broker.list={YOUR.EVENTHUBS.FQDN}:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password={YOUR.EVENTHUBS.CONNECTION.STRING}
2. Edit the kafkacat.conf file and replace the details below to match your setup.
For instructions on getting the connection string, see Get an Event Hubs connection string.
Here's an example configuration:
sasl.password=Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX
6. In a different terminal, start a kafkacat consumer session by running the following commands:
export KAFKACAT_CONFIG=kafkacat.conf
export BROKER={YOUR.EVENTHUBS.FQDN}:9093
export TOPIC=mysql-connector-demo.classicmodels.todos
kafkacat -b $BROKER -t $TOPIC -o beginning
Here’s a snippet of the payload:
{
"payload":{
"before":null,
"after":{
"id":104,
"description":"to check from kafkacat",
"todo_status":"complete"
},
"source":{
"version":"1.6.2.Final",
"connector":"mysql",
"name":"mysql-connector-demo",
"ts_ms":1631598198000,
"snapshot":"false",
"db":"classicmodels",
"sequence":null,
"table":"todos",
"server_id":3237408199,
"gtid":"d5978bbf-bf8f-11eb-9c11-000d3a1e8e2e:347",
"file":"mysql-bin.000053",
"pos":51636,
"row":0,
"thread":null,
"query":null
},
"op":"c",
"ts_ms":1631598198431,
"transaction":null
}
}
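The op field indicates the type of change: c for create (insert), u for update, and d for delete. For comparison, an update to the same row would produce an event in which both the before and after states are populated. Here's a sketch with hypothetical values (the source block is omitted for brevity):
{
  "payload": {
    "before": { "id": 104, "description": "to check from kafkacat", "todo_status": "complete" },
    "after": { "id": 104, "description": "to check from kafkacat", "todo_status": "in-progress" },
    "op": "u",
    "ts_ms": 1631598250000,
    "transaction": null
  }
}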
Now that all the todos table changes are being captured in the Event Hubs topic, we’ll use the FileStreamSink connector (available by default in Kafka Connect) to consume these events.
{
"name": "cdc-file-sink",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": "1",
"topics": "mysql-connector-demo.classicmodels.todos",
"file": "./todos-cdc.txt>"}
}
2. Replace the file attribute based on your file system, and then save the file.
3. To create the connector, run the following command:
curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors
4. To check the status, run the following command:
curl http://localhost:8083/connectors/cdc-file-sink/status
5. Insert/update/delete database records, and monitor the records in the configured output sink file by running the following command:
tail -f ./todos-cdc.txt
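For example, inserting another row into the todos table should result in a new change event being appended to the file within a few seconds:
INSERT INTO todos (description, todo_status) VALUES ('try the file sink connector', 'complete');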
Note: This approach also works with Azure Database for MySQL - Single Server.
This concludes my guidance on setting up a CDC-based system on Azure by using Azure Event Hubs (for Kafka), Azure Database for MySQL - Flexible Server, and Debezium.
CDC captures incremental changes in the original database so that they can be propagated to other databases or applications in near real-time. Though I have used managed Azure services for demonstration purposes, these instructions should work for any other setup, for example a local Kafka cluster and MySQL instance. After change event records are in Apache Kafka, different connectors in the Kafka Connect ecosystem can stream the records to other systems and databases such as Elasticsearch, data warehouses, and analytics systems, as well as caches such as Infinispan.
Use of the Apache Kafka Connect framework, as well as the Debezium platform and its connectors, is not eligible for product support through Microsoft.
Many Apache Kafka Connect scenarios will be functional, but the conceptual differences between Apache Kafka's and Azure Event Hubs' retention models may result in certain configurations that do not work as expected.