```mermaid
graph LR
    A[MongoDB] -- Change Stream --> B[Kafka Connect Worker - MongoDB Source Connector] --> C[Kafka] --> D[Kafka Connect Worker - Neo4j Sink Connector] --> E[Neo4j]
```
This sample project demonstrates how an insert operation in MongoDB can be captured via a Change Stream by a Kafka Connect Worker running the MongoDB Source Connector, which produces the event to a Kafka topic. A second Kafka Connect Worker running the Neo4j Sink Connector consumes from that topic and writes the data to Neo4j.

Such a setup is useful when MongoDB is the primary database and a graph database like Neo4j is used alongside it, for example to power a recommendation system.
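In this pipeline the MongoDB Source Connector is what watches the Change Stream, so no application code is needed for that step. Purely as an illustration of what the connector observes, here is a minimal standalone sketch using pymongo's `watch()`; the connection string, replica set, database, and collection names are assumptions for illustration only (change streams require a replica set).

```python
# Illustration only: the MongoDB Source Connector performs this watch internally.
# Assumed: replica set "rs0" on localhost:27017, database "shop", collection "orders".
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/?replicaSet=rs0")
orders = client["shop"]["orders"]

# Blocks and prints one change event per insert/update/delete on the collection.
with orders.watch() as stream:
    for change in stream:
        print(change["operationType"], change.get("fullDocument"))
```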
## Local setup using docker-compose
```bash
docker compose up -d
```
Note: The volume mapping is commented out in the `docker-compose.yml` file. Uncomment it if you want to persist the data.
- First, run the `initial_data_load.py` script, which adds some initial data, such as users and items, to MongoDB (a seeding sketch follows this list).
- For the Kafka Connect Worker acting as the MongoDB Source Connector, download the self-hosted version from https://www.confluent.io/hub/mongodb/kafka-connect-mongodb, extract it, and place the folder in the `kafka-connect-jars` folder.
- Publish the `mongodb_source_connector.json` configuration to Kafka Connect (a Python alternative to curl is sketched after this list):

  ```bash
  curl -X POST -H "Content-Type: application/json" --data @mongodb_source_connector.json http://localhost:8083/connectors
  ```

- Execute `create_orders.py` to create two orders in MongoDB (see the sketch after this list). This triggers the Change Stream, and the data is published to Kafka.
- Run a query in Neo4j and confirm that there is no data yet:

  ```cypher
  MATCH (n) RETURN n
  ```

- For the Kafka Connect Worker acting as the Neo4j Sink Connector, download the self-hosted version from https://www.confluent.io/hub/neo4j/kafka-connect-neo4j, extract it, and place the folder in the `kafka-connect-jars` folder.
- Publish the `neo4j_sink_connector.json` configuration to Kafka Connect (a connector status check is sketched after this list):

  ```bash
  curl -X POST -H "Content-Type: application/json" --data @neo4j_sink_connector.json http://localhost:8083/connectors
  ```

- The Kafka Connect Worker will now consume the data from the Kafka topic and write it to Neo4j.
- Run a query in Neo4j to see all the data (a Python version of this check follows the list):

  ```cypher
  MATCH (n) RETURN n
  ```
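For reference, `initial_data_load.py` is not reproduced here; the following is only a minimal sketch of what such a seeding script could look like, assuming pymongo, a local MongoDB on port 27017, and a database named `shop` with `users` and `items` collections (all of these names are assumptions).

```python
# Minimal seeding sketch (assumed names: database "shop", collections "users"/"items").
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
db = client["shop"]

# Insert a couple of users and items so that orders created later can reference them.
db["users"].insert_many([
    {"_id": 1, "name": "Alice"},
    {"_id": 2, "name": "Bob"},
])
db["items"].insert_many([
    {"_id": 100, "name": "Keyboard", "price": 35.0},
    {"_id": 101, "name": "Monitor", "price": 120.0},
])
print("Seeded users and items")
```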
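The two `curl` commands above can also be issued from Python. This is an equivalent sketch using `requests` against the Kafka Connect REST API on `localhost:8083`; the file name matches the repository, and the endpoint is standard Kafka Connect REST usage.

```python
# Sketch: publish a connector configuration to the Kafka Connect REST API,
# equivalent to the curl command in the steps above.
import json
import requests

with open("mongodb_source_connector.json") as f:
    config = json.load(f)

resp = requests.post("http://localhost:8083/connectors", json=config)
resp.raise_for_status()
print(resp.json())
```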
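`create_orders.py` is likewise only sketched here: a hypothetical version that inserts two order documents into an `orders` collection (the database, collection, and field names are assumptions), which is exactly the kind of insert the Change Stream picks up.

```python
# Sketch: insert two orders; each insert produces a change-stream event that the
# MongoDB Source Connector publishes to Kafka. Database/collection/field names are assumptions.
from datetime import datetime, timezone
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
orders = client["shop"]["orders"]

orders.insert_many([
    {"user_id": 1, "item_ids": [100], "created_at": datetime.now(timezone.utc)},
    {"user_id": 2, "item_ids": [100, 101], "created_at": datetime.now(timezone.utc)},
])
print("Created 2 orders")
```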
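After publishing either connector configuration, you can confirm that it is running via the Kafka Connect REST API. The status endpoint below is standard Kafka Connect; the connector names are whatever the `name` fields in the two JSON files define, so the ones used here are placeholders.

```python
# Sketch: check connector status via the Kafka Connect REST API.
# Replace the names with the "name" values from the connector JSON files.
import requests

for name in ["mongodb-source-connector", "neo4j-sink-connector"]:  # placeholder names
    resp = requests.get(f"http://localhost:8083/connectors/{name}/status")
    if resp.ok:
        print(name, resp.json()["connector"]["state"])
    else:
        print(name, "not found (check the 'name' field in the JSON config)")
```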
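Finally, the `MATCH (n) RETURN n` check can also be scripted with the official Neo4j Python driver. The bolt URI and credentials below are assumptions; use whatever `docker-compose.yml` configures.

```python
# Sketch: count nodes in Neo4j to verify that the sink connector wrote the data.
# URI and credentials are assumptions -- use the values from docker-compose.yml.
from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    count = session.run("MATCH (n) RETURN count(n) AS c").single()["c"]
    print(f"Neo4j now contains {count} nodes")
driver.close()
```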