This article explains how to send JSON-schema-formatted topics from an HDInsight managed Kafka standalone server to a MySQL database. The steps can also be extended to a distributed setup. We used Ubuntu 18.04 machines for the cluster.
There are some prerequisite steps:
Detailed steps:
$sudo apt-get install -y libmysql-java
This installs the MySQL JDBC driver on your machine at /usr/share/java.
Place the mysql-connector-java-{version}.jar file in the appropriate plugin path.
To find the plugin path, use the command below:
$sudo find / -name kafka-connect-jdbc\*.jar
This lists the plugin paths; put the jar file in the appropriate one. If you skip this step, you may get a "No suitable driver found" error.
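The placement can be sketched locally with throwaway files; every path, file name, and version number below is illustrative only, not the real HDInsight locations. The point is that the driver jar must end up in the same plugin directory as the kafka-connect-jdbc jar:

```shell
# Mock layout (illustrative names only): the JDBC driver jar must sit in
# the same plugin directory as kafka-connect-jdbc.
PLUGIN_DIR=$(mktemp -d)
touch "$PLUGIN_DIR/kafka-connect-jdbc-10.2.0.jar"        # stand-in connector jar
touch /tmp/mysql-connector-java-8.0.25.jar               # stand-in driver jar
cp /tmp/mysql-connector-java-8.0.25.jar "$PLUGIN_DIR/"   # place driver beside connector
ls "$PLUGIN_DIR"
```

On the real cluster, the copy source is the jar that libmysql-java installed under /usr/share/java.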
Acquire the Zookeeper and Kafka broker details. First set up a password variable: replace PASSWORD with the cluster login password, then enter the command
$export password='PASSWORD'
Extract the correctly cased cluster name
$export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
Extract the Kafka Zookeeper hosts
$export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
To extract the Kafka broker information into the variable KAFKABROKERS, use the command below:
$export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
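The trailing cut -d',' -f1,2 in the commands above simply keeps the first two host:port pairs from the comma-joined list that jq builds from the Ambari response. A quick local illustration with made-up worker-node names:

```shell
# Illustration with hypothetical hostnames (not from a real cluster):
HOSTS="wn0-kafka:9092,wn1-kafka:9092,wn2-kafka:9092"
# cut -d',' -f1,2 keeps only the first two comma-separated fields
KAFKABROKERS=$(echo "$HOSTS" | cut -d',' -f1,2)
echo "$KAFKABROKERS"   # wn0-kafka:9092,wn1-kafka:9092
```

Two brokers are enough for the bootstrap list; clients discover the rest of the cluster from them.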
Copy connect-standalone.properties to connect-standalone-1.properties and populate the properties as shown below.
$sudo cp /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-standalone-1.properties
bootstrap.servers=<Enter the full contents of $KAFKABROKERS>
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets1
offset.flush.interval.ms=10000
rest.port=8085
plugin.path=/usr/hdp/current/kafka-broker/connectors/confluentinc-kafka-connect-jdbc-10.2.0
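Instead of editing the copy by hand, the same overrides can be written in one step with a here-doc; this is just a sketch, with the bootstrap.servers value as a placeholder to replace with the contents of $KAFKABROKERS, and written to /tmp here rather than the Kafka config directory:

```shell
# Sketch: write the standalone worker overrides in one shot.
# bootstrap.servers below is a placeholder for the real $KAFKABROKERS value.
cat > /tmp/connect-standalone-1.properties <<'EOF'
bootstrap.servers=wn0-kafka:9092,wn1-kafka:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets1
offset.flush.interval.ms=10000
rest.port=8085
plugin.path=/usr/hdp/current/kafka-broker/connectors/confluentinc-kafka-connect-jdbc-10.2.0
EOF
grep -c '=' /tmp/connect-standalone-1.properties   # 9 key=value lines
```

Note that value.converter.schemas.enable=true is what makes the worker expect the schema/payload envelope used later in this article.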
Download the relevant Kafka Connect plugins from the Confluent Hub to your local desktop.
Create a new folder on the edge node and set its permissions:
$sudo mkdir /usr/hdp/current/kafka-broker/connectors
$sudo chmod 777 /usr/hdp/current/kafka-broker/connectors
Using WinSCP or any other SCP tool of your choice, upload the Kafka Connect plugins into the folder /usr/hdp/current/kafka-broker/connectors
Create the MySQL sink connector configuration file:
$sudo vi /usr/hdp/current/kafka-broker/connectors/mysql.properties
name=<name of the connector>
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=3
connection.url=jdbc:mysql://<mysqlserverNameFQDN>:3306/<databaseName>
connection.user=<userName>@<mysqlserverName>
connection.password=<Password>
table.name.format=<tableName>
auto.create=true
topics=<topicName>
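The connection.url is the standard JDBC MySQL form; for Azure Database for MySQL the server name is the instance's FQDN. A small sketch of how the URL is composed (server and database names below are hypothetical):

```shell
# Illustrative values only: composing connection.url for the sink connector.
SERVER="mydemoserver.mysql.database.azure.com"   # hypothetical MySQL server FQDN
DB="kafkadb"                                     # hypothetical database name
echo "jdbc:mysql://$SERVER:3306/$DB"
```

With auto.create=true the connector creates the target table from the message schema if it does not already exist.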
In a new session, start the sink connector:
$sudo /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone-1.properties /usr/hdp/current/kafka-broker/connectors/mysql.properties
In a new session, create a new topic:
$/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic <sampleTopic name> --zookeeper $KAFKAZKHOSTS
Start a console producer and send a sample JSON message:
$/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic <sampleTopic name>
> {"schema": {"type": "struct", "fields": [{"type": "int32", "optional": true, "field": "id" }], "optional": false, "name": "foobar"},"payload": {"id": 40000}}
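Because the worker runs with value.converter.schemas.enable=true, every message must carry this schema/payload envelope; a malformed message will fail the connector task. A quick local check (assuming python3 is available) that the message parses and the payload field is reachable:

```shell
# Sanity-check the envelope before producing it (python3 assumed available).
MSG='{"schema": {"type": "struct", "fields": [{"type": "int32", "optional": true, "field": "id"}], "optional": false, "name": "foobar"}, "payload": {"id": 40000}}'
ID=$(echo "$MSG" | python3 -c 'import json,sys; d=json.load(sys.stdin); print(d["payload"]["id"])')
echo "$ID"   # 40000
```

The "schema" half tells the JsonConverter how to map fields to column types (here, id as int32); the "payload" half is the row that lands in MySQL.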
For more details regarding JSON schema formats, see the references below.
Log in to the MySQL database; you should find a new row in the target table.
References:
Kafka Connect with HDInsight Managed Kafka - Microsoft Tech Community
JDBC Connector (Source and Sink) | Confluent Hub
HDInsight Managed Kafka with OSS Kafka Schema Registry - Microsoft Tech Community
Kafka Connect in Action: JDBC Sink - YouTube
Installing a JDBC driver for the Kafka Connect JDBC connector - YouTube