-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add transformer for MongoDB CDC to CrateDB SQL conversion
- Loading branch information
Showing
8 changed files
with
628 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
# Relay MongoDB Change Stream into CrateDB table | ||
|
||
## Introduction | ||
|
||
> Change streams allow applications to access real-time data changes without the prior | ||
> complexity and risk of manually tailing the oplog. Applications can use change streams | ||
> to subscribe to all data changes on a single collection, a database, or an entire | ||
> deployment, and immediately react to them. | ||
> | ||
> - https://www.mongodb.com/docs/manual/changeStreams/ | ||
> - https://www.mongodb.com/developer/languages/python/python-change-streams/ | ||
## Services | ||
|
||
### CrateDB | ||
Start CrateDB. | ||
```shell | ||
docker run --rm -it --name=cratedb --publish=4200:4200 --env=CRATE_HEAP_SIZE=2g \ | ||
crate:5.7 -Cdiscovery.type=single-node | ||
``` | ||
|
||
### MongoDB | ||
Start MongoDB. | ||
```shell | ||
docker run -it --rm --name=mongodb --publish=27017:27017 \ | ||
mongo:7 mongod --replSet rs-testdrive | ||
``` | ||
|
||
Please note that change streams are only available for replica sets and | ||
sharded clusters, so let's initialize the replica set defined with | ||
`--replSet rs-testdrive`, by using the `mongosh` command to invoke | ||
the `rs.initiate()` operation. | ||
|
||
```shell | ||
export MONGODB_URL="mongodb://localhost/" | ||
docker run -i --rm --network=host mongo:7 mongosh ${MONGODB_URL} <<EOF | ||
config = { | ||
_id: "rs-testdrive", | ||
members: [{ _id : 0, host : "localhost:27017"}] | ||
}; | ||
rs.initiate(config); | ||
EOF | ||
``` | ||
|
||
|
||
## Install | ||
Acquire and set up the relay program. | ||
```shell | ||
wget https://github.com/daq-tools/commons-codec/raw/main/examples/mongodb_cdc_cratedb.py | ||
pip install commons-codec pymongo sqlalchemy-cratedb | ||
``` | ||
|
||
|
||
## Usage | ||
|
||
Configure settings. | ||
```shell | ||
export CRATEDB_SQLALCHEMY_URL="crate://" | ||
export MONGODB_URL="mongodb://localhost/" | ||
``` | ||
|
||
Invoke relay program. | ||
```shell | ||
python mongodb_cdc_cratedb.py cdc-relay | ||
``` | ||
|
||
Invoke database workload. | ||
```shell | ||
python mongodb_cdc_cratedb.py db-workload | ||
``` | ||
|
||
|
||
## Troubleshooting | ||
|
||
```text | ||
pymongo.errors.OperationFailure: The $changeStream stage is only supported on | ||
replica sets, full error: {'ok': 0.0, 'errmsg': 'The $changeStream stage is | ||
only supported on replica sets', 'code': 40573, 'codeName': 'Location40573'} | ||
``` | ||
|
||
Failed to refresh key cache | ||
- https://stackoverflow.com/questions/70518350/mongodb-replicaset-failed-to-refresh-key-cache | ||
- https://www.mongodb.com/community/forums/t/how-to-recover-mongodb-from-failed-to-refresh-key-cache/239079 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
""" | ||
Basic example relaying a MongoDB Change Stream into CrateDB table. | ||
Documentation: | ||
- https://github.com/daq-tools/commons-codec/blob/main/doc/mongodb.md | ||
- https://www.mongodb.com/docs/manual/changeStreams/ | ||
- https://www.mongodb.com/developer/languages/python/python-change-streams/ | ||
""" | ||
|
||
import datetime as dt | ||
import os | ||
import sys | ||
|
||
import pymongo | ||
import sqlalchemy as sa | ||
from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB | ||
|
||
|
||
class MiniRelay: | ||
""" | ||
Relay MongoDB Change Stream into CrateDB table, and provide basic example workload generator. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
mongodb_url: str, | ||
mongodb_database: str, | ||
mongodb_collection: str, | ||
cratedb_sqlalchemy_url: str, | ||
cratedb_table: str, | ||
): | ||
self.cratedb_client = sa.create_engine(cratedb_sqlalchemy_url, echo=True) | ||
self.mongodb_client = pymongo.MongoClient(mongodb_url) | ||
self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection] | ||
self.table_name = cratedb_table | ||
self.cdc = MongoDBCDCTranslatorCrateDB(table_name=self.table_name) | ||
|
||
def start(self): | ||
""" | ||
Subscribe to change stream events, convert to SQL, and submit to CrateDB. | ||
""" | ||
with self.cratedb_client.connect() as connection: | ||
connection.execute(sa.text(self.cdc.sql_ddl)) | ||
for sql in self.cdc_to_sql(): | ||
if sql: | ||
connection.execute(sa.text(sql)) | ||
connection.execute(sa.text(f'REFRESH TABLE "{self.table_name}";')) | ||
|
||
def cdc_to_sql(self): | ||
""" | ||
Subscribe to change stream events, and emit corresponding SQL statements. | ||
""" | ||
# Note that `.watch()` will block until events are ready for consumption, so | ||
# this is not a busy loop. Also note that the routine doesn't perform any sensible | ||
# error handling yet. | ||
while True: | ||
with self.mongodb_collection.watch(full_document="updateLookup") as change_stream: | ||
for change in change_stream: | ||
print("MongoDB Change Stream event:", change, file=sys.stderr) | ||
yield self.cdc.to_sql(change) | ||
|
||
def db_workload(self): | ||
""" | ||
Run insert_one, update_one, and delete_one operations to generate a very basic workload. | ||
""" | ||
example_record = { | ||
"id": "5F9E", | ||
"data": {"temperature": 42.42, "humidity": 84.84}, | ||
"meta": {"timestamp": dt.datetime.fromisoformat("2024-07-12T01:17:42+02:00"), "device": "foo"}, | ||
} | ||
|
||
print(self.mongodb_collection.insert_one(example_record)) | ||
print(self.mongodb_collection.update_one({"id": "5F9E"}, {"$set": {"data": {"temperature": 42.50}}})) | ||
|
||
# TODO: Investigate: When applying the "replace" operation, subsequent "delete" operations | ||
# will not be reported to the change stream any longer. Is it a bug? | ||
# print(self.mongodb_collection.replace_one({"id": "5F9E"}, {"tags": ["deleted"]})) | ||
|
||
print(self.mongodb_collection.delete_one({"id": "5F9E"})) | ||
|
||
# Drop operations are ignored anyway. | ||
# print(self.mongodb_collection.drop()) | ||
|
||
|
||
if __name__ == "__main__": | ||
# Decode subcommand from command line argument. | ||
if len(sys.argv) < 2: | ||
raise ValueError("Subcommand missing. Accepted subcommands: subscribe, workload") | ||
subcommand = sys.argv[1] | ||
|
||
# Configure machinery. | ||
relay = MiniRelay( | ||
mongodb_url=os.environ["MONGODB_URL"], | ||
mongodb_database="testdrive", | ||
mongodb_collection="data", | ||
cratedb_sqlalchemy_url=os.environ["CRATEDB_SQLALCHEMY_URL"], | ||
cratedb_table="cdc-testdrive", | ||
) | ||
|
||
# Invoke machinery. | ||
if subcommand == "cdc-relay": | ||
relay.start() | ||
elif subcommand == "db-workload": | ||
relay.db_workload() | ||
else: | ||
raise ValueError("Accepted subcommands: subscribe, workload") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.