diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 9cd1b67..3b6e22e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -106,7 +106,7 @@ jobs: pip install "setuptools>=64" --upgrade # Install package in editable mode. - pip install --use-pep517 --prefer-binary --editable=.[mongodb,develop,test] + pip install --use-pep517 --prefer-binary --editable=.[develop,test] - name: Run linters and software tests run: poe test -- -m 'dynamodb' @@ -161,7 +161,7 @@ jobs: pip install "setuptools>=64" --upgrade # Install package in editable mode. - pip install --use-pep517 --prefer-binary --editable=.[mongodb,develop,test] + pip install --use-pep517 --prefer-binary --editable=.[mongodb,zyp,develop,test] - name: Run linters and software tests run: poe test -- -m 'mongodb' diff --git a/CHANGES.md b/CHANGES.md index edb1646..b8d368d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,13 @@ # Changelog ## Unreleased +- MongoDB: Added `MongoDBFullLoadTranslator` and `MongoDBCrateDBConverter` +- Zyp: Fixed execution of collection transformation +- Zyp: Added software test and documentation about flattening lists +- MongoDB: Use `bson` package to parse BSON CANONICAL representation +- MongoDB: Complete and verify BSON data type mapping end-to-end +- MongoDB: Use improved decoding machinery also for `MongoDBCDCTranslator` +- Dependencies: Make MongoDB subsystem not strictly depend on Zyp ## 2024/09/10 v0.0.15 - Added Zyp Treatments, a slightly tailored transformation subsystem diff --git a/doc/zyp/index.md b/doc/zyp/index.md index 8e7ee70..f3f85a8 100644 --- a/doc/zyp/index.md +++ b/doc/zyp/index.md @@ -20,7 +20,7 @@ implemented using [attrs] and [cattrs]. system before applying subsequent transformations. Zyp is working directly within the data pipeline, before data is inserted into the target system. -## Synopsis I +## Example I A basic transformation example for individual data records. 
```python @@ -52,7 +52,7 @@ The result is a transformed data collection. ] ``` -## Synopsis II +## Example II A more advanced transformation example for a collection of data records. Consider a messy collection of input data. @@ -120,7 +120,7 @@ transformation = CollectionTransformation( post=MokshaTransformation().jq(".[] |= (.data.value /= 100)"), ) -data_out = transformation.apply(data_in) +assert transformation.apply(data_in) == data_out ``` Alternatively, serialize the `zyp-collection` transformation description, for example into YAML format. @@ -152,6 +152,50 @@ post: type: jq ``` +## Example III +A compact transformation example that uses `jq` to: + +- Unwrap the actual collection which is nested within the top-level `records` item. +- Flatten the item `nested-list` which contains nested lists. + +```python +from zyp.model.collection import CollectionTransformation +from zyp.model.moksha import MokshaTransformation + +data_in = { + "message-source": "system-3000", + "message-type": "eai-warehouse", + "records": [ + {"nested-list": [{"foo": 1}, [{"foo": 2}, {"foo": 3}]]}, + ], +} + +data_out = [ + {"nested-list": [{"foo": 1}, {"foo": 2}, {"foo": 3}]}, +] + +transformation = CollectionTransformation( + pre=MokshaTransformation() + .jq(".records") + .jq('.[] |= (."nested-list" |= flatten)'), +) + +assert transformation.apply(data_in) == data_out +``` + +The same transformation represented in YAML format looks like this. 
+```yaml +meta: + type: zyp-collection + version: 1 +pre: + rules: + - expression: .records + type: jq + - expression: .[] |= (."nested-list" |= flatten) + type: jq +``` + ## Prior Art - [Singer Transformer] diff --git a/examples/mongodb_cdc_cratedb.py b/examples/mongodb_cdc_cratedb.py index effa17c..07d7a19 100644 --- a/examples/mongodb_cdc_cratedb.py +++ b/examples/mongodb_cdc_cratedb.py @@ -15,7 +15,7 @@ import sqlalchemy as sa from sqlalchemy_cratedb.support import quote_relation_name -from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB +from commons_codec.transform.mongodb import MongoDBCDCTranslator class MiniRelay: @@ -35,7 +35,7 @@ def __init__( self.mongodb_client = pymongo.MongoClient(mongodb_url) self.mongodb_collection = self.mongodb_client[mongodb_database][mongodb_collection] self.table_name = quote_relation_name(cratedb_table) - self.cdc = MongoDBCDCTranslatorCrateDB(table_name=self.table_name) + self.cdc = MongoDBCDCTranslator(table_name=self.table_name) def start(self): """ @@ -43,10 +43,9 @@ def start(self): """ with self.cratedb_client.connect() as connection: connection.execute(sa.text(self.cdc.sql_ddl)) - for sql in self.cdc_to_sql(): - if sql: - connection.execute(sa.text(sql)) - connection.execute(sa.text(f"REFRESH TABLE {self.table_name};")) + for operation in self.cdc_to_sql(): + connection.execute(sa.text(operation.statement), parameters=operation.parameters) + connection.execute(sa.text(f"REFRESH TABLE {self.table_name};")) def cdc_to_sql(self): """ diff --git a/pyproject.toml b/pyproject.toml index 3a62573..81b2054 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -107,6 +107,7 @@ dependencies = [ "attrs<25", "backports-strenum<1.3; python_version<'3.11'", "cattrs<25", + "python-dateutil<3", "simplejson<4", "sqlalchemy-cratedb>=0.39.0", "toolz<0.13", diff --git a/src/commons_codec/transform/mongodb.py b/src/commons_codec/transform/mongodb.py index 88298f1..e7cf450 100644 ---
a/src/commons_codec/transform/mongodb.py +++ b/src/commons_codec/transform/mongodb.py @@ -1,20 +1,125 @@ -# Copyright (c) 2023-2024, The Kotori Developers and contributors. +# Copyright (c) 2021-2024, Crate.io Inc. # Distributed under the terms of the LGPLv3 license, see LICENSE. - -# ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction - +# ruff: noqa: S608 +import calendar +import datetime as dt import logging import typing as t +from typing import Iterable -from bson.json_util import _json_convert +import bson +import dateutil.parser as dateparser +from attrs import define +from bson.json_util import _json_convert, object_hook +from pymongo.cursor import Cursor from sqlalchemy_cratedb.support import quote_relation_name from commons_codec.model import SQLOperation +Document = t.Mapping[str, t.Any] +DocumentCollection = t.List[Document] + + logger = logging.getLogger(__name__) -class MongoDBCDCTranslatorBase: +def date_converter(value): + if isinstance(value, int): + return value + elif isinstance(value, (str, bytes)): + datetime = dateparser.parse(value) + elif isinstance(value, dt.datetime): + datetime = value + else: + raise ValueError(f"Unable to convert datetime value: {value}") + return calendar.timegm(datetime.utctimetuple()) * 1000 + + +@define +class MongoDBCrateDBConverter: + """ + Convert MongoDB Extended JSON to representation consumable by CrateDB. + + Extracted from cratedb-toolkit, earlier migr8. + """ + + transformation: t.Any = None + + def decode_documents(self, data: t.Iterable[Document]) -> Iterable[Document]: + """ + Decode MongoDB Extended JSON, considering CrateDB specifics. + """ + data = map(self.decode_value, data) + # TODO: This is currently untyped. Types are defined in Zyp, see `zyp.model.base`. 
+ if self.transformation is not None: + data = self.transformation.apply(data) + return data + + def decode_document(self, data: Document) -> Document: + """ + Decode MongoDB Extended JSON, considering CrateDB specifics. + """ + return self.decode_value(data) + + def decode_value(self, value: t.Any) -> t.Any: + """ + Decode MongoDB Extended JSON. + + - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json-v1/ + - https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/ + """ + if isinstance(value, dict): + # Decode item in BSON CANONICAL format. + if len(value) == 1: + return self.decode_canonical(value) + + # Custom adjustments to compensate shape anomalies in source data. + self.apply_special_treatments(value) + + return {k: self.decode_value(v) for (k, v) in value.items()} + elif isinstance(value, list): + return [self.decode_value(v) for v in value] + + return value + + @staticmethod + def decode_canonical(value: t.Dict[str, t.Any]) -> t.Any: + """ + Decode BSON CANONICAL representation. + """ + type_ = list(value.keys())[0] + # Special handling for datetime representation in NUMBERLONG format (emulated depth-first). + is_date_numberlong = type_ == "$date" and "$numberLong" in value["$date"] + if is_date_numberlong: + return int(object_hook(value["$date"])) + else: + value = object_hook(value) + is_bson = type(value).__module__.startswith("bson") + if isinstance(value, bson.Binary) and value.subtype == bson.UUID_SUBTYPE: + value = value.as_uuid() + if isinstance(value, bson.Timestamp): + value = value.as_datetime() + if isinstance(value, dt.datetime): + return date_converter(value) + if is_bson: + return str(value) + return value + + def apply_special_treatments(self, value: t.Any): + """ + Apply special treatments to value that can't be described otherwise up until now. + # Ignore certain items including anomalies that are not resolved, yet. + + TODO: Needs an integration test feeding two records instead of just one. 
+ """ + + if self.transformation is None or self.transformation.treatment is None: + return None + + return self.transformation.treatment.apply(value) + + +class MongoDBTranslatorBase: """ Translate MongoDB CDC events into different representations. @@ -30,7 +135,27 @@ class MongoDBCDCTranslatorBase: - https://www.mongodb.com/developer/languages/python/python-change-streams/ """ - def decode_bson(self, item: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: + # Define name of the column where MongoDB's OID for a document will be stored. + ID_COLUMN = "oid" + + # Define name of the column where CDC's record data will get materialized into. + DATA_COLUMN = "data" + + def __init__(self, table_name: str, converter: t.Union[MongoDBCrateDBConverter, None] = None): + self.table_name = quote_relation_name(table_name) + self.converter = converter or MongoDBCrateDBConverter() + + @property + def sql_ddl(self): + """ + Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events. + """ + return ( + f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.ID_COLUMN} TEXT, {self.DATA_COLUMN} OBJECT(DYNAMIC));" + ) + + @staticmethod + def decode_bson(item: t.Mapping[str, t.Any]) -> t.Mapping[str, t.Any]: """ Convert MongoDB Extended JSON to vanilla Python dictionary. @@ -62,7 +187,41 @@ def decode_bson(self, item: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: return _json_convert(item) -class MongoDBCDCTranslatorCrateDB(MongoDBCDCTranslatorBase): +class MongoDBFullLoadTranslator(MongoDBTranslatorBase): + """ + Translate a MongoDB document into a CrateDB document. + """ + + @staticmethod + def get_document_key(record: t.Mapping[str, t.Any]) -> str: + """ + Return value of document key (MongoDB document OID). 
+ + "documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")} + """ + return record["_id"] + + def to_sql(self, data: t.Union[Document, t.List[Document]]) -> SQLOperation: + """ + Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents. + """ + if not isinstance(data, Cursor) and not isinstance(data, list): + data = [data] + + # Define SQL INSERT statement. + sql = f"INSERT INTO {self.table_name} ({self.ID_COLUMN}, {self.DATA_COLUMN}) VALUES (:oid, :record);" + + # Converge multiple MongoDB documents into SQL parameters for `executemany` operation. + parameters: t.List[Document] = [] + for document in data: + record = self.converter.decode_document(self.decode_bson(document)) + oid: str = self.get_document_key(record) + parameters.append({"oid": oid, "record": record}) + + return SQLOperation(sql, parameters) + + +class MongoDBCDCTranslator(MongoDBTranslatorBase): """ Translate MongoDB CDC events into CrateDB SQL statements that materialize them again. @@ -94,25 +253,6 @@ class MongoDBCDCTranslatorCrateDB(MongoDBCDCTranslatorBase): CREATE TABLE (oid TEXT, data OBJECT(DYNAMIC)); """ - # Define name of the column where MongoDB's OID for a document will be stored. - ID_COLUMN = "oid" - - # Define name of the column where CDC's record data will get materialized into. - DATA_COLUMN = "data" - - def __init__(self, table_name: str): - super().__init__() - self.table_name = quote_relation_name(table_name) - - @property - def sql_ddl(self): - """ - Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events. - """ - return ( - f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.ID_COLUMN} TEXT, {self.DATA_COLUMN} OBJECT(DYNAMIC));" - ) - def to_sql(self, event: t.Dict[str, t.Any]) -> t.Union[SQLOperation, None]: """ Produce INSERT|UPDATE|DELETE SQL statement from insert|update|replace|delete CDC event record. 
@@ -125,7 +265,8 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> t.Union[SQLOperation, None]: if operation_type == "insert": oid: str = self.get_document_key(event) - record = self.decode_bson(self.get_full_document(event)) + document = self.get_full_document(event) + record = self.converter.decode_document(self.decode_bson(document)) sql = f"INSERT INTO {self.table_name} " f"({self.ID_COLUMN}, {self.DATA_COLUMN}) " "VALUES (:oid, :record);" parameters = {"oid": oid, "record": record} @@ -133,7 +274,8 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> t.Union[SQLOperation, None]: # you need to use `watch(full_document="updateLookup")`. # https://www.mongodb.com/docs/manual/changeStreams/#lookup-full-document-for-update-operations elif operation_type in ["update", "replace"]: - record = self.decode_bson(self.get_full_document(event)) + document = self.get_full_document(event) + record = self.converter.decode_document(self.decode_bson(document)) where_clause = self.where_clause(event) sql = f"UPDATE {self.table_name} " f"SET {self.DATA_COLUMN} = :record " f"WHERE {where_clause};" parameters = {"record": record} diff --git a/src/zyp/model/base.py b/src/zyp/model/base.py index 91a766d..33cab8c 100644 --- a/src/zyp/model/base.py +++ b/src/zyp/model/base.py @@ -10,7 +10,7 @@ from zyp.util.data import no_privates_no_nulls_no_empties Record = t.Dict[str, t.Any] -Collection = t.List[Record] +Collection = t.Iterable[Record] DictOrList = t.Union[Record, Collection] diff --git a/src/zyp/model/collection.py b/src/zyp/model/collection.py index f22ea1a..f757df2 100644 --- a/src/zyp/model/collection.py +++ b/src/zyp/model/collection.py @@ -26,7 +26,6 @@ class CollectionTransformation(Dumpable): def apply(self, data: DictOrList) -> Collection: collection = t.cast(Collection, data) - collection_out = collection if self.pre: collection = t.cast(Collection, self.pre.apply(collection)) if self.bucket: @@ -34,8 +33,9 @@ def apply(self, data: DictOrList) -> Collection: for item in 
collection: item = self.bucket.apply(item) collection_out.append(item) + collection = collection_out if self.post: - collection_out = t.cast(Collection, self.post.apply(collection_out)) + collection = t.cast(Collection, self.post.apply(collection)) if self.treatment: - collection_out = t.cast(Collection, self.treatment.apply(collection_out)) - return collection_out + collection = t.cast(Collection, self.treatment.apply(collection)) + return collection diff --git a/tests/transform/conftest.py b/tests/transform/conftest.py index 9a9f3b1..2f0b3c4 100644 --- a/tests/transform/conftest.py +++ b/tests/transform/conftest.py @@ -3,6 +3,7 @@ RESET_TABLES = [ "from.dynamodb", "from.generic", + "from.mongodb", ] diff --git a/tests/transform/mongodb/__init__.py b/tests/transform/mongodb/__init__.py new file mode 100644 index 0000000..663f2d6 --- /dev/null +++ b/tests/transform/mongodb/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("bson", reason="Skipping MongoDB/BSON tests because 'bson' package is not installed") diff --git a/tests/transform/mongodb/data.py b/tests/transform/mongodb/data.py new file mode 100644 index 0000000..ac29876 --- /dev/null +++ b/tests/transform/mongodb/data.py @@ -0,0 +1,217 @@ +""" +A few samples of MongoDB BSON / JSON structures. + +Derived from: +https://github.com/mongodb/bson-ruby/tree/v5.0.1/spec/spec_tests/data/corpus +""" +# ruff: noqa: ERA001 + +import datetime as dt +from unittest import mock + +import bson + +RECORD_IN_ALL_TYPES = { + "_id": { + "$oid": "56027fcae4b09385a85f9344", + }, + "python": { + "boolean": True, + "datetime": dt.datetime(2024, 7, 16, 14, 29, 22, 907000), + "dict_basic": {"foo": "bar"}, + "dict_dollarkey": {"$a": "foo"}, + # "dict_dottedkey": {"a.b": "foo"}, # TODO: InvalidColumnNameException["." contains a dot] + "dict_empty": {}, + "dict_emptykey": {"": "foo"}, + # "dict_specialkey": {".": "foo", "$": "foo"}, # TODO: InvalidColumnNameException["." 
contains a dot] + "float": 42.42, + "int": 42, + "list_boolean": [True, False], + "list_dict": [{"foo": "bar"}], + "list_empty": [], + "list_float": [1.1, 2.2, 3.3], + "list_int": [1, 2, 3], + "list_string": ["foo", "bar"], + "null": None, + "set_int": {1, 2, 3}, + "set_str": {"Räuber", "Hotzenplotz"}, + "str": "Hotzenplotz", + "tuple_int": (1, 2, 3), + "tuple_str": ("Räuber", "Hotzenplotz"), + }, + "bson": { + "code": bson.code.Code("console.write(foo)", scope={"foo": "bar"}), + "binary_uuid": bson.Binary(data=b"sccsccsccsccsccs", subtype=bson.binary.UUID_SUBTYPE), + "datetimems": bson.DatetimeMS(1721140162987), + "decimal128": bson.Decimal128("42.42"), + "dbref": bson.DBRef(collection="foo", id="bar", database="baz"), + "int64": bson.Int64(42.42), + "maxkey": bson.MaxKey(), + "minkey": bson.MinKey(), + "objectid": bson.ObjectId("669683c2b0750b2c84893f3e"), + "regex": bson.regex.Regex(".*"), + "timestamp": bson.Timestamp(1721140162, 2), + }, + "canonical": { + "code_ascii": {"$code": "abab"}, + "code_bytes": {"$code": "ab\u0000ab\u0000"}, + "code_scope": {"$code": "abab", "$scope": {"x": {"$numberInt": 42}}}, + "date_iso8601": {"$date": "2015-09-23T10:32:42.33Z"}, + "date_numberlong": {"$date": {"$numberLong": "1356351330000"}}, + "dbref": { + "$id": {"$oid": "56027fcae4b09385a85f9344"}, + "$ref": "foo", + "$db": "bar", + }, + "decimal_infinity": {"$numberDecimal": "Infinity"}, + "decimal_largest": {"$numberDecimal": "1234567890123456789012345678901234"}, + "decimal_nan": {"$numberDecimal": "NaN"}, + "decimal_regular": {"$numberDecimal": "0.000001234567890123456789012345678901234"}, + # "double_infinity": {"$numberDouble": "Infinity"}, # TODO: SQLParseException[Failed to parse source + "double_regular": {"$numberDouble": "-1.2345678921232E+18"}, + "int32": {"$numberInt": "-2147483648"}, + "int64": {"$numberLong": "-9223372036854775808"}, + "list_date": [ + {"$date": "2015-09-24T10:32:42.33Z"}, + {"$date": {"$numberLong": "2147483647000"}}, + {"$date": 
{"$numberLong": "-2147483648000"}}, + ], + "list_dict": [ + {"id": "bar", "value": {"$date": "2015-09-24T10:32:42.33Z"}}, + ], + "list_int": [ + {"$numberInt": "-2147483648"}, + ], + "list_oid": [ + {"$oid": "56027fcae4b09385a85f9344"}, + ], + "list_uuid": [ + # TODO: TypeError: Object of type bytes is not JSON serializable + # {"$binary": {"base64": "c//SZESzTGmQ6OfR38A11A==", "subType": "00"}}, + {"$binary": {"base64": "c//SZESzTGmQ6OfR38A11A==", "subType": "01"}}, + {"$binary": {"base64": "c//SZESzTGmQ6OfR38A11A==", "subType": "02"}}, + {"$binary": {"base64": "c//SZESzTGmQ6OfR38A11A==", "subType": "03"}}, + {"$binary": {"base64": "c//AYDC420csII3929483B==", "subType": "04"}}, + {"$binary": {"base64": "c//AYDC420csII3929483B==", "subType": "05"}}, + {"$binary": {"base64": "c//AYDC420csII3929483B==", "subType": "06"}}, + {"$binary": {"base64": "c//AYDC420csII3929483B==", "subType": "80"}}, + ], + "maxkey": {"$maxKey": 1}, + "minkey": {"$minKey": 1}, + "oid": {"$oid": "56027fcae4b09385a85f9344"}, + "regex": {"$regularExpression": {"pattern": ".*", "options": ""}}, + "symbol": {"$symbol": "foo"}, + "timestamp": {"$timestamp": {"t": 123456789, "i": 42}}, + # TODO: Implement other UUID subtypes. + # https://github.com/mongodb/bson-ruby/blob/v5.0.1/spec/spec_tests/data/corpus/binary.json + "undefined": {"$undefined": True}, + "uuid": {"$binary": {"base64": "c//SZESzTGmQ6OfR38A11A==", "subType": "04"}}, + }, +} + +RECORD_OUT_ALL_TYPES = { + "_id": "56027fcae4b09385a85f9344", + "python": { + "boolean": True, + "datetime": 1721140162000, + "dict_basic": {"foo": "bar"}, + "dict_dollarkey": {"$a": "foo"}, + # "dict_dottedkey": {'a.b': 'foo'}, # TODO: InvalidColumnNameException["." contains a dot] + "dict_empty": {}, + "dict_emptykey": {"": "foo"}, + # "dict_specialkey": {'$': 'foo', '.': 'foo'}, # TODO: InvalidColumnNameException["." 
contains a dot] + "float": 42.42, + "int": 42, + "list_boolean": [True, False], + "list_dict": [{"foo": "bar"}], + "list_empty": [], + "list_float": [1.1, 2.2, 3.3], + "list_int": [1, 2, 3], + "list_string": ["foo", "bar"], + "null": None, + "set_int": [1, 2, 3], + "set_str": mock.ANY, + "str": "Hotzenplotz", + "tuple_int": [1, 2, 3], + "tuple_str": ["Räuber", "Hotzenplotz"], + }, + "bson": { + "code": { + "$code": "console.write(foo)", + "$scope": { + "foo": "bar", + }, + }, + "datetimems": 1721140162000, + "binary_uuid": "73636373-6363-7363-6373-636373636373", + "decimal128": "42.42", + "dbref": { + "$ref": "foo", + "$id": "bar", + "$db": "baz", + }, + "int64": 42, + "maxkey": "MaxKey()", + "minkey": "MinKey()", + "objectid": "669683c2b0750b2c84893f3e", + "regex": "Regex('.*', 0)", + "timestamp": 1721140162000, + }, + "canonical": { + "code_ascii": "abab", + "code_bytes": "ab\x00ab\x00", + "code_scope": { + "$code": "abab", + "$scope": { + "x": {"$numberInt": 42}, + }, + }, + "date_iso8601": 1443004362000, + "date_numberlong": 1356351330000, + "dbref": { + "$id": "56027fcae4b09385a85f9344", + "$ref": "foo", + "$db": "bar", + }, + "decimal_infinity": "Infinity", + "decimal_largest": "1234567890123456789012345678901234", + "decimal_nan": "NaN", + "decimal_regular": "0.000001234567890123456789012345678901234", + "double_regular": -1.2345678921232e18, + "int32": -2147483648, + "int64": "-9223372036854775808", # TODO: Representation as string is just fine? 
+ "list_date": [ + 1443090762000, + 2147483647000, + -2147483648000, + ], + "list_dict": [ + {"id": "bar", "value": 1443090762000}, + ], + "list_int": [ + -2147483648, + ], + "list_oid": [ + "56027fcae4b09385a85f9344", + ], + "list_uuid": [ + # TODO: TypeError: Object of type bytes is not JSON serializable + # b's\xff\xd2dD\xb3Li\x90\xe8\xe7\xd1\xdf\xc05\xd4', + "b's\\xff\\xd2dD\\xb3Li\\x90\\xe8\\xe7\\xd1\\xdf\\xc05\\xd4'", + "b's\\xff\\xd2dD\\xb3Li\\x90\\xe8\\xe7\\xd1\\xdf\\xc05\\xd4'", + "b's\\xff\\xd2dD\\xb3Li\\x90\\xe8\\xe7\\xd1\\xdf\\xc05\\xd4'", + "73ffc060-30b8-db47-2c20-8dfddbde3cdc", + "b's\\xff\\xc0`0\\xb8\\xdbG, \\x8d\\xfd\\xdb\\xde<\\xdc'", + "b's\\xff\\xc0`0\\xb8\\xdbG, \\x8d\\xfd\\xdb\\xde<\\xdc'", + "b's\\xff\\xc0`0\\xb8\\xdbG, \\x8d\\xfd\\xdb\\xde<\\xdc'", + ], + "maxkey": "MaxKey()", + "minkey": "MinKey()", + "oid": "56027fcae4b09385a85f9344", + "regex": "Regex('.*', 0)", + "symbol": "foo", + "timestamp": 123456789000, + "undefined": None, + "uuid": "73ffd264-44b3-4c69-90e8-e7d1dfc035d4", + }, +} diff --git a/tests/transform/test_mongodb.py b/tests/transform/mongodb/test_mongodb_cdc.py similarity index 80% rename from tests/transform/test_mongodb.py rename to tests/transform/mongodb/test_mongodb_cdc.py index 404e453..9980b8b 100644 --- a/tests/transform/test_mongodb.py +++ b/tests/transform/mongodb/test_mongodb_cdc.py @@ -5,13 +5,10 @@ import datetime -from commons_codec.model import SQLOperation - -pytest.importorskip("pymongo") - from bson import ObjectId, Timestamp -from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB +from commons_codec.model import SQLOperation +from commons_codec.transform.mongodb import MongoDBCDCTranslator MSG_OPERATION_UNKNOWN = { "operationType": "foobar", @@ -97,74 +94,74 @@ def test_decode_cdc_sql_ddl(): assert ( - MongoDBCDCTranslatorCrateDB(table_name="foo").sql_ddl + MongoDBCDCTranslator(table_name="foo").sql_ddl == "CREATE TABLE IF NOT EXISTS foo (oid TEXT, data OBJECT(DYNAMIC));" ) def 
test_decode_cdc_unknown_event(): with pytest.raises(ValueError) as ex: - MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_OPERATION_UNKNOWN) + MongoDBCDCTranslator(table_name="foo").to_sql(MSG_OPERATION_UNKNOWN) assert ex.match("Unknown CDC operation type: foobar") def test_decode_cdc_optype_missing(): with pytest.raises(ValueError) as ex: - MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_OPERATION_MISSING) + MongoDBCDCTranslator(table_name="foo").to_sql(MSG_OPERATION_MISSING) assert ex.match("Operation Type missing or empty: {}") def test_decode_cdc_optype_empty(): with pytest.raises(ValueError) as ex: - MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_OPERATION_EMPTY) + MongoDBCDCTranslator(table_name="foo").to_sql(MSG_OPERATION_EMPTY) assert ex.match("Operation Type missing or empty: {'operationType': ''}") def test_decode_cdc_insert(): - assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INSERT) == SQLOperation( + assert MongoDBCDCTranslator(table_name="foo").to_sql(MSG_INSERT) == SQLOperation( statement="INSERT INTO foo (oid, data) VALUES (:oid, :record);", parameters={ "oid": "669683c2b0750b2c84893f3e", "record": { - "_id": {"$oid": "669683c2b0750b2c84893f3e"}, + "_id": "669683c2b0750b2c84893f3e", "id": "5F9E", "data": {"temperature": 42.42, "humidity": 84.84}, - "meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"}, + "meta": {"timestamp": 1720739862000, "device": "foo"}, }, }, ) def test_decode_cdc_update(): - assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_UPDATE) == SQLOperation( + assert MongoDBCDCTranslator(table_name="foo").to_sql(MSG_UPDATE) == SQLOperation( statement="UPDATE foo SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';", parameters={ "record": { - "_id": {"$oid": "669683c2b0750b2c84893f3e"}, + "_id": "669683c2b0750b2c84893f3e", "id": "5F9E", "data": {"temperature": 42.5}, - "meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"}, + "meta": 
{"timestamp": 1720739862000, "device": "foo"}, } }, ) def test_decode_cdc_replace(): - assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_REPLACE) == SQLOperation( + assert MongoDBCDCTranslator(table_name="foo").to_sql(MSG_REPLACE) == SQLOperation( statement="UPDATE foo SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';", - parameters={"record": {"_id": {"$oid": "669683c2b0750b2c84893f3e"}, "tags": ["deleted"]}}, + parameters={"record": {"_id": "669683c2b0750b2c84893f3e", "tags": ["deleted"]}}, ) def test_decode_cdc_delete(): - assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_DELETE) == SQLOperation( + assert MongoDBCDCTranslator(table_name="foo").to_sql(MSG_DELETE) == SQLOperation( statement="DELETE FROM foo WHERE oid = '669693c5002ef91ea9c7a562';", parameters=None ) def test_decode_cdc_drop(): - assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_DROP) is None + assert MongoDBCDCTranslator(table_name="foo").to_sql(MSG_DROP) is None def test_decode_cdc_invalidate(): - assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INVALIDATE) is None + assert MongoDBCDCTranslator(table_name="foo").to_sql(MSG_INVALIDATE) is None diff --git a/tests/transform/mongodb/test_mongodb_convert.py b/tests/transform/mongodb/test_mongodb_convert.py new file mode 100644 index 0000000..fa3e58b --- /dev/null +++ b/tests/transform/mongodb/test_mongodb_convert.py @@ -0,0 +1,185 @@ +# ruff: noqa: E402 +import pytest + +pytestmark = pytest.mark.mongodb + +from commons_codec.transform.mongodb import MongoDBCrateDBConverter, date_converter +from zyp.model.bucket import BucketTransformation, ValueConverter +from zyp.model.collection import CollectionTransformation +from zyp.model.treatment import Treatment + + +def test_date_converter_int(): + """ + Datetime values encoded as integer values will be returned unmodified. 
+ """ + assert date_converter(1443004362000) == 1443004362000 + + +def test_date_converter_iso8601(): + """ + Datetime values encoded as ISO8601 values will be parsed. + """ + assert date_converter("2015-09-23T10:32:42.33Z") == 1443004362000 + assert date_converter(b"2015-09-23T10:32:42.33Z") == 1443004362000 + + +def test_date_converter_invalid(): + """ + Incorrect datetime values will not be parsed. + """ + with pytest.raises(ValueError) as ex: + date_converter(None) + assert ex.match("Unable to convert datetime value: None") + + +def test_convert_basic(): + """ + Just a basic conversion, without transformation. + """ + data_in = { + "_id": { + "$oid": "56027fcae4b09385a85f9344", + }, + "value": { + "id": 42, + }, + } + data_out = { + "_id": "56027fcae4b09385a85f9344", + "value": { + "id": 42, + }, + } + + converter = MongoDBCrateDBConverter() + assert list(converter.decode_documents([data_in])) == [data_out] + + +def test_convert_with_treatment_ignore_complex_lists(): + """ + The `ignore_complex_lists` treatment ignores lists of dictionaries, often having deviating substructures. + """ + data_in = { + "_id": { + "$oid": "56027fcae4b09385a85f9344", + }, + "value": { + "id": 42, + "date": {"$date": "2015-09-23T10:32:42.33Z"}, + "some_complex_list": [ + {"id": "foo", "value": "something"}, + {"id": "bar", "value": {"$date": "2015-09-24T10:32:42.33Z"}}, + ], + }, + } + data_out = { + "_id": "56027fcae4b09385a85f9344", + "value": { + "id": 42, + "date": 1443004362000, + }, + } + + treatment = Treatment(ignore_complex_lists=True) + transformation = CollectionTransformation(treatment=treatment) + converter = MongoDBCrateDBConverter(transformation=transformation) + assert converter.decode_document(data_in) == data_out + + +def test_convert_with_treatment_normalize_complex_lists(): + """ + The `normalize_complex_lists` treatment converts inner values within lists of dictionaries. 
+ """ + data_in = { + "_id": { + "$oid": "56027fcae4b09385a85f9344", + }, + "value": { + "id": 42, + "date": {"$date": "2015-09-23T10:32:42.33Z"}, + "some_complex_list": [ + {"id": "foo", "value": "something"}, + {"id": "bar", "value": {"$date": "2015-09-24T10:32:42.33Z"}}, + ], + }, + } + data_out = { + "_id": "56027fcae4b09385a85f9344", + "value": { + "id": 42, + "date": 1443004362000, + "some_complex_list": [ + {"id": "foo", "value": "something"}, + # FIXME: `normalize_complex_lists` does not see it's a timestamp. + {"id": "bar", "value": "{'$date': '2015-09-24T10:32:42.33Z'}"}, + ], + }, + } + + treatment = Treatment(normalize_complex_lists=True) + transformation = CollectionTransformation(treatment=treatment) + converter = MongoDBCrateDBConverter(transformation=transformation) + assert converter.decode_document(data_in) == data_out + + +def test_convert_with_treatment_all_options(): + """ + Validate all treatment options. + """ + data_in = { + "_id": { + "$oid": "56027fcae4b09385a85f9344", + }, + "ignore_toplevel": 42, + "value": { + "id": 42, + "date": {"$date": "2015-09-23T10:32:42.33Z"}, + "ignore_nested": 42, + }, + "to_list": 42, + "to_string": 42, + "to_dict_scalar": 42, + "to_dict_list": [{"user": 42}], + } + data_out = { + "_id": "56027fcae4b09385a85f9344", + "value": { + "date": 1443004362000, + "id": 42, + }, + "to_list": [42], + "to_string": "42", + "to_dict_scalar": {"id": 42}, + "to_dict_list": [{"user": {"id": 42}}], + } + + treatment = Treatment( + ignore_complex_lists=False, + ignore_field=["ignore_toplevel", "ignore_nested"], + convert_list=["to_list"], + convert_string=["to_string"], + convert_dict=[ + {"name": "to_dict_scalar", "wrapper_name": "id"}, + {"name": "user", "wrapper_name": "id"}, + ], + ) + transformation = CollectionTransformation(treatment=treatment) + converter = MongoDBCrateDBConverter(transformation=transformation) + assert converter.decode_document(data_in) == data_out + + +def test_convert_transform_timestamp(): + """ + 
Validate a Zyp transformation that converts datetime values in text format. + """ + data_in = [{"device": "Hotzenplotz", "temperature": 42.42, "timestamp": "06/14/2022 12:42:24.4242 UTC"}] + data_out = [{"device": "Hotzenplotz", "temperature": 42.42, "timestamp": 1655210544.4242}] + + bucket_transformation = BucketTransformation( + values=ValueConverter().add(pointer="/timestamp", transformer="to_unixtime"), + ) + transformation = CollectionTransformation(bucket=bucket_transformation) + converter = MongoDBCrateDBConverter(transformation=transformation) + data = converter.decode_documents(data_in) + assert data == data_out diff --git a/tests/transform/mongodb/test_mongodb_full.py b/tests/transform/mongodb/test_mongodb_full.py new file mode 100644 index 0000000..5151a3e --- /dev/null +++ b/tests/transform/mongodb/test_mongodb_full.py @@ -0,0 +1,47 @@ +# ruff: noqa: E402 +import pytest + +pytestmark = pytest.mark.mongodb + +from commons_codec.model import SQLOperation +from commons_codec.transform.mongodb import MongoDBFullLoadTranslator +from tests.transform.mongodb.data import RECORD_IN_ALL_TYPES, RECORD_OUT_ALL_TYPES + + +def test_sql_ddl(): + translator = MongoDBFullLoadTranslator(table_name="foo") + assert translator.sql_ddl == "CREATE TABLE IF NOT EXISTS foo (oid TEXT, data OBJECT(DYNAMIC));" + + +def test_to_sql_operation(): + """ + Verify outcome of `MongoDBFullLoadTranslator.to_sql` operation. + """ + translator = MongoDBFullLoadTranslator(table_name="foo") + assert translator.to_sql([RECORD_IN_ALL_TYPES]) == SQLOperation( + statement="INSERT INTO foo (oid, data) VALUES (:oid, :record);", + parameters=[{"oid": "56027fcae4b09385a85f9344", "record": RECORD_OUT_ALL_TYPES}], + ) + + +@pytest.mark.integration +def test_to_sql_cratedb(caplog, cratedb): + """ + Verify writing converted MongoDB document to CrateDB. + """ + + # Compute CrateDB operation (SQL+parameters) from MongoDB document. 
+ translator = MongoDBFullLoadTranslator(table_name="from.mongodb") + operation = translator.to_sql(RECORD_IN_ALL_TYPES) + + # Insert into CrateDB. + cratedb.database.run_sql(translator.sql_ddl) + cratedb.database.run_sql(operation.statement, operation.parameters) + + # Verify data in target database. + assert cratedb.database.table_exists("from.mongodb") is True + assert cratedb.database.refresh_table("from.mongodb") is True + assert cratedb.database.count_records("from.mongodb") == 1 + + results = cratedb.database.run_sql('SELECT * FROM "from".mongodb;', records=True) # noqa: S608 + assert results[0]["data"] == RECORD_OUT_ALL_TYPES diff --git a/tests/zyp/test_collection.py b/tests/zyp/test_collection.py index b30015d..c947366 100644 --- a/tests/zyp/test_collection.py +++ b/tests/zyp/test_collection.py @@ -8,8 +8,10 @@ from zyp.model.moksha import MokshaTransformation -class ComplexRecipe: +class CompositeRecipe: """ + Composite recipe composed of demo input and output data and a corresponding transformation. + It executes the following steps, in order of appearance: - Unwrap `records` attribute from container dictionary into actual collection. @@ -53,18 +55,58 @@ class ComplexRecipe: ) -def test_collection_transformation_success(): +class ListRecipes: + """ + Demo transformation recipe for handling anomalies in lists. + """ + + # Define a messy input data collection. + data_in = { + "message-source": "community", + "message-type": "mixed-pickles", + "records": [ + { + "data": { + "to-list-flat": [{"foo": 1}, [{"foo": 2}, {"foo": 3}]], + }, + }, + ], + } + + # Define expectation of the cleansed data collection. + data_out = [ + { + "data": { + "to-list-flat": [{"foo": 1}, {"foo": 2}, {"foo": 3}], + }, + }, + ] + + # Define transformation. 
+ transformation = CollectionTransformation( + pre=MokshaTransformation().jq(".records").jq('.[] |= (.data."to-list-flat" |= flatten)'), + ) + + +def test_collection_transformation_composite_success(): """ Verify transformation recipe for re-shaping a collection of records. """ - assert ComplexRecipe.transformation.apply(ComplexRecipe.data_in) == ComplexRecipe.data_out + assert CompositeRecipe.transformation.apply(CompositeRecipe.data_in) == CompositeRecipe.data_out + + +def test_collection_transformation_lists_success(): + """ + Verify transformation recipe for re-shaping anomalies in lists. + """ + assert ListRecipes.transformation.apply(ListRecipes.data_in) == ListRecipes.data_out def test_collection_transformation_serialize(): """ Verify collection transformation description can be serialized to a data structure and back. """ - transformation = ComplexRecipe.transformation + transformation = CompositeRecipe.transformation transformation_dict = { "meta": {"version": 1, "type": "zyp-collection"}, "pre": { @@ -99,8 +141,8 @@ def test_collection_transformation_regular_load_and_apply(): """ payload = Path("tests/zyp/transformation-collection.yaml").read_text() transformation = CollectionTransformation.from_yaml(payload) - result = transformation.apply(deepcopy(ComplexRecipe.data_in)) - assert result == ComplexRecipe.data_out + result = transformation.apply(deepcopy(CompositeRecipe.data_in)) + assert result == CompositeRecipe.data_out def test_collection_transformation_treatment_load_and_apply(): @@ -109,7 +151,7 @@ def test_collection_transformation_treatment_load_and_apply(): """ payload = Path("tests/zyp/transformation-collection-treatment.yaml").read_text() transformation = CollectionTransformation.from_yaml(payload) - result = transformation.apply(deepcopy(ComplexRecipe.data_in)) + result = transformation.apply(deepcopy(CompositeRecipe.data_in)) assert result == { "message-source": "system-3000", "message-type": "eai-warehouse", diff --git 
a/tests/zyp/test_moksha.py b/tests/zyp/test_moksha.py index 566ca92..5393ffd 100644 --- a/tests/zyp/test_moksha.py +++ b/tests/zyp/test_moksha.py @@ -13,6 +13,16 @@ def test_moksha_jq_compute_nested(): assert transformation.apply([{"data": {"abc": 123}}]) == [{"data": {"abc": 246}}] +def test_moksha_jq_flatten_list(): + """ + Verify flattening nested list, using moksha/jq. + """ + data_in = [{"data": {"abc": [{"foo": 1}, [{"foo": 2}, {"foo": 3}]]}}] + data_out = [{"data": {"abc": [{"foo": 1}, {"foo": 2}, {"foo": 3}]}}] + transformation = MokshaTransformation().jq(".[] |= (.data.abc |= flatten)") + assert transformation.apply(data_in) == data_out + + def test_transon_duplicate_records(): """ Verify record duplication works well.