- Clone the repository

    git clone https://github.com/confluentinc/demo-scene.git
    cd demo-scene/kafka-connect-single-message-transforms
- If you want to write to S3:
  - Create the S3 bucket, and make a note of the region
  - Obtain your access key pair
  - Update .env with your AWS credentials (see sample.env for the format to use; an illustrative sketch follows this list)
    - Alternatively, hard code them in the Docker Compose file - just make sure you don't check it back in to GitHub :D
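The credentials file looks something like this - a minimal sketch only, assuming the variable names that Docker Compose passes through to the containers; check sample.env in the repo for the exact format:

    # .env - illustrative only; the real variable names are in sample.env
    AWS_ACCESS_KEY_ID=AKIAXXXXXXXXXXXXXXXX
    AWS_SECRET_ACCESS_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx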
- Bring the stack up

    docker-compose up -d
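Before moving on, it's worth checking that the containers have all started (the service names and states you see will depend on the docker-compose.yml in the repo):

    docker-compose ps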
- Wait for Kafka Connect to start up

    bash -c ' \
    echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
    while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
      echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
      sleep 5
    done
    echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
    '
Create the data generator source connector, which populates the customers and transactions topics with test data:

curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-00/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.customers.with" : "#{Internet.uuid}",
"genv.customers.name.with" : "#{HitchhikersGuideToTheGalaxy.character}",
"genv.customers.email.with" : "#{Internet.emailAddress}",
"genv.customers.location->city.with" : "#{HitchhikersGuideToTheGalaxy.location}",
"genv.customers.location->planet.with" : "#{HitchhikersGuideToTheGalaxy.planet}",
"topic.customers.records.exactly" : 10,
"genkp.transactions.with" : "#{Internet.uuid}",
"genv.transactions.customer_id.matching" : "customers.key",
"genv.transactions.cost.with" : "#{Commerce.price}",
"genv.transactions.card_type.with" : "#{Business.creditCardType}",
"genv.transactions.item.with" : "#{Beer.name}",
"topic.transactions.throttle.ms" : 500
}'
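To eyeball the generated data, consume a couple of records from each topic. Here with kafkacat - the container and broker names (kafkacat, broker:29092) are assumptions; check docker-compose.yml for what the stack actually uses:

    # Read two records from the beginning of the customers topic
    docker exec kafkacat kafkacat -b broker:29092 -t customers -C -c2 -o beginning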
See also 🎥 Kafka Connect in Action: S3 Sink (👾 kafka-to-s3 demo code)

Create the S3 sink connector:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-s3-00/config \
-d '{
"connector.class" : "io.confluent.connect.s3.S3SinkConnector",
"storage.class" : "io.confluent.connect.s3.storage.S3Storage",
"s3.region" : "us-west-2",
"s3.bucket.name" : "rmoff-smt-demo-01",
"topics" : "customers,transactions",
"tasks.max" : "4",
"flush.size" : "16",
"format.class" : "io.confluent.connect.s3.format.json.JsonFormat",
"schema.generator.class" : "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"schema.compatibility" : "NONE",
"partitioner.class" : "io.confluent.connect.storage.partitioner.DefaultPartitioner"
}'
A flush.size of 16 is stupidly low, but if it's too high you have to wait for your files to show up in S3, and I get bored waiting.
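Once the connector has flushed some files, you can list them with the AWS CLI (assuming it's installed and configured with the same credentials; topics is the S3 sink connector's default topics.dir):

    aws s3 ls --recursive s3://rmoff-smt-demo-01/topics/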
See also 🎥 Kafka Connect in Action: JDBC Sink (👾 demo code) and 🎥 ksqlDB & Kafka Connect JDBC Sink in action (👾 demo code)

Create the MySQL JDBC sink connector:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "transactions",
"tasks.max" : "4",
"auto.create" : "true"
}'
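To confirm that rows are arriving, query MySQL. The container name mysql here is an assumption based on the connection URL above; adjust it to match your docker-compose.yml:

    docker exec -it mysql mysql -u mysqluser -pmysqlpw demo -e "SELECT * FROM transactions LIMIT 5;"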
See also 🎥 Exploring the Kafka Connect REST API

Check the status of the connectors and their tasks:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
sink | sink-jdbc-mysql-00 | RUNNING | RUNNING | RUNNING | RUNNING | RUNNING | io.confluent.connect.jdbc.JdbcSinkConnector
sink | sink-s3-00 | RUNNING | RUNNING | RUNNING | RUNNING | RUNNING | io.confluent.connect.s3.S3SinkConnector
source | source-voluble-datagen-00 | RUNNING | RUNNING | io.mdrogalis.voluble.VolubleSourceConnector
InsertField Single Message Transform

Use the InsertField Single Message Transform to add the timestamp of the Kafka message to each record as it passes through the sink connector. First, the JDBC sink:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "insertTS",
"transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field": "messageTS"
}'
Note auto.evolve=true - otherwise the target table won't hold the new field unless it happens to exist already.
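Once the connector has processed some more records, the new messageTS column should show up in MySQL (same assumed container name as before):

    docker exec -it mysql mysql -u mysqluser -pmysqlpw demo -e "DESCRIBE transactions;"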
The same Single Message Transform can be added to the S3 sink:

curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-s3-00/config \
-d '{
"connector.class" : "io.confluent.connect.s3.S3SinkConnector",
"storage.class" : "io.confluent.connect.s3.storage.S3Storage",
"s3.region" : "us-west-2",
"s3.bucket.name" : "rmoff-smt-demo-01",
"topics" : "customers,transactions",
"tasks.max" : "4",
"flush.size" : "16",
"format.class" : "io.confluent.connect.s3.format.json.JsonFormat",
"schema.generator.class" : "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"schema.compatibility" : "NONE",
"partitioner.class" : "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"transforms" : "insertTS",
"transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field": "messageTS"
}'
This writes the timestamp as a Unix epoch value - if you'd rather have it as a string, you can chain in an additional Single Message Transform, TimestampConverter:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-s3-00/config \
-d '{
"connector.class" : "io.confluent.connect.s3.S3SinkConnector",
"storage.class" : "io.confluent.connect.s3.storage.S3Storage",
"s3.region" : "us-west-2",
"s3.bucket.name" : "rmoff-smt-demo-01",
"topics" : "customers,transactions",
"tasks.max" : "4",
"flush.size" : "16",
"format.class" : "io.confluent.connect.s3.format.json.JsonFormat",
"schema.generator.class" : "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"schema.compatibility" : "NONE",
"partitioner.class" : "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"transforms" : "insertTS,formatTS",
"transforms.insertTS.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field" : "messageTS",
"transforms.formatTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.formatTS.format" : "yyyy-MM-dd HH:mm:ss:SSS",
"transforms.formatTS.field" : "messageTS",
"transforms.formatTS.target.type" : "string"
}'
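TimestampConverter takes a SimpleDateFormat-style pattern, so with the format above a messageTS value lands in S3 as a formatted UTC string rather than epoch milliseconds - an illustrative (made-up) example:

    1612345678901  ->  2021-02-03 09:47:58:901

Note the colon before the milliseconds in yyyy-MM-dd HH:mm:ss:SSS; tweak the pattern if you want a more conventional timestamp format.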