From 77731c33a10887e9219cf092224ab3f125002ead Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 18 Sep 2024 02:35:02 +0200 Subject: [PATCH 01/10] MongoDB: Add `MongoDBFullLoadTranslator` and `MongoDBCrateDBConverter` --- CHANGES.md | 1 + examples/mongodb_cdc_cratedb.py | 11 +- pyproject.toml | 1 + src/commons_codec/transform/mongodb.py | 184 +++++++++++++++--- src/zyp/model/base.py | 2 +- tests/transform/conftest.py | 1 + .../{test_mongodb.py => test_mongodb_cdc.py} | 22 +-- tests/transform/test_mongodb_convert.py | 158 +++++++++++++++ tests/transform/test_mongodb_data.py | 90 +++++++++ tests/transform/test_mongodb_full.py | 44 +++++ 10 files changed, 470 insertions(+), 44 deletions(-) rename tests/transform/{test_mongodb.py => test_mongodb_cdc.py} (86%) create mode 100644 tests/transform/test_mongodb_convert.py create mode 100644 tests/transform/test_mongodb_data.py create mode 100644 tests/transform/test_mongodb_full.py diff --git a/CHANGES.md b/CHANGES.md index edb1646..fb1ca83 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,7 @@ # Changelog ## Unreleased +- MongoDB: Added `MongoDBFullLoadTranslator` and `MongoDBCrateDBConverter` ## 2024/09/10 v0.0.15 - Added Zyp Treatments, a slightly tailored transformation subsystem diff --git a/examples/mongodb_cdc_cratedb.py b/examples/mongodb_cdc_cratedb.py index effa17c..07d7a19 100644 --- a/examples/mongodb_cdc_cratedb.py +++ b/examples/mongodb_cdc_cratedb.py @@ -15,7 +15,7 @@ import sqlalchemy as sa from sqlalchemy_cratedb.support import quote_relation_name -from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB +from commons_codec.transform.mongodb import MongoDBCDCTranslator class MiniRelay: @@ -35,7 +35,7 @@ def __init__( self.mongodb_client = pymongo.MongoClient(mongodb_url) self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection] self.table_name = quote_relation_name(cratedb_table) - self.cdc = MongoDBCDCTranslatorCrateDB(table_name=self.table_name) + self.cdc = 
MongoDBCDCTranslator(table_name=self.table_name) def start(self): """ @@ -43,10 +43,9 @@ def start(self): """ with self.cratedb_client.connect() as connection: connection.execute(sa.text(self.cdc.sql_ddl)) - for sql in self.cdc_to_sql(): - if sql: - connection.execute(sa.text(sql)) - connection.execute(sa.text(f"REFRESH TABLE {self.table_name};")) + for operation in self.cdc_to_sql(): + connection.execute(sa.text(operation.statement), parameters=operation.parameters) + connection.execute(sa.text(f"REFRESH TABLE {self.table_name};")) def cdc_to_sql(self): """ diff --git a/pyproject.toml b/pyproject.toml index 3a62573..81b2054 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -107,6 +107,7 @@ dependencies = [ "attrs<25", "backports-strenum<1.3; python_version<'3.11'", "cattrs<25", + "python-dateutil<3", "simplejson<4", "sqlalchemy-cratedb>=0.39.0", "toolz<0.13", diff --git a/src/commons_codec/transform/mongodb.py b/src/commons_codec/transform/mongodb.py index 88298f1..744b2f5 100644 --- a/src/commons_codec/transform/mongodb.py +++ b/src/commons_codec/transform/mongodb.py @@ -1,20 +1,113 @@ -# Copyright (c) 2023-2024, The Kotori Developers and contributors. +# Copyright (c) 2021-2024, Crate.io Inc. # Distributed under the terms of the LGPLv3 license, see LICENSE. 
- -# ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction - +# ruff: noqa: S608 +import base64 +import calendar import logging import typing as t +from typing import Any, Iterable +from uuid import UUID +import dateutil.parser as dateparser +from attr import Factory +from attrs import define from bson.json_util import _json_convert +from pymongo.cursor import Cursor from sqlalchemy_cratedb.support import quote_relation_name from commons_codec.model import SQLOperation +from zyp.model.collection import CollectionTransformation logger = logging.getLogger(__name__) -class MongoDBCDCTranslatorBase: +Document = t.Mapping[str, t.Any] +DocumentCollection = t.List[Document] + + +def date_converter(value): + if isinstance(value, int): + return value + dt = dateparser.parse(value) + return calendar.timegm(dt.utctimetuple()) * 1000 + + +def timestamp_converter(value): + if len(str(value)) <= 10: + return value * 1000 + return value + + +type_converter = { + "date": date_converter, + "timestamp": timestamp_converter, + "undefined": lambda x: None, +} + + +@define +class MongoDBCrateDBConverter: + """ + Convert MongoDB Extended JSON to representation consumable by CrateDB. + + Extracted from cratedb-toolkit, earlier migr8. + """ + + transformation: CollectionTransformation = Factory(CollectionTransformation) + + def decode_documents(self, data: t.List[Document]) -> Iterable[dict[str, Any]]: + """ + Decode MongoDB Extended JSON, considering CrateDB specifics. + """ + return self.transformation.apply(map(self.extract_value, data)) + + def decode_document(self, data: Document) -> Document: + """ + Decode MongoDB Extended JSON, considering CrateDB specifics. + """ + return self.extract_value(data) + + def extract_value(self, value: t.Any, parent_type: t.Optional[str] = None) -> t.Any: + """ + Decode MongoDB Extended JSON. 
+ + - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json-v1/ + - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/ + """ + if isinstance(value, dict): + # Custom adjustments to compensate shape anomalies in source data. + self.apply_special_treatments(value) + if len(value) == 1: + if "$binary" in value and value["$binary"]["subType"] in ["03", "04"]: + decoded = str(UUID(bytes=base64.b64decode(value["$binary"]["base64"]))) + return self.extract_value(decoded, parent_type) + for k, v in value.items(): + if k.startswith("$"): + return self.extract_value(v, k.lstrip("$")) + return {k.lstrip("$"): self.extract_value(v, parent_type) for (k, v) in value.items()} + if isinstance(value, list): + return [self.extract_value(v, parent_type) for v in value] + if parent_type: + converter = type_converter.get(parent_type) + if converter: + return converter(value) + return value + + def apply_special_treatments(self, value: t.Any): + """ + Apply special treatments to value that can't be described otherwise up until now. + # Ignore certain items including anomalies that are not resolved, yet. + + TODO: Needs an integration test feeding two records instead of just one. + """ + + if self.transformation is None or self.transformation.treatment is None: + return None + + return self.transformation.treatment.apply(value) + + +class MongoDBTranslatorBase: """ Translate MongoDB CDC events into different representations. @@ -30,7 +123,27 @@ class MongoDBCDCTranslatorBase: - https://www.mongodb.com/developer/languages/python/python-change-streams/ """ - def decode_bson(self, item: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: + # Define name of the column where MongoDB's OID for a document will be stored. + ID_COLUMN = "oid" + + # Define name of the column where CDC's record data will get materialized into. 
+ DATA_COLUMN = "data" + + def __init__(self, table_name: str): + super().__init__() + self.table_name = quote_relation_name(table_name) + + @property + def sql_ddl(self): + """ + Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events. + """ + return ( + f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.ID_COLUMN} TEXT, {self.DATA_COLUMN} OBJECT(DYNAMIC));" + ) + + @staticmethod + def decode_bson(item: t.Mapping[str, t.Any]) -> t.Mapping[str, t.Any]: """ Convert MongoDB Extended JSON to vanilla Python dictionary. @@ -62,7 +175,45 @@ def decode_bson(self, item: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: return _json_convert(item) -class MongoDBCDCTranslatorCrateDB(MongoDBCDCTranslatorBase): +class MongoDBFullLoadTranslator(MongoDBTranslatorBase): + """ + Translate a MongoDB document into a CrateDB document. + """ + + def __init__(self, table_name: str, converter: MongoDBCrateDBConverter): + super().__init__(table_name=table_name) + self.converter = converter + + @staticmethod + def get_document_key(record: t.Mapping[str, t.Any]) -> str: + """ + Return value of document key (MongoDB document OID). + + "documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")} + """ + return record["_id"] + + def to_sql(self, data: t.Union[Document, t.List[Document]]) -> SQLOperation: + """ + Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents. + """ + if not isinstance(data, Cursor) and not isinstance(data, list): + data = [data] + + # Define SQL INSERT statement. + sql = f"INSERT INTO {self.table_name} ({self.ID_COLUMN}, {self.DATA_COLUMN}) VALUES (:oid, :record);" + + # Converge multiple MongoDB documents into SQL parameters for `executemany` operation. 
+ parameters: t.List[Document] = [] + for document in data: + record = self.converter.decode_document(self.decode_bson(document)) + oid: str = self.get_document_key(record) + parameters.append({"oid": oid, "record": record}) + + return SQLOperation(sql, parameters) + + +class MongoDBCDCTranslator(MongoDBTranslatorBase): """ Translate MongoDB CDC events into CrateDB SQL statements that materialize them again. @@ -94,25 +245,6 @@ class MongoDBCDCTranslatorCrateDB(MongoDBCDCTranslatorBase): CREATE TABLE (oid TEXT, data OBJECT(DYNAMIC)); """ - # Define name of the column where MongoDB's OID for a document will be stored. - ID_COLUMN = "oid" - - # Define name of the column where CDC's record data will get materialized into. - DATA_COLUMN = "data" - - def __init__(self, table_name: str): - super().__init__() - self.table_name = quote_relation_name(table_name) - - @property - def sql_ddl(self): - """ - Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events. - """ - return ( - f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.ID_COLUMN} TEXT, {self.DATA_COLUMN} OBJECT(DYNAMIC));" - ) - def to_sql(self, event: t.Dict[str, t.Any]) -> t.Union[SQLOperation, None]: """ Produce INSERT|UPDATE|DELETE SQL statement from insert|update|replace|delete CDC event record. 
diff --git a/src/zyp/model/base.py b/src/zyp/model/base.py index 91a766d..33cab8c 100644 --- a/src/zyp/model/base.py +++ b/src/zyp/model/base.py @@ -10,7 +10,7 @@ from zyp.util.data import no_privates_no_nulls_no_empties Record = t.Dict[str, t.Any] -Collection = t.List[Record] +Collection = t.Iterable[Record] DictOrList = t.Union[Record, Collection] diff --git a/tests/transform/conftest.py b/tests/transform/conftest.py index 9a9f3b1..2f0b3c4 100644 --- a/tests/transform/conftest.py +++ b/tests/transform/conftest.py @@ -3,6 +3,7 @@ RESET_TABLES = [ "from.dynamodb", "from.generic", + "from.mongodb", ] diff --git a/tests/transform/test_mongodb.py b/tests/transform/test_mongodb_cdc.py similarity index 86% rename from tests/transform/test_mongodb.py rename to tests/transform/test_mongodb_cdc.py index 404e453..ff466af 100644 --- a/tests/transform/test_mongodb.py +++ b/tests/transform/test_mongodb_cdc.py @@ -11,7 +11,7 @@ from bson import ObjectId, Timestamp -from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB +from commons_codec.transform.mongodb import MongoDBCDCTranslator MSG_OPERATION_UNKNOWN = { "operationType": "foobar", @@ -97,31 +97,31 @@ def test_decode_cdc_sql_ddl(): assert ( - MongoDBCDCTranslatorCrateDB(table_name="foo").sql_ddl + MongoDBCDCTranslator(table_name="foo").sql_ddl == "CREATE TABLE IF NOT EXISTS foo (oid TEXT, data OBJECT(DYNAMIC));" ) def test_decode_cdc_unknown_event(): with pytest.raises(ValueError) as ex: - MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_OPERATION_UNKNOWN) + MongoDBCDCTranslator(table_name="foo").to_sql(MSG_OPERATION_UNKNOWN) assert ex.match("Unknown CDC operation type: foobar") def test_decode_cdc_optype_missing(): with pytest.raises(ValueError) as ex: - MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_OPERATION_MISSING) + MongoDBCDCTranslator(table_name="foo").to_sql(MSG_OPERATION_MISSING) assert ex.match("Operation Type missing or empty: {}") def test_decode_cdc_optype_empty(): with 
pytest.raises(ValueError) as ex: - MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_OPERATION_EMPTY) + MongoDBCDCTranslator(table_name="foo").to_sql(MSG_OPERATION_EMPTY) assert ex.match("Operation Type missing or empty: {'operationType': ''}") def test_decode_cdc_insert(): - assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INSERT) == SQLOperation( + assert MongoDBCDCTranslator(table_name="foo").to_sql(MSG_INSERT) == SQLOperation( statement="INSERT INTO foo (oid, data) VALUES (:oid, :record);", parameters={ "oid": "669683c2b0750b2c84893f3e", @@ -136,7 +136,7 @@ def test_decode_cdc_insert(): def test_decode_cdc_update(): - assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_UPDATE) == SQLOperation( + assert MongoDBCDCTranslator(table_name="foo").to_sql(MSG_UPDATE) == SQLOperation( statement="UPDATE foo SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';", parameters={ "record": { @@ -150,21 +150,21 @@ def test_decode_cdc_update(): def test_decode_cdc_replace(): - assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_REPLACE) == SQLOperation( + assert MongoDBCDCTranslator(table_name="foo").to_sql(MSG_REPLACE) == SQLOperation( statement="UPDATE foo SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';", parameters={"record": {"_id": {"$oid": "669683c2b0750b2c84893f3e"}, "tags": ["deleted"]}}, ) def test_decode_cdc_delete(): - assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_DELETE) == SQLOperation( + assert MongoDBCDCTranslator(table_name="foo").to_sql(MSG_DELETE) == SQLOperation( statement="DELETE FROM foo WHERE oid = '669693c5002ef91ea9c7a562';", parameters=None ) def test_decode_cdc_drop(): - assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_DROP) is None + assert MongoDBCDCTranslator(table_name="foo").to_sql(MSG_DROP) is None def test_decode_cdc_invalidate(): - assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INVALIDATE) is None + assert 
MongoDBCDCTranslator(table_name="foo").to_sql(MSG_INVALIDATE) is None diff --git a/tests/transform/test_mongodb_convert.py b/tests/transform/test_mongodb_convert.py new file mode 100644 index 0000000..eac7b3e --- /dev/null +++ b/tests/transform/test_mongodb_convert.py @@ -0,0 +1,158 @@ +import pytest + +from commons_codec.transform.mongodb import MongoDBCrateDBConverter, date_converter, timestamp_converter +from zyp.model.bucket import BucketTransformation, ValueConverter +from zyp.model.collection import CollectionTransformation +from zyp.model.treatment import Treatment + +pytestmark = pytest.mark.mongodb + + +def test_date_converter_int(): + """ + Datetime values encoded as integer values will be returned unmodified. + """ + assert date_converter(42) == 42 + + +def test_timestamp_converter_s(): + """ + Timestamps encoded as integer values in seconds will be converted to milliseconds. + """ + assert timestamp_converter(1726612077) == 1726612077000 + + +def test_timestamp_converter_ms(): + """ + Timestamps in milliseconds will be returned unmodified. + """ + assert timestamp_converter(1726612077000) == 1726612077000 + + +def test_convert_with_treatment_ignore_complex_lists(): + """ + The `ignore_complex_lists` treatment ignores lists of dictionaries, often having deviating substructures. 
+ """ + data_in = { + "_id": { + "$oid": "56027fcae4b09385a85f9344", + }, + "value": { + "id": 42, + "date": {"$date": "2015-09-23T10:32:42.33Z"}, + "some_complex_list": [ + {"id": "foo", "value": "something"}, + {"id": "bar", "value": {"$date": "2015-09-24T10:32:42.33Z"}}, + ], + }, + } + data_out = { + "_id": "56027fcae4b09385a85f9344", + "value": { + "id": 42, + "date": 1443004362000, + }, + } + + treatment = Treatment(ignore_complex_lists=True) + transformation = CollectionTransformation(treatment=treatment) + converter = MongoDBCrateDBConverter(transformation=transformation) + assert converter.decode_document(data_in) == data_out + + +def test_convert_with_treatment_normalize_complex_lists(): + """ + The `normalize_complex_lists` treatment converts inner values within lists of dictionaries. + """ + data_in = { + "_id": { + "$oid": "56027fcae4b09385a85f9344", + }, + "value": { + "id": 42, + "date": {"$date": "2015-09-23T10:32:42.33Z"}, + "some_complex_list": [ + {"id": "foo", "value": "something"}, + {"id": "bar", "value": {"$date": "2015-09-24T10:32:42.33Z"}}, + ], + }, + } + data_out = { + "_id": "56027fcae4b09385a85f9344", + "value": { + "id": 42, + "date": 1443004362000, + "some_complex_list": [ + {"id": "foo", "value": "something"}, + # FIXME: `normalize_complex_lists` does not see it's a timestamp. + {"id": "bar", "value": "{'$date': '2015-09-24T10:32:42.33Z'}"}, + ], + }, + } + + treatment = Treatment(normalize_complex_lists=True) + transformation = CollectionTransformation(treatment=treatment) + converter = MongoDBCrateDBConverter(transformation=transformation) + assert converter.decode_document(data_in) == data_out + + +def test_convert_with_treatment_all_options(): + """ + Validate all treatment options. 
+ """ + data_in = { + "_id": { + "$oid": "56027fcae4b09385a85f9344", + }, + "ignore_toplevel": 42, + "value": { + "id": 42, + "date": {"$date": "2015-09-23T10:32:42.33Z"}, + "ignore_nested": 42, + }, + "to_list": 42, + "to_string": 42, + "to_dict_scalar": 42, + "to_dict_list": [{"user": 42}], + } + data_out = { + "_id": "56027fcae4b09385a85f9344", + "value": { + "date": 1443004362000, + "id": 42, + }, + "to_list": [42], + "to_string": "42", + "to_dict_scalar": {"id": 42}, + "to_dict_list": [{"user": {"id": 42}}], + } + + treatment = Treatment( + ignore_complex_lists=False, + ignore_field=["ignore_toplevel", "ignore_nested"], + convert_list=["to_list"], + convert_string=["to_string"], + convert_dict=[ + {"name": "to_dict_scalar", "wrapper_name": "id"}, + {"name": "user", "wrapper_name": "id"}, + ], + ) + transformation = CollectionTransformation(treatment=treatment) + converter = MongoDBCrateDBConverter(transformation=transformation) + assert converter.decode_document(data_in) == data_out + + +def test_convert_transform_timestamp(): + """ + Validate a Zyp transformation that converts datetime values in text format. 
+ """ + data_in = [{"device": "Hotzenplotz", "temperature": 42.42, "timestamp": "06/14/2022 12:42:24"}] + data_out = [{"device": "Hotzenplotz", "temperature": 42.42, "timestamp": 1655203344}] + + bucket_transformation = BucketTransformation( + values=ValueConverter().add(pointer="/timestamp", transformer="to_unixtime"), + ) + transformation = CollectionTransformation(bucket=bucket_transformation) + converter = MongoDBCrateDBConverter(transformation=transformation) + data = converter.decode_documents(data_in) + assert data == data_out diff --git a/tests/transform/test_mongodb_data.py b/tests/transform/test_mongodb_data.py new file mode 100644 index 0000000..f53383b --- /dev/null +++ b/tests/transform/test_mongodb_data.py @@ -0,0 +1,90 @@ +import datetime as dt +from unittest import mock + +import bson + +RECORD_IN_ALL_TYPES = { + "_id": { + "$oid": "56027fcae4b09385a85f9344", + }, + "python": { + "bool": True, + "datetime": dt.datetime(2024, 7, 16, 14, 29, 22, 907000), + "dict_basic": {"foo": "bar"}, + "float": 42.42, + "int": 42, + "list_bool": [True, False], + "list_dict": [{"foo": "bar"}], + "list_float": [1.1, 2.2, 3.3], + "list_int": [1, 2, 3], + "list_string": ["foo", "bar"], + "set_int": {1, 2, 3}, + "set_str": {"Räuber", "Hotzenplotz"}, + "str": "Hotzenplotz", + "tuple_int": (1, 2, 3), + "tuple_str": ("Räuber", "Hotzenplotz"), + }, + "bson": { + "decimal128": bson.Decimal128("42.42"), + "dbref": bson.DBRef(id="foo", collection="bar", database="baz"), + "int64": bson.Int64(42.42), + "objectid": bson.ObjectId("669683c2b0750b2c84893f3e"), + "timestamp": bson.Timestamp(1721140162, 2), + }, + "canonical": { + "date_text": {"$date": "2015-09-23T10:32:42.33Z"}, + "date_unixtime": {"$date": {"$numberLong": "-284643869501"}}, + "double": {"$numberDouble": "-1.2345678921232E+18"}, + "int32": {"$numberInt": "-2147483648"}, + "int64": {"$numberLong": "-9223372036854775808"}, + "list_dict": [ + {"id": "bar", "value": {"$date": "2015-09-24T10:32:42.33Z"}}, + ], + "oid": 
{"$oid": "56027fcae4b09385a85f9344"}, + "uuid": {"$binary": {"base64": "c//SZESzTGmQ6OfR38A11A==", "subType": "04"}}, + }, +} + +RECORD_OUT_ALL_TYPES = { + "_id": "56027fcae4b09385a85f9344", + "python": { + "bool": True, + "datetime": 1721140162000, + "dict_basic": {"foo": "bar"}, + "float": 42.42, + "int": 42, + "list_bool": [True, False], + "list_dict": [{"foo": "bar"}], + "list_float": [1.1, 2.2, 3.3], + "list_int": [1, 2, 3], + "list_string": ["foo", "bar"], + "set_int": [1, 2, 3], + "set_str": mock.ANY, + "str": "Hotzenplotz", + "tuple_int": [1, 2, 3], + "tuple_str": ["Räuber", "Hotzenplotz"], + }, + "bson": { + "decimal128": "42.42", + "dbref": { + "id": "foo", + "ref": "bar", + "db": "baz", + }, + "int64": 42, + "objectid": "669683c2b0750b2c84893f3e", + "timestamp": {"i": 2000, "t": 1721140162000}, + }, + "canonical": { + "date_text": 1443004362000, + "date_unixtime": "-284643869501", + "double": "-1.2345678921232E+18", + "int32": "-2147483648", + "int64": "-9223372036854775808", + "list_dict": [ + {"id": "bar", "value": 1443090762000}, + ], + "oid": "56027fcae4b09385a85f9344", + "uuid": "73ffd264-44b3-4c69-90e8-e7d1dfc035d4", + }, +} diff --git a/tests/transform/test_mongodb_full.py b/tests/transform/test_mongodb_full.py new file mode 100644 index 0000000..2807a6d --- /dev/null +++ b/tests/transform/test_mongodb_full.py @@ -0,0 +1,44 @@ +import pytest + +from commons_codec.model import SQLOperation +from commons_codec.transform.mongodb import MongoDBCrateDBConverter, MongoDBFullLoadTranslator +from tests.transform.test_mongodb_data import RECORD_IN_ALL_TYPES, RECORD_OUT_ALL_TYPES + + +def test_sql_ddl(): + translator = MongoDBFullLoadTranslator(table_name="foo", converter=MongoDBCrateDBConverter()) + assert translator.sql_ddl == "CREATE TABLE IF NOT EXISTS foo (oid TEXT, data OBJECT(DYNAMIC));" + + +def test_to_sql_operation(): + """ + Verify outcome of `MongoDBFullLoadTranslator.to_sql` operation. 
+ """ + translator = MongoDBFullLoadTranslator(table_name="foo", converter=MongoDBCrateDBConverter()) + assert translator.to_sql([RECORD_IN_ALL_TYPES]) == SQLOperation( + statement="INSERT INTO foo (oid, data) VALUES (:oid, :record);", + parameters=[{"oid": "56027fcae4b09385a85f9344", "record": RECORD_OUT_ALL_TYPES}], + ) + + +@pytest.mark.integration +def test_to_sql_cratedb(caplog, cratedb): + """ + Verify writing converted MongoDB document to CrateDB. + """ + + # Compute CrateDB operation (SQL+parameters) from MongoDB document. + translator = MongoDBFullLoadTranslator(table_name="from.mongodb", converter=MongoDBCrateDBConverter()) + operation = translator.to_sql(RECORD_IN_ALL_TYPES) + + # Insert into CrateDB. + cratedb.database.run_sql(translator.sql_ddl) + cratedb.database.run_sql(operation.statement, operation.parameters) + + # Verify data in target database. + assert cratedb.database.table_exists("from.mongodb") is True + assert cratedb.database.refresh_table("from.mongodb") is True + assert cratedb.database.count_records("from.mongodb") == 1 + + results = cratedb.database.run_sql('SELECT * FROM "from".mongodb;', records=True) # noqa: S608 + assert results[0]["data"] == RECORD_OUT_ALL_TYPES From d6f2209ec303bcffca4e130e34d3b3c7daa1a1eb Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 18 Sep 2024 17:40:55 +0200 Subject: [PATCH 02/10] Zyp: Fix execution of collection transformation --- CHANGES.md | 1 + src/zyp/model/collection.py | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index fb1ca83..cf02b3e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,7 @@ ## Unreleased - MongoDB: Added `MongoDBFullLoadTranslator` and `MongoDBCrateDBConverter` +- Zyp: Fixed execution of collection transformation ## 2024/09/10 v0.0.15 - Added Zyp Treatments, a slightly tailored transformation subsystem diff --git a/src/zyp/model/collection.py b/src/zyp/model/collection.py index f22ea1a..f757df2 100644 --- 
a/src/zyp/model/collection.py +++ b/src/zyp/model/collection.py @@ -26,7 +26,6 @@ class CollectionTransformation(Dumpable): def apply(self, data: DictOrList) -> Collection: collection = t.cast(Collection, data) - collection_out = collection if self.pre: collection = t.cast(Collection, self.pre.apply(collection)) if self.bucket: @@ -34,8 +33,9 @@ def apply(self, data: DictOrList) -> Collection: for item in collection: item = self.bucket.apply(item) collection_out.append(item) + collection = collection_out if self.post: - collection_out = t.cast(Collection, self.post.apply(collection_out)) + collection = t.cast(Collection, self.post.apply(collection)) if self.treatment: - collection_out = t.cast(Collection, self.treatment.apply(collection_out)) - return collection_out + collection = t.cast(Collection, self.treatment.apply(collection)) + return collection From 39b089e031ccb201f25f957e785a1e3ba7a3f6ba Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 18 Sep 2024 17:55:03 +0200 Subject: [PATCH 03/10] Zyp: Add software test and documentation about flattening lists --- CHANGES.md | 1 + doc/zyp/index.md | 50 ++++++++++++++++++++++++++++++-- tests/zyp/test_collection.py | 56 +++++++++++++++++++++++++++++++----- tests/zyp/test_moksha.py | 10 +++++++ 4 files changed, 107 insertions(+), 10 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index cf02b3e..738c1d1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,6 +3,7 @@ ## Unreleased - MongoDB: Added `MongoDBFullLoadTranslator` and `MongoDBCrateDBConverter` - Zyp: Fixed execution of collection transformation +- Zyp: Added software test and documentation about flattening lists ## 2024/09/10 v0.0.15 - Added Zyp Treatments, a slightly tailored transformation subsystem diff --git a/doc/zyp/index.md b/doc/zyp/index.md index 8e7ee70..f3f85a8 100644 --- a/doc/zyp/index.md +++ b/doc/zyp/index.md @@ -20,7 +20,7 @@ implemented using [attrs] and [cattrs]. system before applying subsequent transformations. 
Zyp is working directly within the data pipeline, before data is inserted into the target system. -## Synopsis I +## Example I A basic transformation example for individual data records. ```python @@ -52,7 +52,7 @@ The result is a transformed data collection. ] ``` -## Synopsis II +## Example II A more advanced transformation example for a collection of data records. Consider a messy collection of input data. @@ -120,7 +120,7 @@ transformation = CollectionTransformation( post=MokshaTransformation().jq(".[] |= (.data.value /= 100)"), ) -data_out = transformation.apply(data_in) +assert transformation.apply(data_in) == data_out ``` Alternatively, serialize the `zyp-collection` transformation description, for example into YAML format. @@ -152,6 +152,50 @@ post: type: jq ``` +## Example III +A compact transformation example that uses `jq` to: + +- Unwrap the actual collection which is nested within the top-level `records` item. +- Flatten the item `nested-list` which contains nested lists. + +```python +from zyp.model.collection import CollectionTransformation +from zyp.model.moksha import MokshaTransformation + +data_in = { + "message-source": "system-3000", + "message-type": "eai-warehouse", + "records": [ + {"nested-list": [{"foo": 1}, [{"foo": 2}, {"foo": 3}]]}, + ], +} + +data_out = [ + {"nested-list": [{"foo": 1}, {"foo": 2}, {"foo": 3}]}, +] + +transformation = CollectionTransformation( + pre=MokshaTransformation() + .jq(".records") + .jq('.[] |= (."nested-list" |= flatten)'), +) + +assert transformation.apply(data_in) == data_out +``` + +The same transformation represented in YAML format looks like this. 
+```yaml +meta: + type: zyp-collection + version: 1 +pre: + rules: + - expression: .records + type: jq + - expression: .[] |= (."nested-list" |= flatten) + type: jq +``` + ## Prior Art - [Singer Transformer] diff --git a/tests/zyp/test_collection.py b/tests/zyp/test_collection.py index b30015d..c947366 100644 --- a/tests/zyp/test_collection.py +++ b/tests/zyp/test_collection.py @@ -8,8 +8,10 @@ from zyp.model.moksha import MokshaTransformation -class ComplexRecipe: +class CompositeRecipe: """ + Composite recipe composed of demo input and output data and a corresponding transformation. + It executes the following steps, in order of appearance: - Unwrap `records` attribute from container dictionary into actual collection. @@ -53,18 +55,58 @@ class ComplexRecipe: ) -def test_collection_transformation_success(): +class ListRecipes: + """ + Demo transformation recipe for handling anomalies in lists. + """ + + # Define a messy input data collection. + data_in = { + "message-source": "community", + "message-type": "mixed-pickles", + "records": [ + { + "data": { + "to-list-flat": [{"foo": 1}, [{"foo": 2}, {"foo": 3}]], + }, + }, + ], + } + + # Define expectation of the cleansed data collection. + data_out = [ + { + "data": { + "to-list-flat": [{"foo": 1}, {"foo": 2}, {"foo": 3}], + }, + }, + ] + + # Define transformation. + transformation = CollectionTransformation( + pre=MokshaTransformation().jq(".records").jq('.[] |= (.data."to-list-flat" |= flatten)'), + ) + + +def test_collection_transformation_composite_success(): """ Verify transformation recipe for re-shaping a collection of records. """ - assert ComplexRecipe.transformation.apply(ComplexRecipe.data_in) == ComplexRecipe.data_out + assert CompositeRecipe.transformation.apply(CompositeRecipe.data_in) == CompositeRecipe.data_out + + +def test_collection_transformation_lists_success(): + """ + Verify transformation recipe for re-shaping anomalies in lists.
+ """ + assert ListRecipes.transformation.apply(ListRecipes.data_in) == ListRecipes.data_out def test_collection_transformation_serialize(): """ Verify collection transformation description can be serialized to a data structure and back. """ - transformation = ComplexRecipe.transformation + transformation = CompositeRecipe.transformation transformation_dict = { "meta": {"version": 1, "type": "zyp-collection"}, "pre": { @@ -99,8 +141,8 @@ def test_collection_transformation_regular_load_and_apply(): """ payload = Path("tests/zyp/transformation-collection.yaml").read_text() transformation = CollectionTransformation.from_yaml(payload) - result = transformation.apply(deepcopy(ComplexRecipe.data_in)) - assert result == ComplexRecipe.data_out + result = transformation.apply(deepcopy(CompositeRecipe.data_in)) + assert result == CompositeRecipe.data_out def test_collection_transformation_treatment_load_and_apply(): @@ -109,7 +151,7 @@ def test_collection_transformation_treatment_load_and_apply(): """ payload = Path("tests/zyp/transformation-collection-treatment.yaml").read_text() transformation = CollectionTransformation.from_yaml(payload) - result = transformation.apply(deepcopy(ComplexRecipe.data_in)) + result = transformation.apply(deepcopy(CompositeRecipe.data_in)) assert result == { "message-source": "system-3000", "message-type": "eai-warehouse", diff --git a/tests/zyp/test_moksha.py b/tests/zyp/test_moksha.py index 566ca92..5393ffd 100644 --- a/tests/zyp/test_moksha.py +++ b/tests/zyp/test_moksha.py @@ -13,6 +13,16 @@ def test_moksha_jq_compute_nested(): assert transformation.apply([{"data": {"abc": 123}}]) == [{"data": {"abc": 246}}] +def test_moksha_jq_flatten_list(): + """ + Verify flattening nested list, using moksha/jq. 
+ """ + data_in = [{"data": {"abc": [{"foo": 1}, [{"foo": 2}, {"foo": 3}]]}}] + data_out = [{"data": {"abc": [{"foo": 1}, {"foo": 2}, {"foo": 3}]}}] + transformation = MokshaTransformation().jq(".[] |= (.data.abc |= flatten)") + assert transformation.apply(data_in) == data_out + + def test_transon_duplicate_records(): """ Verify record duplication works well. From 95b4298dccc634ce4e2df77c58f904c0c283e0a6 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 18 Sep 2024 20:11:00 +0200 Subject: [PATCH 04/10] MongoDB: Use `bson` package to parse BSON CANONICAL representation --- CHANGES.md | 1 + src/commons_codec/transform/mongodb.py | 75 +++++++++++++------------ tests/transform/test_mongodb_convert.py | 19 ++++--- tests/transform/test_mongodb_data.py | 35 ++++++++---- 4 files changed, 76 insertions(+), 54 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 738c1d1..31dec19 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,7 @@ - MongoDB: Added `MongoDBFullLoadTranslator` and `MongoDBCrateDBConverter` - Zyp: Fixed execution of collection transformation - Zyp: Added software test and documentation about flattening lists +- MongoDB: Use `bson` package to parse BSON CANONICAL representation ## 2024/09/10 v0.0.15 - Added Zyp Treatments, a slightly tailored transformation subsystem diff --git a/src/commons_codec/transform/mongodb.py b/src/commons_codec/transform/mongodb.py index 744b2f5..27dda33 100644 --- a/src/commons_codec/transform/mongodb.py +++ b/src/commons_codec/transform/mongodb.py @@ -1,17 +1,17 @@ # Copyright (c) 2021-2024, Crate.io Inc. # Distributed under the terms of the LGPLv3 license, see LICENSE. 
# ruff: noqa: S608 -import base64 import calendar +import datetime as dt import logging import typing as t from typing import Any, Iterable -from uuid import UUID +import bson import dateutil.parser as dateparser from attr import Factory from attrs import define -from bson.json_util import _json_convert +from bson.json_util import _json_convert, object_hook from pymongo.cursor import Cursor from sqlalchemy_cratedb.support import quote_relation_name @@ -28,21 +28,13 @@ def date_converter(value): if isinstance(value, int): return value - dt = dateparser.parse(value) - return calendar.timegm(dt.utctimetuple()) * 1000 - - -def timestamp_converter(value): - if len(str(value)) <= 10: - return value * 1000 - return value - - -type_converter = { - "date": date_converter, - "timestamp": timestamp_converter, - "undefined": lambda x: None, -} + elif isinstance(value, (str, bytes)): + datetime = dateparser.parse(value) + elif isinstance(value, dt.datetime): + datetime = value + else: + raise ValueError(f"Unable to convert datetime value: {value}") + return calendar.timegm(datetime.utctimetuple()) * 1000 @define @@ -59,15 +51,15 @@ def decode_documents(self, data: t.List[Document]) -> Iterable[dict[str, Any]]: """ Decode MongoDB Extended JSON, considering CrateDB specifics. """ - return self.transformation.apply(map(self.extract_value, data)) + return self.transformation.apply(map(self.decode_value, data)) def decode_document(self, data: Document) -> Document: """ Decode MongoDB Extended JSON, considering CrateDB specifics. """ - return self.extract_value(data) + return self.decode_value(data) - def extract_value(self, value: t.Any, parent_type: t.Optional[str] = None) -> t.Any: + def decode_value(self, value: t.Any) -> t.Any: """ Decode MongoDB Extended JSON. @@ -75,22 +67,35 @@ def extract_value(self, value: t.Any, parent_type: t.Optional[str] = None) -> t. 
- https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/ """ if isinstance(value, dict): + # Decode item in BSON CANONICAL format. + if len(value) == 1: + return self.decode_canonical(value) + # Custom adjustments to compensate shape anomalies in source data. self.apply_special_treatments(value) - if len(value) == 1: - if "$binary" in value and value["$binary"]["subType"] in ["03", "04"]: - decoded = str(UUID(bytes=base64.b64decode(value["$binary"]["base64"]))) - return self.extract_value(decoded, parent_type) - for k, v in value.items(): - if k.startswith("$"): - return self.extract_value(v, k.lstrip("$")) - return {k.lstrip("$"): self.extract_value(v, parent_type) for (k, v) in value.items()} - if isinstance(value, list): - return [self.extract_value(v, parent_type) for v in value] - if parent_type: - converter = type_converter.get(parent_type) - if converter: - return converter(value) + + return {k: self.decode_value(v) for (k, v) in value.items()} + elif isinstance(value, list): + return [self.decode_value(v) for v in value] + + return value + + @staticmethod + def decode_canonical(value: t.Dict[str, t.Any]) -> t.Any: + type_ = list(value.keys())[0] + # Special handling for datetime representation in NUMBERLONG format (emulated depth-first). 
+ if type_ == "$date" and "$numberLong" in value["$date"]: + value["$date"] = object_hook(value["$date"]) + value = object_hook(value) + is_bson = type(value).__module__.startswith("bson") + if isinstance(value, bson.Binary) and value.subtype in bson.ALL_UUID_SUBTYPES: + value = value.as_uuid() + if isinstance(value, bson.Timestamp): + value = value.as_datetime() + if isinstance(value, dt.datetime): + return date_converter(value) + if is_bson: + return str(value) return value def apply_special_treatments(self, value: t.Any): diff --git a/tests/transform/test_mongodb_convert.py b/tests/transform/test_mongodb_convert.py index eac7b3e..9f17186 100644 --- a/tests/transform/test_mongodb_convert.py +++ b/tests/transform/test_mongodb_convert.py @@ -1,6 +1,6 @@ import pytest -from commons_codec.transform.mongodb import MongoDBCrateDBConverter, date_converter, timestamp_converter +from commons_codec.transform.mongodb import MongoDBCrateDBConverter, date_converter from zyp.model.bucket import BucketTransformation, ValueConverter from zyp.model.collection import CollectionTransformation from zyp.model.treatment import Treatment @@ -12,21 +12,24 @@ def test_date_converter_int(): """ Datetime values encoded as integer values will be returned unmodified. """ - assert date_converter(42) == 42 + assert date_converter(1443004362000) == 1443004362000 -def test_timestamp_converter_s(): +def test_date_converter_iso8601(): """ - Timestamps encoded as integer values in seconds will be converted to milliseconds. + Datetime values encoded as ISO8601 values will be parsed. """ - assert timestamp_converter(1726612077) == 1726612077000 + assert date_converter("2015-09-23T10:32:42.33Z") == 1443004362000 + assert date_converter(b"2015-09-23T10:32:42.33Z") == 1443004362000 -def test_timestamp_converter_ms(): +def test_date_converter_invalid(): """ - Timestamps in milliseconds will be returned unmodified. + Incorrect datetime values will not be parsed. 
""" - assert timestamp_converter(1726612077000) == 1726612077000 + with pytest.raises(ValueError) as ex: + date_converter(None) + assert ex.match("Unable to convert datetime value: None") def test_convert_with_treatment_ignore_complex_lists(): diff --git a/tests/transform/test_mongodb_data.py b/tests/transform/test_mongodb_data.py index f53383b..82aaf0d 100644 --- a/tests/transform/test_mongodb_data.py +++ b/tests/transform/test_mongodb_data.py @@ -1,3 +1,10 @@ +""" +A few samples of MongoDB BSON / JSON structures. + +Derived from: +https://github.com/mongodb/bson-ruby/tree/v5.0.1/spec/spec_tests/data/corpus +""" + import datetime as dt from unittest import mock @@ -32,14 +39,17 @@ "timestamp": bson.Timestamp(1721140162, 2), }, "canonical": { - "date_text": {"$date": "2015-09-23T10:32:42.33Z"}, - "date_unixtime": {"$date": {"$numberLong": "-284643869501"}}, + "date_iso8601": {"$date": "2015-09-23T10:32:42.33Z"}, + "date_numberlong": {"$date": {"$numberLong": "1356351330501"}}, "double": {"$numberDouble": "-1.2345678921232E+18"}, "int32": {"$numberInt": "-2147483648"}, "int64": {"$numberLong": "-9223372036854775808"}, "list_dict": [ {"id": "bar", "value": {"$date": "2015-09-24T10:32:42.33Z"}}, ], + "list_int": [ + {"$numberInt": "-2147483648"}, + ], "oid": {"$oid": "56027fcae4b09385a85f9344"}, "uuid": {"$binary": {"base64": "c//SZESzTGmQ6OfR38A11A==", "subType": "04"}}, }, @@ -67,23 +77,26 @@ "bson": { "decimal128": "42.42", "dbref": { - "id": "foo", - "ref": "bar", - "db": "baz", + "$id": "foo", + "$ref": "bar", + "$db": "baz", }, "int64": 42, "objectid": "669683c2b0750b2c84893f3e", - "timestamp": {"i": 2000, "t": 1721140162000}, + "timestamp": 1721140162000, }, "canonical": { - "date_text": 1443004362000, - "date_unixtime": "-284643869501", - "double": "-1.2345678921232E+18", - "int32": "-2147483648", - "int64": "-9223372036854775808", + "date_iso8601": 1443004362000, + "date_numberlong": 1356351330000, + "double": -1.2345678921232e18, + "int32": -2147483648, + 
"int64": "-9223372036854775808", # TODO: Encoding as string is just fine? "list_dict": [ {"id": "bar", "value": 1443090762000}, ], + "list_int": [ + -2147483648, + ], "oid": "56027fcae4b09385a85f9344", "uuid": "73ffd264-44b3-4c69-90e8-e7d1dfc035d4", }, From 0869688a89807a1ab8d9ee8d3ace71d10ba1d792 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 18 Sep 2024 20:14:42 +0200 Subject: [PATCH 05/10] MongoDB: Refactor software tests --- tests/transform/mongodb/__init__.py | 0 tests/transform/{test_mongodb_data.py => mongodb/data.py} | 0 tests/transform/{ => mongodb}/test_mongodb_cdc.py | 0 tests/transform/{ => mongodb}/test_mongodb_convert.py | 0 tests/transform/{ => mongodb}/test_mongodb_full.py | 2 +- 5 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 tests/transform/mongodb/__init__.py rename tests/transform/{test_mongodb_data.py => mongodb/data.py} (100%) rename tests/transform/{ => mongodb}/test_mongodb_cdc.py (100%) rename tests/transform/{ => mongodb}/test_mongodb_convert.py (100%) rename tests/transform/{ => mongodb}/test_mongodb_full.py (95%) diff --git a/tests/transform/mongodb/__init__.py b/tests/transform/mongodb/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/transform/test_mongodb_data.py b/tests/transform/mongodb/data.py similarity index 100% rename from tests/transform/test_mongodb_data.py rename to tests/transform/mongodb/data.py diff --git a/tests/transform/test_mongodb_cdc.py b/tests/transform/mongodb/test_mongodb_cdc.py similarity index 100% rename from tests/transform/test_mongodb_cdc.py rename to tests/transform/mongodb/test_mongodb_cdc.py diff --git a/tests/transform/test_mongodb_convert.py b/tests/transform/mongodb/test_mongodb_convert.py similarity index 100% rename from tests/transform/test_mongodb_convert.py rename to tests/transform/mongodb/test_mongodb_convert.py diff --git a/tests/transform/test_mongodb_full.py b/tests/transform/mongodb/test_mongodb_full.py similarity index 95% rename from 
tests/transform/test_mongodb_full.py rename to tests/transform/mongodb/test_mongodb_full.py index 2807a6d..1fb30bb 100644 --- a/tests/transform/test_mongodb_full.py +++ b/tests/transform/mongodb/test_mongodb_full.py @@ -2,7 +2,7 @@ from commons_codec.model import SQLOperation from commons_codec.transform.mongodb import MongoDBCrateDBConverter, MongoDBFullLoadTranslator -from tests.transform.test_mongodb_data import RECORD_IN_ALL_TYPES, RECORD_OUT_ALL_TYPES +from tests.transform.mongodb.data import RECORD_IN_ALL_TYPES, RECORD_OUT_ALL_TYPES def test_sql_ddl(): From dfbf8d73f35d47ec7b883cedbc92e109241c0317 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 18 Sep 2024 23:03:05 +0200 Subject: [PATCH 06/10] MongoDB: Complete and verify BSON data type mapping end-to-end --- CHANGES.md | 1 + src/commons_codec/transform/mongodb.py | 13 ++- tests/transform/mongodb/data.py | 136 +++++++++++++++++++++++-- 3 files changed, 135 insertions(+), 15 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 31dec19..edfc188 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -5,6 +5,7 @@ - Zyp: Fixed execution of collection transformation - Zyp: Added software test and documentation about flattening lists - MongoDB: Use `bson` package to parse BSON CANONICAL representation +- MongoDB: Complete and verify BSON data type mapping end-to-end ## 2024/09/10 v0.0.15 - Added Zyp Treatments, a slightly tailored transformation subsystem diff --git a/src/commons_codec/transform/mongodb.py b/src/commons_codec/transform/mongodb.py index 27dda33..4795724 100644 --- a/src/commons_codec/transform/mongodb.py +++ b/src/commons_codec/transform/mongodb.py @@ -82,13 +82,18 @@ def decode_value(self, value: t.Any) -> t.Any: @staticmethod def decode_canonical(value: t.Dict[str, t.Any]) -> t.Any: + """ + Decode BSON CANONICAL representation. + """ type_ = list(value.keys())[0] # Special handling for datetime representation in NUMBERLONG format (emulated depth-first). 
- if type_ == "$date" and "$numberLong" in value["$date"]: - value["$date"] = object_hook(value["$date"]) - value = object_hook(value) + is_date_numberlong = type_ == "$date" and "$numberLong" in value["$date"] + if is_date_numberlong: + return int(object_hook(value["$date"])) + else: + value = object_hook(value) is_bson = type(value).__module__.startswith("bson") - if isinstance(value, bson.Binary) and value.subtype in bson.ALL_UUID_SUBTYPES: + if isinstance(value, bson.Binary) and value.subtype == bson.UUID_SUBTYPE: value = value.as_uuid() if isinstance(value, bson.Timestamp): value = value.as_datetime() diff --git a/tests/transform/mongodb/data.py b/tests/transform/mongodb/data.py index 82aaf0d..ac29876 100644 --- a/tests/transform/mongodb/data.py +++ b/tests/transform/mongodb/data.py @@ -4,6 +4,7 @@ Derived from: https://github.com/mongodb/bson-ruby/tree/v5.0.1/spec/spec_tests/data/corpus """ +# ruff: noqa: ERA001 import datetime as dt from unittest import mock @@ -15,16 +16,23 @@ "$oid": "56027fcae4b09385a85f9344", }, "python": { - "bool": True, + "boolean": True, "datetime": dt.datetime(2024, 7, 16, 14, 29, 22, 907000), "dict_basic": {"foo": "bar"}, + "dict_dollarkey": {"$a": "foo"}, + # "dict_dottedkey": {"a.b": "foo"}, # TODO: InvalidColumnNameException["." contains a dot] + "dict_empty": {}, + "dict_emptykey": {"": "foo"}, + # "dict_specialkey": {".": "foo", "$": "foo"}, # TODO: InvalidColumnNameException["." 
contains a dot] "float": 42.42, "int": 42, - "list_bool": [True, False], + "list_boolean": [True, False], "list_dict": [{"foo": "bar"}], + "list_empty": [], "list_float": [1.1, 2.2, 3.3], "list_int": [1, 2, 3], "list_string": ["foo", "bar"], + "null": None, "set_int": {1, 2, 3}, "set_str": {"Räuber", "Hotzenplotz"}, "str": "Hotzenplotz", @@ -32,25 +40,71 @@ "tuple_str": ("Räuber", "Hotzenplotz"), }, "bson": { + "code": bson.code.Code("console.write(foo)", scope={"foo": "bar"}), + "binary_uuid": bson.Binary(data=b"sccsccsccsccsccs", subtype=bson.binary.UUID_SUBTYPE), + "datetimems": bson.DatetimeMS(1721140162987), "decimal128": bson.Decimal128("42.42"), - "dbref": bson.DBRef(id="foo", collection="bar", database="baz"), + "dbref": bson.DBRef(collection="foo", id="bar", database="baz"), "int64": bson.Int64(42.42), + "maxkey": bson.MaxKey(), + "minkey": bson.MinKey(), "objectid": bson.ObjectId("669683c2b0750b2c84893f3e"), + "regex": bson.regex.Regex(".*"), "timestamp": bson.Timestamp(1721140162, 2), }, "canonical": { + "code_ascii": {"$code": "abab"}, + "code_bytes": {"$code": "ab\u0000ab\u0000"}, + "code_scope": {"$code": "abab", "$scope": {"x": {"$numberInt": 42}}}, "date_iso8601": {"$date": "2015-09-23T10:32:42.33Z"}, - "date_numberlong": {"$date": {"$numberLong": "1356351330501"}}, - "double": {"$numberDouble": "-1.2345678921232E+18"}, + "date_numberlong": {"$date": {"$numberLong": "1356351330000"}}, + "dbref": { + "$id": {"$oid": "56027fcae4b09385a85f9344"}, + "$ref": "foo", + "$db": "bar", + }, + "decimal_infinity": {"$numberDecimal": "Infinity"}, + "decimal_largest": {"$numberDecimal": "1234567890123456789012345678901234"}, + "decimal_nan": {"$numberDecimal": "NaN"}, + "decimal_regular": {"$numberDecimal": "0.000001234567890123456789012345678901234"}, + # "double_infinity": {"$numberDouble": "Infinity"}, # TODO: SQLParseException[Failed to parse source + "double_regular": {"$numberDouble": "-1.2345678921232E+18"}, "int32": {"$numberInt": "-2147483648"}, "int64": 
{"$numberLong": "-9223372036854775808"}, + "list_date": [ + {"$date": "2015-09-24T10:32:42.33Z"}, + {"$date": {"$numberLong": "2147483647000"}}, + {"$date": {"$numberLong": "-2147483648000"}}, + ], "list_dict": [ {"id": "bar", "value": {"$date": "2015-09-24T10:32:42.33Z"}}, ], "list_int": [ {"$numberInt": "-2147483648"}, ], + "list_oid": [ + {"$oid": "56027fcae4b09385a85f9344"}, + ], + "list_uuid": [ + # TODO: TypeError: Object of type bytes is not JSON serializable + # {"$binary": {"base64": "c//SZESzTGmQ6OfR38A11A==", "subType": "00"}}, + {"$binary": {"base64": "c//SZESzTGmQ6OfR38A11A==", "subType": "01"}}, + {"$binary": {"base64": "c//SZESzTGmQ6OfR38A11A==", "subType": "02"}}, + {"$binary": {"base64": "c//SZESzTGmQ6OfR38A11A==", "subType": "03"}}, + {"$binary": {"base64": "c//AYDC420csII3929483B==", "subType": "04"}}, + {"$binary": {"base64": "c//AYDC420csII3929483B==", "subType": "05"}}, + {"$binary": {"base64": "c//AYDC420csII3929483B==", "subType": "06"}}, + {"$binary": {"base64": "c//AYDC420csII3929483B==", "subType": "80"}}, + ], + "maxkey": {"$maxKey": 1}, + "minkey": {"$minKey": 1}, "oid": {"$oid": "56027fcae4b09385a85f9344"}, + "regex": {"$regularExpression": {"pattern": ".*", "options": ""}}, + "symbol": {"$symbol": "foo"}, + "timestamp": {"$timestamp": {"t": 123456789, "i": 42}}, + # TODO: Implement other UUID subtypes. + # https://github.com/mongodb/bson-ruby/blob/v5.0.1/spec/spec_tests/data/corpus/binary.json + "undefined": {"$undefined": True}, "uuid": {"$binary": {"base64": "c//SZESzTGmQ6OfR38A11A==", "subType": "04"}}, }, } @@ -58,16 +112,23 @@ RECORD_OUT_ALL_TYPES = { "_id": "56027fcae4b09385a85f9344", "python": { - "bool": True, + "boolean": True, "datetime": 1721140162000, "dict_basic": {"foo": "bar"}, + "dict_dollarkey": {"$a": "foo"}, + # "dict_dottedkey": {'a.b': 'foo'}, # TODO: InvalidColumnNameException["." 
contains a dot] + "dict_empty": {}, + "dict_emptykey": {"": "foo"}, + # "dict_specialkey": {'$': 'foo', '.': 'foo'}, # TODO: InvalidColumnNameException["." contains a dot] "float": 42.42, "int": 42, - "list_bool": [True, False], + "list_boolean": [True, False], "list_dict": [{"foo": "bar"}], + "list_empty": [], "list_float": [1.1, 2.2, 3.3], "list_int": [1, 2, 3], "list_string": ["foo", "bar"], + "null": None, "set_int": [1, 2, 3], "set_str": mock.ANY, "str": "Hotzenplotz", @@ -75,29 +136,82 @@ "tuple_str": ["Räuber", "Hotzenplotz"], }, "bson": { + "code": { + "$code": "console.write(foo)", + "$scope": { + "foo": "bar", + }, + }, + "datetimems": 1721140162000, + "binary_uuid": "73636373-6363-7363-6373-636373636373", "decimal128": "42.42", "dbref": { - "$id": "foo", - "$ref": "bar", + "$ref": "foo", + "$id": "bar", "$db": "baz", }, "int64": 42, + "maxkey": "MaxKey()", + "minkey": "MinKey()", "objectid": "669683c2b0750b2c84893f3e", + "regex": "Regex('.*', 0)", "timestamp": 1721140162000, }, "canonical": { + "code_ascii": "abab", + "code_bytes": "ab\x00ab\x00", + "code_scope": { + "$code": "abab", + "$scope": { + "x": {"$numberInt": 42}, + }, + }, "date_iso8601": 1443004362000, "date_numberlong": 1356351330000, - "double": -1.2345678921232e18, + "dbref": { + "$id": "56027fcae4b09385a85f9344", + "$ref": "foo", + "$db": "bar", + }, + "decimal_infinity": "Infinity", + "decimal_largest": "1234567890123456789012345678901234", + "decimal_nan": "NaN", + "decimal_regular": "0.000001234567890123456789012345678901234", + "double_regular": -1.2345678921232e18, "int32": -2147483648, - "int64": "-9223372036854775808", # TODO: Encoding as string is just fine? + "int64": "-9223372036854775808", # TODO: Representation as string is just fine? 
+ "list_date": [ + 1443090762000, + 2147483647000, + -2147483648000, + ], "list_dict": [ {"id": "bar", "value": 1443090762000}, ], "list_int": [ -2147483648, ], + "list_oid": [ + "56027fcae4b09385a85f9344", + ], + "list_uuid": [ + # TODO: TypeError: Object of type bytes is not JSON serializable + # b's\xff\xd2dD\xb3Li\x90\xe8\xe7\xd1\xdf\xc05\xd4', + "b's\\xff\\xd2dD\\xb3Li\\x90\\xe8\\xe7\\xd1\\xdf\\xc05\\xd4'", + "b's\\xff\\xd2dD\\xb3Li\\x90\\xe8\\xe7\\xd1\\xdf\\xc05\\xd4'", + "b's\\xff\\xd2dD\\xb3Li\\x90\\xe8\\xe7\\xd1\\xdf\\xc05\\xd4'", + "73ffc060-30b8-db47-2c20-8dfddbde3cdc", + "b's\\xff\\xc0`0\\xb8\\xdbG, \\x8d\\xfd\\xdb\\xde<\\xdc'", + "b's\\xff\\xc0`0\\xb8\\xdbG, \\x8d\\xfd\\xdb\\xde<\\xdc'", + "b's\\xff\\xc0`0\\xb8\\xdbG, \\x8d\\xfd\\xdb\\xde<\\xdc'", + ], + "maxkey": "MaxKey()", + "minkey": "MinKey()", "oid": "56027fcae4b09385a85f9344", + "regex": "Regex('.*', 0)", + "symbol": "foo", + "timestamp": 123456789000, + "undefined": None, "uuid": "73ffd264-44b3-4c69-90e8-e7d1dfc035d4", }, } From 6e754576a30f1dbda3778f4884dd01597ed63cec Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 18 Sep 2024 23:27:00 +0200 Subject: [PATCH 07/10] MongoDB: Allow running decoder without any transformation at all --- src/commons_codec/transform/mongodb.py | 13 +++++++---- .../transform/mongodb/test_mongodb_convert.py | 23 +++++++++++++++++++ 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/src/commons_codec/transform/mongodb.py b/src/commons_codec/transform/mongodb.py index 4795724..1498f3a 100644 --- a/src/commons_codec/transform/mongodb.py +++ b/src/commons_codec/transform/mongodb.py @@ -5,17 +5,17 @@ import datetime as dt import logging import typing as t -from typing import Any, Iterable +from typing import Iterable import bson import dateutil.parser as dateparser -from attr import Factory from attrs import define from bson.json_util import _json_convert, object_hook from pymongo.cursor import Cursor from sqlalchemy_cratedb.support import 
quote_relation_name from commons_codec.model import SQLOperation +from zyp.model.base import DictOrList from zyp.model.collection import CollectionTransformation logger = logging.getLogger(__name__) @@ -45,13 +45,16 @@ class MongoDBCrateDBConverter: Extracted from cratedb-toolkit, earlier migr8. """ - transformation: CollectionTransformation = Factory(CollectionTransformation) + transformation: t.Union[CollectionTransformation, None] = None - def decode_documents(self, data: t.List[Document]) -> Iterable[dict[str, Any]]: + def decode_documents(self, data: t.Iterable[Document]) -> Iterable[Document]: """ Decode MongoDB Extended JSON, considering CrateDB specifics. """ - return self.transformation.apply(map(self.decode_value, data)) + data = map(self.decode_value, data) + if self.transformation is not None: + data = self.transformation.apply(t.cast(DictOrList, data)) + return data def decode_document(self, data: Document) -> Document: """ diff --git a/tests/transform/mongodb/test_mongodb_convert.py b/tests/transform/mongodb/test_mongodb_convert.py index 9f17186..91827ba 100644 --- a/tests/transform/mongodb/test_mongodb_convert.py +++ b/tests/transform/mongodb/test_mongodb_convert.py @@ -32,6 +32,29 @@ def test_date_converter_invalid(): assert ex.match("Unable to convert datetime value: None") +def test_convert_basic(): + """ + Just a basic conversion, without transformation. + """ + data_in = { + "_id": { + "$oid": "56027fcae4b09385a85f9344", + }, + "value": { + "id": 42, + }, + } + data_out = { + "_id": "56027fcae4b09385a85f9344", + "value": { + "id": 42, + }, + } + + converter = MongoDBCrateDBConverter() + assert list(converter.decode_documents([data_in])) == [data_out] + + def test_convert_with_treatment_ignore_complex_lists(): """ The `ignore_complex_lists` treatment ignores lists of dictionaries, often having deviating substructures. 
From 50820a52f98e097023f8cbbbbc294c82214b4c8c Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 18 Sep 2024 23:35:01 +0200 Subject: [PATCH 08/10] MongoDB: Use improved decoding machinery also for `MongoDBCDCTranslator` --- CHANGES.md | 1 + src/commons_codec/transform/mongodb.py | 14 ++++++-------- tests/transform/mongodb/test_mongodb_cdc.py | 10 +++++----- tests/transform/mongodb/test_mongodb_full.py | 8 ++++---- 4 files changed, 16 insertions(+), 17 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index edfc188..d2dd940 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,6 +6,7 @@ - Zyp: Added software test and documentation about flattening lists - MongoDB: Use `bson` package to parse BSON CANONICAL representation - MongoDB: Complete and verify BSON data type mapping end-to-end +- MongoDB: Use improved decoding machinery also for `MongoDBCDCTranslator` ## 2024/09/10 v0.0.15 - Added Zyp Treatments, a slightly tailored transformation subsystem diff --git a/src/commons_codec/transform/mongodb.py b/src/commons_codec/transform/mongodb.py index 1498f3a..51fcd50 100644 --- a/src/commons_codec/transform/mongodb.py +++ b/src/commons_codec/transform/mongodb.py @@ -142,9 +142,9 @@ class MongoDBTranslatorBase: # Define name of the column where CDC's record data will get materialized into. DATA_COLUMN = "data" - def __init__(self, table_name: str): - super().__init__() + def __init__(self, table_name: str, converter: t.Union[MongoDBCrateDBConverter, None] = None): self.table_name = quote_relation_name(table_name) + self.converter = converter or MongoDBCrateDBConverter() @property def sql_ddl(self): @@ -193,10 +193,6 @@ class MongoDBFullLoadTranslator(MongoDBTranslatorBase): Translate a MongoDB document into a CrateDB document. 
""" - def __init__(self, table_name: str, converter: MongoDBCrateDBConverter): - super().__init__(table_name=table_name) - self.converter = converter - @staticmethod def get_document_key(record: t.Mapping[str, t.Any]) -> str: """ @@ -270,7 +266,8 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> t.Union[SQLOperation, None]: if operation_type == "insert": oid: str = self.get_document_key(event) - record = self.decode_bson(self.get_full_document(event)) + document = self.get_full_document(event) + record = self.converter.decode_document(self.decode_bson(document)) sql = f"INSERT INTO {self.table_name} " f"({self.ID_COLUMN}, {self.DATA_COLUMN}) " "VALUES (:oid, :record);" parameters = {"oid": oid, "record": record} @@ -278,7 +275,8 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> t.Union[SQLOperation, None]: # you need to use `watch(full_document="updateLookup")`. # https://www.mongodb.com/docs/manual/changeStreams/#lookup-full-document-for-update-operations elif operation_type in ["update", "replace"]: - record = self.decode_bson(self.get_full_document(event)) + document = self.get_full_document(event) + record = self.converter.decode_document(self.decode_bson(document)) where_clause = self.where_clause(event) sql = f"UPDATE {self.table_name} " f"SET {self.DATA_COLUMN} = :record " f"WHERE {where_clause};" parameters = {"record": record} diff --git a/tests/transform/mongodb/test_mongodb_cdc.py b/tests/transform/mongodb/test_mongodb_cdc.py index ff466af..9baf22b 100644 --- a/tests/transform/mongodb/test_mongodb_cdc.py +++ b/tests/transform/mongodb/test_mongodb_cdc.py @@ -126,10 +126,10 @@ def test_decode_cdc_insert(): parameters={ "oid": "669683c2b0750b2c84893f3e", "record": { - "_id": {"$oid": "669683c2b0750b2c84893f3e"}, + "_id": "669683c2b0750b2c84893f3e", "id": "5F9E", "data": {"temperature": 42.42, "humidity": 84.84}, - "meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"}, + "meta": {"timestamp": 1720739862000, "device": "foo"}, }, }, ) @@ 
-140,10 +140,10 @@ def test_decode_cdc_update(): statement="UPDATE foo SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';", parameters={ "record": { - "_id": {"$oid": "669683c2b0750b2c84893f3e"}, + "_id": "669683c2b0750b2c84893f3e", "id": "5F9E", "data": {"temperature": 42.5}, - "meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"}, + "meta": {"timestamp": 1720739862000, "device": "foo"}, } }, ) @@ -152,7 +152,7 @@ def test_decode_cdc_update(): def test_decode_cdc_replace(): assert MongoDBCDCTranslator(table_name="foo").to_sql(MSG_REPLACE) == SQLOperation( statement="UPDATE foo SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';", - parameters={"record": {"_id": {"$oid": "669683c2b0750b2c84893f3e"}, "tags": ["deleted"]}}, + parameters={"record": {"_id": "669683c2b0750b2c84893f3e", "tags": ["deleted"]}}, ) diff --git a/tests/transform/mongodb/test_mongodb_full.py b/tests/transform/mongodb/test_mongodb_full.py index 1fb30bb..dfbd9a5 100644 --- a/tests/transform/mongodb/test_mongodb_full.py +++ b/tests/transform/mongodb/test_mongodb_full.py @@ -1,12 +1,12 @@ import pytest from commons_codec.model import SQLOperation -from commons_codec.transform.mongodb import MongoDBCrateDBConverter, MongoDBFullLoadTranslator +from commons_codec.transform.mongodb import MongoDBFullLoadTranslator from tests.transform.mongodb.data import RECORD_IN_ALL_TYPES, RECORD_OUT_ALL_TYPES def test_sql_ddl(): - translator = MongoDBFullLoadTranslator(table_name="foo", converter=MongoDBCrateDBConverter()) + translator = MongoDBFullLoadTranslator(table_name="foo") assert translator.sql_ddl == "CREATE TABLE IF NOT EXISTS foo (oid TEXT, data OBJECT(DYNAMIC));" @@ -14,7 +14,7 @@ def test_to_sql_operation(): """ Verify outcome of `MongoDBFullLoadTranslator.to_sql` operation. 
""" - translator = MongoDBFullLoadTranslator(table_name="foo", converter=MongoDBCrateDBConverter()) + translator = MongoDBFullLoadTranslator(table_name="foo") assert translator.to_sql([RECORD_IN_ALL_TYPES]) == SQLOperation( statement="INSERT INTO foo (oid, data) VALUES (:oid, :record);", parameters=[{"oid": "56027fcae4b09385a85f9344", "record": RECORD_OUT_ALL_TYPES}], @@ -28,7 +28,7 @@ def test_to_sql_cratedb(caplog, cratedb): """ # Compute CrateDB operation (SQL+parameters) from MongoDB document. - translator = MongoDBFullLoadTranslator(table_name="from.mongodb", converter=MongoDBCrateDBConverter()) + translator = MongoDBFullLoadTranslator(table_name="from.mongodb") operation = translator.to_sql(RECORD_IN_ALL_TYPES) # Insert into CrateDB. From 21c4b776a8fa34d17378a7b7396f81bd57f5c1fa Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 18 Sep 2024 23:39:55 +0200 Subject: [PATCH 09/10] Dependencies: Make MongoDB subsystem not strictly depend on Zyp --- .github/workflows/tests.yml | 4 ++-- CHANGES.md | 1 + src/commons_codec/transform/mongodb.py | 13 ++++++------- tests/transform/mongodb/__init__.py | 3 +++ tests/transform/mongodb/test_mongodb_cdc.py | 5 +---- tests/transform/mongodb/test_mongodb_convert.py | 5 +++-- tests/transform/mongodb/test_mongodb_full.py | 3 +++ 7 files changed, 19 insertions(+), 15 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 9cd1b67..3b6e22e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -106,7 +106,7 @@ jobs: pip install "setuptools>=64" --upgrade # Install package in editable mode. - pip install --use-pep517 --prefer-binary --editable=.[mongodb,develop,test] + pip install --use-pep517 --prefer-binary --editable=.[develop,test] - name: Run linters and software tests run: poe test -- -m 'dynamodb' @@ -161,7 +161,7 @@ jobs: pip install "setuptools>=64" --upgrade # Install package in editable mode. 
- pip install --use-pep517 --prefer-binary --editable=.[mongodb,develop,test] + pip install --use-pep517 --prefer-binary --editable=.[mongodb,zyp,develop,test] - name: Run linters and software tests run: poe test -- -m 'mongodb' diff --git a/CHANGES.md b/CHANGES.md index d2dd940..b8d368d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -7,6 +7,7 @@ - MongoDB: Use `bson` package to parse BSON CANONICAL representation - MongoDB: Complete and verify BSON data type mapping end-to-end - MongoDB: Use improved decoding machinery also for `MongoDBCDCTranslator` +- Dependencies: Make MongoDB subsystem not strictly depend on Zyp ## 2024/09/10 v0.0.15 - Added Zyp Treatments, a slightly tailored transformation subsystem diff --git a/src/commons_codec/transform/mongodb.py b/src/commons_codec/transform/mongodb.py index 51fcd50..e7cf450 100644 --- a/src/commons_codec/transform/mongodb.py +++ b/src/commons_codec/transform/mongodb.py @@ -15,16 +15,14 @@ from sqlalchemy_cratedb.support import quote_relation_name from commons_codec.model import SQLOperation -from zyp.model.base import DictOrList -from zyp.model.collection import CollectionTransformation - -logger = logging.getLogger(__name__) - Document = t.Mapping[str, t.Any] DocumentCollection = t.List[Document] +logger = logging.getLogger(__name__) + + def date_converter(value): if isinstance(value, int): return value @@ -45,15 +43,16 @@ class MongoDBCrateDBConverter: Extracted from cratedb-toolkit, earlier migr8. """ - transformation: t.Union[CollectionTransformation, None] = None + transformation: t.Any = None def decode_documents(self, data: t.Iterable[Document]) -> Iterable[Document]: """ Decode MongoDB Extended JSON, considering CrateDB specifics. """ data = map(self.decode_value, data) + # TODO: This is currently untyped. Types are defined in Zyp, see `zyp.model.base`. 
if self.transformation is not None: - data = self.transformation.apply(t.cast(DictOrList, data)) + data = self.transformation.apply(data) return data def decode_document(self, data: Document) -> Document: diff --git a/tests/transform/mongodb/__init__.py b/tests/transform/mongodb/__init__.py index e69de29..663f2d6 100644 --- a/tests/transform/mongodb/__init__.py +++ b/tests/transform/mongodb/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("bson", reason="Skipping MongoDB/BSON tests because 'bson' package is not installed") diff --git a/tests/transform/mongodb/test_mongodb_cdc.py b/tests/transform/mongodb/test_mongodb_cdc.py index 9baf22b..9980b8b 100644 --- a/tests/transform/mongodb/test_mongodb_cdc.py +++ b/tests/transform/mongodb/test_mongodb_cdc.py @@ -5,12 +5,9 @@ import datetime -from commons_codec.model import SQLOperation - -pytest.importorskip("pymongo") - from bson import ObjectId, Timestamp +from commons_codec.model import SQLOperation from commons_codec.transform.mongodb import MongoDBCDCTranslator MSG_OPERATION_UNKNOWN = { diff --git a/tests/transform/mongodb/test_mongodb_convert.py b/tests/transform/mongodb/test_mongodb_convert.py index 91827ba..85031f9 100644 --- a/tests/transform/mongodb/test_mongodb_convert.py +++ b/tests/transform/mongodb/test_mongodb_convert.py @@ -1,12 +1,13 @@ +# ruff: noqa: E402 import pytest +pytestmark = pytest.mark.mongodb + from commons_codec.transform.mongodb import MongoDBCrateDBConverter, date_converter from zyp.model.bucket import BucketTransformation, ValueConverter from zyp.model.collection import CollectionTransformation from zyp.model.treatment import Treatment -pytestmark = pytest.mark.mongodb - def test_date_converter_int(): """ diff --git a/tests/transform/mongodb/test_mongodb_full.py b/tests/transform/mongodb/test_mongodb_full.py index dfbd9a5..5151a3e 100644 --- a/tests/transform/mongodb/test_mongodb_full.py +++ b/tests/transform/mongodb/test_mongodb_full.py @@ -1,5 +1,8 @@ +# ruff: noqa: E402 
import pytest +pytestmark = pytest.mark.mongodb + from commons_codec.model import SQLOperation from commons_codec.transform.mongodb import MongoDBFullLoadTranslator from tests.transform.mongodb.data import RECORD_IN_ALL_TYPES, RECORD_OUT_ALL_TYPES From d2044342648355ca75aa6497f10efebf6c0288af Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 19 Sep 2024 00:27:11 +0200 Subject: [PATCH 10/10] MongoDB: Fix timezone woes when testing on CI/GHA --- tests/transform/mongodb/test_mongodb_convert.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/transform/mongodb/test_mongodb_convert.py b/tests/transform/mongodb/test_mongodb_convert.py index 85031f9..fa3e58b 100644 --- a/tests/transform/mongodb/test_mongodb_convert.py +++ b/tests/transform/mongodb/test_mongodb_convert.py @@ -173,8 +173,8 @@ def test_convert_transform_timestamp(): """ Validate a Zyp transformation that converts datetime values in text format. """ - data_in = [{"device": "Hotzenplotz", "temperature": 42.42, "timestamp": "06/14/2022 12:42:24"}] - data_out = [{"device": "Hotzenplotz", "temperature": 42.42, "timestamp": 1655203344}] + data_in = [{"device": "Hotzenplotz", "temperature": 42.42, "timestamp": "06/14/2022 12:42:24.4242 UTC"}] + data_out = [{"device": "Hotzenplotz", "temperature": 42.42, "timestamp": 1655210544.4242}] bucket_transformation = BucketTransformation( values=ValueConverter().add(pointer="/timestamp", transformer="to_unixtime"),