Skip to content

Commit

Permalink
MongoDB: Use improved decoding machinery also for MongoDBCDCTranslator
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 18, 2024
1 parent e6095fc commit 116f0c4
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- Zyp: Added software test and documentation about flattening lists
- MongoDB: Use `bson` package to parse BSON CANONICAL representation
- MongoDB: Complete and verify BSON data type mapping end-to-end
- MongoDB: Use improved decoding machinery also for `MongoDBCDCTranslator`

## 2024/09/10 v0.0.15
- Added Zyp Treatments, a slightly tailored transformation subsystem
Expand Down
14 changes: 6 additions & 8 deletions src/commons_codec/transform/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,9 @@ class MongoDBTranslatorBase:
# Define name of the column where CDC's record data will get materialized into.
DATA_COLUMN = "data"

def __init__(self, table_name: str):
super().__init__()
def __init__(self, table_name: str, converter: t.Union[MongoDBCrateDBConverter, None] = None):
self.table_name = quote_relation_name(table_name)
self.converter = converter or MongoDBCrateDBConverter()

@property
def sql_ddl(self):
Expand Down Expand Up @@ -193,10 +193,6 @@ class MongoDBFullLoadTranslator(MongoDBTranslatorBase):
Translate a MongoDB document into a CrateDB document.
"""

def __init__(self, table_name: str, converter: MongoDBCrateDBConverter):
super().__init__(table_name=table_name)
self.converter = converter

@staticmethod
def get_document_key(record: t.Mapping[str, t.Any]) -> str:
"""
Expand Down Expand Up @@ -270,15 +266,17 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> t.Union[SQLOperation, None]:

if operation_type == "insert":
oid: str = self.get_document_key(event)
record = self.decode_bson(self.get_full_document(event))
document = self.get_full_document(event)
record = self.converter.decode_document(self.decode_bson(document))
sql = f"INSERT INTO {self.table_name} " f"({self.ID_COLUMN}, {self.DATA_COLUMN}) " "VALUES (:oid, :record);"
parameters = {"oid": oid, "record": record}

# In order to use "full document" representations from "update" events,
# you need to use `watch(full_document="updateLookup")`.
# https://www.mongodb.com/docs/manual/changeStreams/#lookup-full-document-for-update-operations
elif operation_type in ["update", "replace"]:
record = self.decode_bson(self.get_full_document(event))
document = self.get_full_document(event)
record = self.converter.decode_document(self.decode_bson(document))
where_clause = self.where_clause(event)
sql = f"UPDATE {self.table_name} " f"SET {self.DATA_COLUMN} = :record " f"WHERE {where_clause};"
parameters = {"record": record}
Expand Down
10 changes: 5 additions & 5 deletions tests/transform/mongodb/test_mongodb_cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ def test_decode_cdc_insert():
parameters={
"oid": "669683c2b0750b2c84893f3e",
"record": {
"_id": {"$oid": "669683c2b0750b2c84893f3e"},
"_id": "669683c2b0750b2c84893f3e",
"id": "5F9E",
"data": {"temperature": 42.42, "humidity": 84.84},
"meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"},
"meta": {"timestamp": 1720739862000, "device": "foo"},
},
},
)
Expand All @@ -140,10 +140,10 @@ def test_decode_cdc_update():
statement="UPDATE foo SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';",
parameters={
"record": {
"_id": {"$oid": "669683c2b0750b2c84893f3e"},
"_id": "669683c2b0750b2c84893f3e",
"id": "5F9E",
"data": {"temperature": 42.5},
"meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"},
"meta": {"timestamp": 1720739862000, "device": "foo"},
}
},
)
Expand All @@ -152,7 +152,7 @@ def test_decode_cdc_update():
def test_decode_cdc_replace():
assert MongoDBCDCTranslator(table_name="foo").to_sql(MSG_REPLACE) == SQLOperation(
statement="UPDATE foo SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';",
parameters={"record": {"_id": {"$oid": "669683c2b0750b2c84893f3e"}, "tags": ["deleted"]}},
parameters={"record": {"_id": "669683c2b0750b2c84893f3e", "tags": ["deleted"]}},
)


Expand Down
8 changes: 4 additions & 4 deletions tests/transform/mongodb/test_mongodb_full.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import pytest

from commons_codec.model import SQLOperation
from commons_codec.transform.mongodb import MongoDBCrateDBConverter, MongoDBFullLoadTranslator
from commons_codec.transform.mongodb import MongoDBFullLoadTranslator
from tests.transform.mongodb.data import RECORD_IN_ALL_TYPES, RECORD_OUT_ALL_TYPES


def test_sql_ddl():
translator = MongoDBFullLoadTranslator(table_name="foo", converter=MongoDBCrateDBConverter())
translator = MongoDBFullLoadTranslator(table_name="foo")
assert translator.sql_ddl == "CREATE TABLE IF NOT EXISTS foo (oid TEXT, data OBJECT(DYNAMIC));"


def test_to_sql_operation():
"""
Verify outcome of `MongoDBFullLoadTranslator.to_sql` operation.
"""
translator = MongoDBFullLoadTranslator(table_name="foo", converter=MongoDBCrateDBConverter())
translator = MongoDBFullLoadTranslator(table_name="foo")
assert translator.to_sql([RECORD_IN_ALL_TYPES]) == SQLOperation(
statement="INSERT INTO foo (oid, data) VALUES (:oid, :record);",
parameters=[{"oid": "56027fcae4b09385a85f9344", "record": RECORD_OUT_ALL_TYPES}],
Expand All @@ -28,7 +28,7 @@ def test_to_sql_cratedb(caplog, cratedb):
"""

# Compute CrateDB operation (SQL+parameters) from MongoDB document.
translator = MongoDBFullLoadTranslator(table_name="from.mongodb", converter=MongoDBCrateDBConverter())
translator = MongoDBFullLoadTranslator(table_name="from.mongodb")
operation = translator.to_sql(RECORD_IN_ALL_TYPES)

# Insert into CrateDB.
Expand Down

0 comments on commit 116f0c4

Please sign in to comment.