The goal of this project is to play with Kafka, Debezium and ksqlDB. For this, we have: research-service, which inserts, updates and deletes records in MySQL; Source Connectors, which monitor record changes in MySQL and push messages describing those changes to Kafka; Sink Connectors and kafka-research-consumer, which listen for messages from Kafka and insert or update documents in Elasticsearch; and, finally, ksqlDB-Server, which listens to some Kafka topics, performs some joins and pushes new messages to new Kafka topics.
On ivangfr.github.io, I have compiled my Proof-of-Concepts (PoCs) and articles. You can easily search for the technology you are interested in by using the filter. Who knows, perhaps I have already implemented a PoC or written an article about what you are looking for.
- [Medium] Streaming MySQL changes to Elasticsearch using Kafka Connect
- [Medium] Enhancing a MySQL-KafkaConnect-Elasticsearch Setup with Spring Boot Applications
- research-service: Monolithic Spring Boot application that exposes a REST API to manage Institutes, Articles, Researchers and Reviews. The data is saved in MySQL.
- kafka-research-consumer: Spring Boot application that listens for messages from the topic reviews_researchers_institutes_articles (one of the ksqlDB outputs) and saves the payload of those messages (i.e., reviews with detailed information) in Elasticsearch.
- Open a terminal and, inside the springboot-kafka-connect-debezium-ksqldb root folder, run the following command

      docker compose up -d

  Note: During the first run, images for mysql and kafka-connect will be built. Their names are springboot-kafka-connect-debezium-ksqldb_mysql and springboot-kafka-connect-debezium-ksqldb_kafka-connect, respectively. To rebuild those images, run

      docker compose build

- Wait for all Docker containers to be up and running. To check, run

      docker compose ps

In order to have topics in Kafka with more than 1 partition, we must create them manually instead of waiting for the connectors to create them for us. To do so:
- In a terminal, make sure you are in the springboot-kafka-connect-debezium-ksqldb root folder
- Run the script below

      ./create-kafka-topics.sh

  It will create the topics mysql.researchdb.institutes, mysql.researchdb.researchers, mysql.researchdb.articles and mysql.researchdb.reviews with 5 partitions.
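
To confirm that a topic really has 5 partitions, the output of `kafka-topics --describe` can be inspected. The snippet below is a minimal sketch: `partition_count` is a hypothetical helper, not part of this project, and it assumes the summary line of the describe output contains `PartitionCount: <n>`. The container name `kafka` and the bootstrap address in the usage comment are assumptions as well; adjust them to match the docker compose setup.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of this project).
# Reads `kafka-topics --describe` output from stdin and prints the
# PartitionCount value found on the topic summary line.
partition_count() {
  sed -n 's/.*PartitionCount:[[:space:]]*\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Possible usage (container name "kafka" and port 9092 are assumptions):
#   docker exec kafka kafka-topics --bootstrap-server localhost:9092 \
#     --describe --topic mysql.researchdb.reviews | partition_count
```

If the value printed is not 5, the topic was most likely auto-created by a connector before the script ran.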
- In a terminal, make sure you are in the springboot-kafka-connect-debezium-ksqldb root folder
- Run the following curl commands to create one Debezium and two Elasticsearch-Sink connectors in kafka-connect

      curl -i -X POST localhost:8083/connectors -H 'Content-Type: application/json' -d @connectors/debezium-mysql-source-researchdb.json
      curl -i -X POST localhost:8083/connectors -H 'Content-Type: application/json' -d @connectors/elasticsearch-sink-institutes.json
      curl -i -X POST localhost:8083/connectors -H 'Content-Type: application/json' -d @connectors/elasticsearch-sink-articles.json

- You can check the state of the connectors and their tasks on Kafka Connect UI (http://localhost:8086) or by calling the kafka-connect endpoint

      curl localhost:8083/connectors/debezium-mysql-source-researchdb/status
      curl localhost:8083/connectors/elasticsearch-sink-institutes/status
      curl localhost:8083/connectors/elasticsearch-sink-articles/status

- The state of the connectors and their tasks must be RUNNING. If there is any problem, you can check the kafka-connect container logs.

      docker logs kafka-connect

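
The status check above can also be scripted. The snippet below is a minimal sketch, assuming the status endpoint returns the usual Kafka Connect JSON with a `"state"` field for the connector and for each of its tasks. `connector_states` and `all_running` are hypothetical helper names, and the parsing uses plain grep/sed rather than a real JSON parser.

```shell
#!/usr/bin/env bash
# Hypothetical helpers (not part of this project) for Kafka Connect status JSON.

# Print every "state" value found in the JSON read from stdin, one per line.
connector_states() {
  grep -o '"state"[[:space:]]*:[[:space:]]*"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Succeed only if at least one state is present and all of them are RUNNING.
all_running() {
  local states
  states=$(connector_states)
  [ -n "$states" ] && ! printf '%s\n' "$states" | grep -qv '^RUNNING$'
}

# Possible usage:
#   curl -s localhost:8083/connectors/debezium-mysql-source-researchdb/status \
#     | all_running && echo "connector and tasks are RUNNING"
```

Treating an empty result as a failure is a deliberate choice here: a response without any state (for example, an error body) should not be read as healthy.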
- Open a new terminal and navigate to the springboot-kafka-connect-debezium-ksqldb root folder
- Run the command below to start the application

      ./mvnw clean spring-boot:run --projects research-service -Dspring-boot.run.jvmArguments="-Dserver.port=9080"

  It will create some articles, institutes and researchers. If you don't want that, set the properties load-samples.articles.enabled, load-samples.institutes.enabled and load-samples.researchers.enabled to false in application.yml.
- The Swagger link is http://localhost:9080/swagger-ui.html
- Important: create at least one review so that mysql.researchdb.reviews-key and mysql.researchdb.reviews-value are created in Schema Registry. Below is a sample request to create a review.

      curl -i -X POST localhost:9080/api/reviews \
        -H "Content-Type: application/json" \
        -d "{ \"researcherId\": 1, \"articleId\": 1, \"comment\": \"Ln 56: replace the 'a' by 'an'\"}"

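
Before moving on, it may help to verify that both review subjects were registered. The snippet below is a minimal sketch, assuming Schema Registry is reachable at localhost:8081 and that its subjects endpoint returns a JSON array of subject names. `missing_subjects` is a hypothetical helper, not part of this project.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of this project).
# $1: JSON array of subject names, e.g. ["a-key","a-value"]
# remaining arguments: subject names that must be present.
# Prints each missing subject on its own line and fails if any is missing.
missing_subjects() {
  local json=$1 missing=0 subject
  shift
  for subject in "$@"; do
    # Match the quoted name so a subject is not found as part of a longer one.
    if ! printf '%s' "$json" | grep -qF "\"$subject\""; then
      echo "$subject"
      missing=1
    fi
  done
  return "$missing"
}

# Possible usage:
#   missing_subjects "$(curl -s localhost:8081/subjects)" \
#     mysql.researchdb.reviews-key mysql.researchdb.reviews-value
```

If either subject is reported as missing, create a review first and run the check again.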
- Open a new terminal and, inside the springboot-kafka-connect-debezium-ksqldb root folder, run the docker command below to start ksqlDB-cli

      docker run -it --rm --name ksqldb-cli \
        --network springboot-kafka-connect-debezium-ksqldb_default \
        -v $PWD/docker/ksql/researchers-institutes.ksql:/tmp/researchers-institutes.ksql \
        -v $PWD/docker/ksql/reviews-researchers-institutes-articles.ksql:/tmp/reviews-researchers-institutes-articles.ksql \
        confluentinc/cp-ksqldb-cli:7.4.1 http://ksqldb-server:8088

- On the ksqlDB-cli command line, run the following commands
  - Set the auto.offset.reset value

        SET 'auto.offset.reset' = 'earliest';

  - Run the following script. It will create the researchers_institutes topic

        RUN SCRIPT '/tmp/researchers-institutes.ksql';

  - Check whether the topic was created

        DESCRIBE "researchers_institutes";
        SELECT * FROM "researchers_institutes" EMIT CHANGES LIMIT 5;

  - Run the script below. It will create the reviews_researchers_institutes_articles topic

        RUN SCRIPT '/tmp/reviews-researchers-institutes-articles.ksql';

  - Check whether the topic was created

        DESCRIBE "reviews_researchers_institutes_articles";
        SELECT * FROM "reviews_researchers_institutes_articles" EMIT CHANGES LIMIT 1;

- In a terminal, make sure you are in the springboot-kafka-connect-debezium-ksqldb root folder
- Run the curl command below to create the elasticsearch-sink-researchers connector in kafka-connect

      curl -i -X POST localhost:8083/connectors -H 'Content-Type: application/json' -d @connectors/elasticsearch-sink-researchers.json

- You can check the state of the connector and its task on Kafka Connect UI (http://localhost:8086) or by calling the kafka-connect endpoint

      curl localhost:8083/connectors/elasticsearch-sink-researchers/status

- Open a new terminal and navigate to the springboot-kafka-connect-debezium-ksqldb root folder
- Run the command below to start the application

      ./mvnw clean spring-boot:run --projects kafka-research-consumer -Dspring-boot.run.jvmArguments="-Dserver.port=9081"

  The command below generates the Java class ReviewMessage from the Avro file present in src/main/resources/avro

      ./mvnw generate-sources --projects kafka-research-consumer

- This service runs on port 9081. The health endpoint is http://localhost:9081/actuator/health
- [Optional] We can start another kafka-research-consumer instance by opening another terminal and running

      ./mvnw clean spring-boot:run --projects kafka-research-consumer -Dspring-boot.run.jvmArguments="-Dserver.port=9082"

- Go to the terminal where ksqlDB-cli is running
- On the ksqlDB-cli command line, run the following query

      SELECT * FROM "reviews_researchers_institutes_articles" EMIT CHANGES;

- In another terminal, call the research-service simulation endpoint

      curl -X POST localhost:9080/api/simulation/reviews \
        -H "Content-Type: application/json" \
        -d "{\"total\": 100, \"sleep\": 100}"

- The GIF below shows it. research-service is running in the upper left terminal; kafka-research-consumer is running in the upper right terminal; the middle terminal is used to submit the POST request to research-service. The lower terminal is where ksqlDB-cli is running.
- You can also query Elasticsearch

      curl "localhost:9200/reviews/_search?pretty"

- Kafka Topics UI

  Kafka Topics UI can be accessed at http://localhost:8085
- Kafka Connect UI

  Kafka Connect UI can be accessed at http://localhost:8086
- Schema Registry UI

  Schema Registry UI can be accessed at http://localhost:8001
- Schema Registry

  You can use curl to check the subjects in Schema Registry
  - Get the list of subjects

        curl localhost:8081/subjects

  - Get the latest version of the subject mysql.researchdb.researchers-value

        curl localhost:8081/subjects/mysql.researchdb.researchers-value/versions/latest

- Kafka Manager

  Kafka Manager can be accessed at http://localhost:9000

  Configuration
  - First, you must create a new cluster. Click on Cluster (dropdown on the header) and then on Add Cluster
  - Type the name of your cluster in the Cluster Name field, for example: MyCluster
  - Type zookeeper:2181 in the Cluster Zookeeper Hosts field
  - Enable the checkbox Poll consumer information (Not recommended for large # of consumers if ZK is used for offsets tracking on older Kafka versions)
  - Click on the Save button at the bottom of the page.
- Elasticsearch

  Elasticsearch can be accessed at http://localhost:9200
  - Get all indices

        curl "localhost:9200/_cat/indices?v"

  - Search for documents

        curl "localhost:9200/articles/_search?pretty"
        curl "localhost:9200/institutes/_search?pretty"
        curl "localhost:9200/researchers/_search?pretty"
        curl "localhost:9200/reviews/_search?pretty"

- MySQL
  - Access the MySQL monitor

        docker exec -it -e MYSQL_PWD=secret mysql mysql -uroot --database researchdb

  - Inside the monitor, run the following SELECT command

        SELECT a.id AS review_id, c.id AS article_id, c.title AS article_title, b.id AS reviewer_id, b.first_name, b.last_name, b.institute_id, a.comment
        FROM reviews a, researchers b, articles c
        WHERE a.researcher_id = b.id AND a.article_id = c.id;

    Type exit to leave the MySQL terminal
- Go to the terminals where research-service and kafka-research-consumer are running and press Ctrl+C to stop them
- Go to the terminal where ksqlDB-cli is running and press Ctrl+C to stop the SELECT; then, type exit
- To stop and remove the docker compose containers, network and volumes, go to a terminal and, inside the springboot-kafka-connect-debezium-ksqldb root folder, run the command below

      docker compose down -v

To remove the Docker images created by this project, go to a terminal and, inside the springboot-kafka-connect-debezium-ksqldb root folder, run the following script

      ./remove-docker-images.sh

- Create ES indices dynamically and add an alias for them.