This demo automatically deploys the topology of services as defined in the Debezium Tutorial.
# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.1
docker-compose -f docker-compose-mysql.yaml up
# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
# Consume messages from a Debezium topic
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
# Modify records in the database via MySQL client
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
# Shut down the cluster
docker-compose -f docker-compose-mysql.yaml down
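After registering the connector, it can be useful to confirm that it is actually running before consuming messages. The following is a minimal sketch, assuming the connector is named inventory-connector (the name used later in this README when the connector configuration is read back) and that Kafka Connect listens on localhost:8083. The helper names connector_state and wait_for_connector are hypothetical and exist only for illustration. The sketch relies on the Kafka Connect REST endpoint GET /connectors/{name}/status and assumes the response arrives as single-line JSON.

```shell
#!/bin/sh
# Hypothetical helpers; they are not part of the tutorial files.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Print the connector state (for example RUNNING or FAILED) reported by Kafka Connect.
# Assumes the status response is single-line JSON; with jq installed,
# `jq -r .connector.state` would be a more robust way to extract the field.
connector_state() {
    curl -s "$CONNECT_URL/connectors/$1/status" \
        | sed -n 's/.*"connector"[[:space:]]*:[[:space:]]*[{][^}]*"state"[[:space:]]*:[[:space:]]*"\([A-Z_]*\)".*/\1/p'
}

# Poll once per second until the connector reports RUNNING or the attempts run out.
wait_for_connector() {
    name=$1
    attempts=${2:-30}
    while [ "$attempts" -gt 0 ]; do
        [ "$(connector_state "$name")" = "RUNNING" ] && return 0
        attempts=$((attempts - 1))
        sleep 1
    done
    return 1
}

# Usage (requires the running topology):
#   wait_for_connector inventory-connector && echo "connector is running"
```

Polling the status endpoint avoids starting the console consumer against a connector that failed to start, in which case the topic would simply stay empty.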
To use Avro messages instead of JSON, Avro can be configured in one of two ways: in the Kafka Connect worker configuration or in the connector configuration. Using Avro in conjunction with the schema registry allows for much more compact messages, because the schema is stored in the registry rather than embedded in every message.
Configuring Avro at the Kafka Connect worker involves the same steps as above for MySQL, but using the docker-compose-mysql-avro-worker.yaml Compose file instead. The Compose file configures the Connect service to use the Avro (de-)serializers for the Connect instance and starts one additional service, the Confluent schema registry.
Configuring Avro at the Debezium connector involves specifying the converter and schema registry as part of the connector's configuration. To do this, follow the same steps as above for MySQL, but use the docker-compose-mysql-avro-connector.yaml and register-mysql-avro.json configuration files instead. The Compose file configures the Connect service to use the default (de-)serializers for the Connect instance and starts one additional service, the Confluent schema registry. The connector configuration file configures the connector, explicitly sets its (de-)serializers to Avro and specifies the location of the schema registry.
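For orientation, the converter-related part of a connector configuration that uses the Confluent Avro converter typically looks like the fragment printed by the sketch below. This is not the verbatim content of register-mysql-avro.json. The registry URL http://schema-registry:8081 is an assumption based on the consumer command shown later in this section, and the function name avro_converter_settings is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: prints only the converter properties,
# not a complete connector configuration.
avro_converter_settings() {
    # Assumed default; pass another URL as the first argument if needed.
    registry_url=${1:-http://schema-registry:8081}
    cat <<EOF
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "$registry_url",
"value.converter.schema.registry.url": "$registry_url"
EOF
}

avro_converter_settings
```

Setting the converters per connector leaves the worker defaults untouched, so other connectors on the same Connect instance can keep emitting JSON.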
You can access the first version of the schema for customers values like so:
curl -X GET http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/1
Or, if you have the jq utility installed, you can get a formatted output like this:
curl -X GET http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/1 | jq '.schema | fromjson'
If you alter the structure of the customers table in the database and trigger another change event, a new version of that schema will be available in the registry.
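To see which versions exist after such a change, the Confluent schema registry also exposes a version listing per subject. The sketch below only builds the URL; it assumes the registry is reachable on localhost:8081 as in the URLs above and that subjects follow the "<topic>-value" / "<topic>-key" naming seen there. The helper name subject_url is hypothetical.

```shell
#!/bin/sh
# Assumed default, matching the registry port used in the curl commands above.
REGISTRY_URL="${REGISTRY_URL:-http://localhost:8081}"

# Hypothetical helper: build the versions URL for a topic's key or value subject.
subject_url() {
    topic=$1
    kind=${2:-value}
    printf '%s/subjects/%s-%s/versions\n' "$REGISTRY_URL" "$topic" "$kind"
}

# Usage (requires the running topology):
#   curl -s "$(subject_url dbserver1.inventory.customers)"         # list version numbers
#   curl -s "$(subject_url dbserver1.inventory.customers)/latest"  # fetch the newest version
subject_url dbserver1.inventory.customers
```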
The schema registry also comes with a console consumer that can read the Avro messages:
docker-compose -f docker-compose-mysql-avro-worker.yaml exec schema-registry /usr/bin/kafka-avro-console-consumer \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--property schema.registry.url=http://schema-registry:8081 \
--topic dbserver1.inventory.customers
Apicurio Registry is an open-source API and schema registry that, amongst other things, can be used to store schemas of Kafka records. It provides:
- its own native Avro converter and Protobuf serializer
- a JSON converter that exports its schema into the registry
- a compatibility layer with other schema registries such as IBM's or Confluent's; it can be used with the Confluent Avro converter.
For the Apicurio examples we will use the following deployment topology:
Configuring the JSON converter with externalized schema at the Debezium connector involves specifying the converter and schema registry as part of the connector's configuration. To do this, follow the same steps as above for MySQL, but use the docker-compose-mysql-apicurio.yaml and register-mysql-apicurio-converter-json.json configuration files instead. The Compose file configures the Connect service to use the default (de-)serializers for the Connect instance and starts one additional service, the Apicurio Registry. The connector configuration file configures the connector, explicitly sets its (de-)serializers to the Apicurio JSON converter and specifies the location of the Apicurio registry.
You can access the first version of the schema for customers values like so:
curl -X GET http://localhost:8080/artifacts/dbserver1.inventory.customers-value
Or, if you have the jq utility installed, you can get a formatted output like this:
curl -X GET http://localhost:8080/artifacts/dbserver1.inventory.customers-value | jq .
If you alter the structure of the customers table in the database and trigger another change event, a new version of that schema will be available in the registry.
You can consume the JSON messages in the same way as with the standard JSON converter:
docker-compose -f docker-compose-mysql-apicurio.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
When you look at a data message, you will notice that it contains only the payload part but not the schema part, as the schema is externalized into the registry.
Configuring Avro with Apicurio's native Avro converter at the Debezium connector involves specifying the converter and schema registry as part of the connector's configuration. To do this, follow the same steps as above for MySQL, but use the docker-compose-mysql-apicurio.yaml and register-mysql-apicurio-converter-avro.json configuration files instead. The Compose file configures the Connect service to use the default (de-)serializers for the Connect instance and starts one additional service, the Apicurio Registry. The connector configuration file configures the connector, explicitly sets its (de-)serializers to Avro and specifies the location of the Apicurio registry.
You can access the first version of the schema for customers values like so:
curl -X GET http://localhost:8080/artifacts/dbserver1.inventory.customers-value
Or, if you have the jq utility installed, you can get a formatted output like this:
curl -X GET http://localhost:8080/artifacts/dbserver1.inventory.customers-value | jq .
If you alter the structure of the customers table in the database and trigger another change event, a new version of that schema will be available in the registry.
Configuring Avro with the Confluent Avro converter against Apicurio's Confluent compatibility API at the Debezium connector involves specifying the converter and schema registry as part of the connector's configuration. To do this, follow the same steps as above for MySQL, but use the docker-compose-mysql-apicurio.yaml and register-mysql-apicurio.json configuration files instead. The Compose file configures the Connect service to use the default (de-)serializers for the Connect instance and starts one additional service, the Apicurio Registry. The connector configuration file configures the connector, explicitly sets its (de-)serializers to Avro and specifies the location of the Apicurio registry.
You can access the first version of the schema for customers values like so:
curl -X GET http://localhost:8080/ccompat/subjects/dbserver1.inventory.customers-value/versions/1
Or, if you have the jq utility installed, you can get a formatted output like this:
curl -X GET http://localhost:8080/ccompat/subjects/dbserver1.inventory.customers-value/versions/1 | jq '.schema | fromjson'
If you alter the structure of the customers table in the database and trigger another change event, a new version of that schema will be available in the registry.
To consume the Avro messages, you can use the kafkacat tool:
docker run --rm --tty \
--network tutorial_default \
debezium/tooling \
kafkacat -b kafka:9092 -C -o beginning -q -s value=avro -r http://apicurio:8080/ccompat \
-t dbserver1.inventory.customers | jq .
# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.1
docker-compose -f docker-compose-postgres.yaml up
# Start Postgres connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
# Consume messages from a Debezium topic
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
# Modify records in the database via Postgres client
docker-compose -f docker-compose-postgres.yaml exec postgres env PGOPTIONS="--search_path=inventory" bash -c 'psql -U $POSTGRES_USER postgres'
# Shut down the cluster
docker-compose -f docker-compose-postgres.yaml down
# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.1
docker-compose -f docker-compose-mongodb.yaml up
# Initialize MongoDB replica set and insert some test data
docker-compose -f docker-compose-mongodb.yaml exec mongodb bash -c '/usr/local/bin/init-inventory.sh'
# Start MongoDB connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mongodb.json
# Consume messages from a Debezium topic
docker-compose -f docker-compose-mongodb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
# Modify records in the database via MongoDB client
docker-compose -f docker-compose-mongodb.yaml exec mongodb bash -c 'mongo -u $MONGODB_USER -p $MONGODB_PASSWORD --authenticationDatabase admin inventory'
db.customers.insert([
{ _id : NumberLong("1005"), first_name : 'Bob', last_name : 'Hopper', email : '[email protected]', unique_id : UUID() }
]);
# Shut down the cluster
docker-compose -f docker-compose-mongodb.yaml down
This assumes Oracle is running on localhost (or is reachable there, e.g. by running it within a VM or Docker container with appropriate port configurations) and is set up with the configuration, users and grants described in the Debezium Vagrant set-up. Note that the connector uses the XStream API, which requires a license for the GoldenGate product (which itself does not need to be installed, though).
You must also download the Oracle Instant Client for Linux and put it under the directory debezium-with-oracle-jdbc/oracle_instantclient.
# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.1
docker-compose -f docker-compose-oracle.yaml up --build
# Insert test data
cat debezium-with-oracle-jdbc/init/inventory.sql | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
Adjust the host name of the database server and the name of the XStream outbound server in register-oracle.json as per your environment.
Then register the Debezium Oracle connector:
# Start Oracle connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-oracle.json
# Create a test change record
echo "INSERT INTO customers VALUES (NULL, 'John', 'Doe', '[email protected]');" | docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
# Consume messages from a Debezium topic
docker-compose -f docker-compose-oracle.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic server1.DEBEZIUM.CUSTOMERS
# Modify other records in the database via Oracle client
docker exec -i dbz_oracle sqlplus debezium/dbz@//localhost:1521/ORCLPDB1
# Shut down the cluster
docker-compose -f docker-compose-oracle.yaml down
# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.1
docker-compose -f docker-compose-sqlserver.yaml up
# Initialize database and insert test data
cat debezium-sqlserver-init/inventory.sql | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
# Start SQL Server connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
# Consume messages from a Debezium topic
docker-compose -f docker-compose-sqlserver.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic server1.dbo.customers
# Modify records in the database via SQL Server client (do not forget to add `GO` command to execute the statement)
docker-compose -f docker-compose-sqlserver.yaml exec sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD -d testDB'
# Shut down the cluster
docker-compose -f docker-compose-sqlserver.yaml down
# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.1
docker-compose -f docker-compose-db2.yaml up --build
# Start DB2 connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-db2.json
# Consume messages from a Debezium topic
docker-compose -f docker-compose-db2.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic db2server.DB2INST1.CUSTOMERS
# Modify records in the database via DB2 client
docker-compose -f docker-compose-db2.yaml exec db2server bash -c 'su - db2inst1'
db2 connect to TESTDB
db2 "INSERT INTO DB2INST1.CUSTOMERS(first_name, last_name, email) VALUES ('John', 'Doe', '[email protected]');"
# Shut down the cluster
docker-compose -f docker-compose-db2.yaml down
Kafka Connect allows externalization of secrets into a separate configuration repository. The configuration is done at both worker and connector level.
# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.1
docker-compose -f docker-compose-mysql-ext-secrets.yml up
# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql-ext-secrets.json
# Check plugin configuration to see that secrets are not visible
curl -s http://localhost:8083/connectors/inventory-connector/config | jq .
# Shut down the cluster
docker-compose -f docker-compose-mysql-ext-secrets.yml down
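The two configuration levels mentioned above can be sketched as follows. This is an illustration based on Kafka Connect's FileConfigProvider, not the verbatim content of docker-compose-mysql-ext-secrets.yml or register-mysql-ext-secrets.json. The provider alias file, the path /secrets/mysql.properties, the key names user and password, and the function name print_secret_config_sketch are all assumptions.

```shell
#!/bin/sh
# Hypothetical helper printing a sketch of both configuration levels.
print_secret_config_sketch() {
    # The quoted 'EOF' keeps the shell from expanding the ${file:...} placeholders.
    cat <<'EOF'
# Worker level: register a config provider that reads properties files
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# Connector level: refer to keys in the file instead of literal values
# (the path /secrets/mysql.properties and the key names are assumptions)
"database.user": "${file:/secrets/mysql.properties:user}",
"database.password": "${file:/secrets/mysql.properties:password}"
EOF
}

print_secret_config_sketch
```

With a setup along these lines, reading the connector configuration back through the REST API should show the placeholders rather than the resolved values, which is what the check in the commands above looks for.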
Should you need to establish a remote debugging session into a deployed connector, add the following to the environment section of the connect service in the Compose file:
- KAFKA_DEBUG=true
- DEBUG_SUSPEND_FLAG=n
- JAVA_DEBUG_PORT=*:5005
Also expose the debugging port 5005 under ports:
- 5005:5005
You can then establish a remote debugging session from your IDE on localhost:5005.