diff --git a/CHANGES.md b/CHANGES.md index edfc188..d2dd940 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,6 +6,7 @@ - Zyp: Added software test and documentation about flattening lists - MongoDB: Use `bson` package to parse BSON CANONICAL representation - MongoDB: Complete and verify BSON data type mapping end-to-end +- MongoDB: Use improved decoding machinery also for `MongoDBCDCTranslator` ## 2024/09/10 v0.0.15 - Added Zyp Treatments, a slightly tailored transformation subsystem diff --git a/src/commons_codec/transform/mongodb.py b/src/commons_codec/transform/mongodb.py index 1498f3a..51fcd50 100644 --- a/src/commons_codec/transform/mongodb.py +++ b/src/commons_codec/transform/mongodb.py @@ -142,9 +142,9 @@ class MongoDBTranslatorBase: # Define name of the column where CDC's record data will get materialized into. DATA_COLUMN = "data" - def __init__(self, table_name: str): - super().__init__() + def __init__(self, table_name: str, converter: t.Union[MongoDBCrateDBConverter, None] = None): self.table_name = quote_relation_name(table_name) + self.converter = converter or MongoDBCrateDBConverter() @property def sql_ddl(self): @@ -193,10 +193,6 @@ class MongoDBFullLoadTranslator(MongoDBTranslatorBase): Translate a MongoDB document into a CrateDB document. """ - def __init__(self, table_name: str, converter: MongoDBCrateDBConverter): - super().__init__(table_name=table_name) - self.converter = converter - @staticmethod def get_document_key(record: t.Mapping[str, t.Any]) -> str: """ @@ -270,7 +266,8 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> t.Union[SQLOperation, None]: if operation_type == "insert": oid: str = self.get_document_key(event) - record = self.decode_bson(self.get_full_document(event)) + document = self.get_full_document(event) + record = self.converter.decode_document(self.decode_bson(document)) sql = f"INSERT INTO {self.table_name} " f"({self.ID_COLUMN}, {self.DATA_COLUMN}) " "VALUES (:oid, :record);" parameters = {"oid": oid, "record": record} @@ -278,7 +275,8 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> t.Union[SQLOperation, None]: # you need to use `watch(full_document="updateLookup")`. # https://www.mongodb.com/docs/manual/changeStreams/#lookup-full-document-for-update-operations elif operation_type in ["update", "replace"]: - record = self.decode_bson(self.get_full_document(event)) + document = self.get_full_document(event) + record = self.converter.decode_document(self.decode_bson(document)) where_clause = self.where_clause(event) sql = f"UPDATE {self.table_name} " f"SET {self.DATA_COLUMN} = :record " f"WHERE {where_clause};" parameters = {"record": record} diff --git a/tests/transform/mongodb/test_mongodb_cdc.py b/tests/transform/mongodb/test_mongodb_cdc.py index ff466af..9baf22b 100644 --- a/tests/transform/mongodb/test_mongodb_cdc.py +++ b/tests/transform/mongodb/test_mongodb_cdc.py @@ -126,10 +126,10 @@ def test_decode_cdc_insert(): parameters={ "oid": "669683c2b0750b2c84893f3e", "record": { - "_id": {"$oid": "669683c2b0750b2c84893f3e"}, + "_id": "669683c2b0750b2c84893f3e", "id": "5F9E", "data": {"temperature": 42.42, "humidity": 84.84}, - "meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"}, + "meta": {"timestamp": 1720739862000, "device": "foo"}, }, }, ) @@ -140,10 +140,10 @@ def test_decode_cdc_update(): statement="UPDATE foo SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';", parameters={ "record": { - "_id": {"$oid": "669683c2b0750b2c84893f3e"}, + "_id": "669683c2b0750b2c84893f3e", "id": "5F9E", "data": {"temperature": 42.5}, - "meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"}, + "meta": {"timestamp": 1720739862000, "device": "foo"}, } }, ) @@ -152,7 +152,7 @@ def test_decode_cdc_update(): def test_decode_cdc_replace(): assert MongoDBCDCTranslator(table_name="foo").to_sql(MSG_REPLACE) == SQLOperation( statement="UPDATE foo SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';", - parameters={"record": {"_id": {"$oid": "669683c2b0750b2c84893f3e"}, "tags": ["deleted"]}}, + parameters={"record": {"_id": "669683c2b0750b2c84893f3e", "tags": ["deleted"]}}, ) diff --git a/tests/transform/mongodb/test_mongodb_full.py b/tests/transform/mongodb/test_mongodb_full.py index 1fb30bb..dfbd9a5 100644 --- a/tests/transform/mongodb/test_mongodb_full.py +++ b/tests/transform/mongodb/test_mongodb_full.py @@ -1,12 +1,12 @@ import pytest from commons_codec.model import SQLOperation -from commons_codec.transform.mongodb import MongoDBCrateDBConverter, MongoDBFullLoadTranslator +from commons_codec.transform.mongodb import MongoDBFullLoadTranslator from tests.transform.mongodb.data import RECORD_IN_ALL_TYPES, RECORD_OUT_ALL_TYPES def test_sql_ddl(): - translator = MongoDBFullLoadTranslator(table_name="foo", converter=MongoDBCrateDBConverter()) + translator = MongoDBFullLoadTranslator(table_name="foo") assert translator.sql_ddl == "CREATE TABLE IF NOT EXISTS foo (oid TEXT, data OBJECT(DYNAMIC));" @@ -14,7 +14,7 @@ def test_to_sql_operation(): """ Verify outcome of `MongoDBFullLoadTranslator.to_sql` operation. """ - translator = MongoDBFullLoadTranslator(table_name="foo", converter=MongoDBCrateDBConverter()) + translator = MongoDBFullLoadTranslator(table_name="foo") assert translator.to_sql([RECORD_IN_ALL_TYPES]) == SQLOperation( statement="INSERT INTO foo (oid, data) VALUES (:oid, :record);", parameters=[{"oid": "56027fcae4b09385a85f9344", "record": RECORD_OUT_ALL_TYPES}], @@ -28,7 +28,7 @@ def test_to_sql_cratedb(caplog, cratedb): """ # Compute CrateDB operation (SQL+parameters) from MongoDB document. - translator = MongoDBFullLoadTranslator(table_name="from.mongodb", converter=MongoDBCrateDBConverter()) + translator = MongoDBFullLoadTranslator(table_name="from.mongodb") operation = translator.to_sql(RECORD_IN_ALL_TYPES) # Insert into CrateDB.