-
Clone the repository
git clone https://github.com/confluentinc/demo-scene.git cd demo-scene/kafka-connect-single-message-transforms
-
Bring the stack up
docker-compose up -d
-
Wait for Kafka Connect to start up
bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
sleep 5
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
'
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-day5-00/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.day5-00-person.with" : "#{Internet.uuid}",
"genv.day5-00-person.firstName.with" : "#{Address.firstName}",
"genv.day5-00-person.lastName.with" : "#{Address.lastName}",
"genv.day5-00-person.fullAddress.with" : "#{Address.fullAddress}",
"genv.day5-00-person.phone_num.with" : "#{PhoneNumber.phoneNumber}",
"genv.day5-00-person.cc_num.with" : "#{Business.creditCardNumber}",
"genv.day5-00-person.cc_exp.with" : "#{Business.creditCardExpiry}",
"topic.day5-00-person.throttle.ms" : 500
}'
Here’s the source data:
$ docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day5-00-person -C -c1 -o beginning -u -q -J | jq '.'
{
"topic": "day5-00-person",
"partition": 0,
"offset": 0,
"tstype": "create",
"ts": 1607595830038,
"broker": 1,
"key": "648e4c63-ba83-4f3b-befe-505c07d9694f",
"payload": {
"Gen0": {
"phone_num": {
"string": "702.792.9316 x5223"
},
"lastName": {
"string": "Reinger"
},
"cc_exp": {
"string": "2015-11-11"
},
"firstName": {
"string": "Wayne"
},
"cc_num": {
"string": "1234-2121-1221-1211"
},
"fullAddress": {
"string": "2338 Andres Ville, North Tonymouth, OH 18398-3319"
}
}
}
}
Check out all that sensitive information! Plenty of times we don’t want to have that either stored in the Kafka topic in the first place, or we OK with that but we don’t want to stream it verbatim to a target sink.
We can mask the data from an source connector in Kafka Connect with the MaskField
Single Message Transform. Whilst the above Kafka Connect source connector is just generating dummy data, is still a Kafka Connect source connector nonetheless, so what I’m showing here is equally applicable to other source connectors—JDBC, Debezium, MQ, you name it—that’s the beauty of the Kafka Connect architecture, that you can mix & match connectors with transforms (with converters too, but that’s another subject for another day).
Let’s create a second version of the above Kafka Connect source connector, and this time mask out the credit card number at ingest:
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-day5-01/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.day5-01-person.with" : "#{Internet.uuid}",
"genv.day5-01-person.firstName.with" : "#{Address.firstName}",
"genv.day5-01-person.lastName.with" : "#{Address.lastName}",
"genv.day5-01-person.fullAddress.with" : "#{Address.fullAddress}",
"genv.day5-01-person.phone_num.with" : "#{PhoneNumber.phoneNumber}",
"genv.day5-01-person.cc_num.with" : "#{Business.creditCardNumber}",
"genv.day5-01-person.cc_exp.with" : "#{Business.creditCardExpiry}",
"topic.day5-01-person.throttle.ms" : 500,
"transforms" : "maskCC",
"transforms.maskCC.type" : "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskCC.fields" : "cc_num,cc_exp",
"transforms.maskCC.replacement" : "<masked>"
}'
Here’s what it does to the data written to Kafka:
$ docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day5-01-person -C -c1 -o end -u -q -J | jq '.'
{
"topic": "day5-01-person",
"partition": 0,
"offset": 201,
"tstype": "create",
"ts": 1607596416414,
"broker": 1,
"key": "f4d038da-bcc1-40d1-8b29-08d52305e796",
"payload": {
"Gen0": {
"fullAddress": {
"string": "Apt. 801 356 Frami Canyon, East Irene, CA 15876-9608"
},
"phone_num": {
"string": "(910) 077-1824 x74606"
},
"cc_exp": {
"string": "<masked>"
},
"lastName": {
"string": "Thompson"
},
"cc_num": {
"string": "<masked>"
},
"firstName": {
"string": "Wade"
}
}
}
}
See also 🎥 Kafka Connect in Action : JDBC Sink (👾 demo code
) and 🎥 ksqlDB & Kafka Connect JDBC Sink in action (👾 demo code
)
Here we’re going to stream the data from that topic above (day5-01-person
) to MySQL, with the assumption that whilst the owner of the Kafka topic is permitted to hold a user’s full address, the owner of the database to which we’re streaming the data is not, and thus will mask it out at egress:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day5-person-01/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day5-01-person",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "maskAddress",
"transforms.maskAddress.type" : "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskAddress.fields" : "fullAddress",
"transforms.maskAddress.replacement" : "[❌redacted❌]"
}'
Now if we launch MySQL
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
and check out the table we can see we have the full schema:
mysql> describe `day5-01-person`;
+-------------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------------+------+------+-----+---------+-------+
| fullAddress | text | YES | | NULL | |
| phone_num | text | YES | | NULL | |
| cc_exp | text | YES | | NULL | |
| lastName | text | YES | | NULL | |
| cc_num | text | YES | | NULL | |
| firstName | text | YES | | NULL | |
+-------------+------+------+-----+---------+-------+
6 rows in set (0.00 sec)
and the data is as it should be, with the fullAddress
masked:
mysql> select * from `day5-01-person` LIMIT 5;
+--------------+-----------------------+----------+---------------+----------+-------------+
| fullAddress | phone_num | cc_exp | lastName | cc_num | firstName |
+--------------+-----------------------+----------+---------------+----------+-------------+
| [?redacted?] | 383-349-1787 x792 | <masked> | Koss | <masked> | Courtney |
| [?redacted?] | 989-731-8207 | <masked> | Hand | <masked> | Trudy |
| [?redacted?] | 809.775.2196 x66858 | <masked> | Pollich | <masked> | Jame |
| [?redacted?] | 1-788-930-4978 | <masked> | Boyle | <masked> | Claud |
| [?redacted?] | (599) 706-0041 x0995 | <masked> | Stark | <masked> | Gertie |
+--------------+-----------------------+----------+---------------+----------+-------------+
5 rows in set (0.00 sec)
(maybe the ❌ emojis were just a step too far ;)
)