diff --git a/CHANGES.md b/CHANGES.md
index f88f731..b66a2c4 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,6 +1,7 @@
 # Changelog
 
 ## Unreleased
+- DMS/DynamoDB/MongoDB: Use SQL with parameters instead of inlining values
 
 ## 2024/08/23 v0.0.11
 - DynamoDB: Fix serializing OBJECT and ARRAY representations to CrateDB
diff --git a/src/commons_codec/model.py b/src/commons_codec/model.py
index b69f808..9944eff 100644
--- a/src/commons_codec/model.py
+++ b/src/commons_codec/model.py
@@ -3,13 +3,14 @@
 import typing as t
 from enum import auto
 
+from attr import Factory
+from attrs import define
+
 if sys.version_info >= (3, 11):
     from enum import StrEnum
 else:
     from backports.strenum import StrEnum  # pragma: no cover
 
-from attrs import define
-
 
 @define(frozen=True)
 class TableAddress:
@@ -81,3 +82,33 @@ def from_json(cls, payload: str) -> t.Union["ColumnTypeMapStore", None]:
         if not payload:
             return None
         return cls.from_dict(json.loads(payload))
+
+
+@define
+class SQLOperation:
+    """
+    Bundle data about an SQL operation, including statement and parameters.
+
+    Parameters can be a single dictionary or a list of dictionaries.
+    """
+
+    statement: str
+    parameters: t.Optional[t.Union[t.Mapping[str, t.Any], t.List[t.Mapping[str, t.Any]]]] = None
+
+
+@define
+class SQLParameterizedClause:
+    """
+    Manage details about a SQL parameterized clause, including column names, parameter names, and values.
+    """
+
+    columns: t.List[str] = Factory(list)
+    names: t.List[str] = Factory(list)
+    values: t.List[str] = Factory(list)
+
+    @property
+    def set_clause(self):
+        """
+        Render a SET clause of an SQL statement.
+        """
+        return ", ".join([f"{key}={value}" for key, value in zip(self.columns, self.names)])
diff --git a/src/commons_codec/transform/aws_dms.py b/src/commons_codec/transform/aws_dms.py
index 97d55a9..8eb1de8 100644
--- a/src/commons_codec/transform/aws_dms.py
+++ b/src/commons_codec/transform/aws_dms.py
@@ -9,7 +9,14 @@
 import simplejson as json
 
 from commons_codec.exception import MessageFormatError, UnknownOperationError
-from commons_codec.model import ColumnType, ColumnTypeMapStore, PrimaryKeyStore, TableAddress
+from commons_codec.model import (
+    ColumnType,
+    ColumnTypeMapStore,
+    PrimaryKeyStore,
+    SQLOperation,
+    SQLParameterizedClause,
+    TableAddress,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -66,35 +73,39 @@ def __init__(
         self.primary_keys: t.List[str] = self.container.primary_keys[self.address]
         self.column_types: t.Dict[str, ColumnType] = self.container.column_types[self.address]
 
-    def to_sql(self) -> str:
+    def to_sql(self) -> SQLOperation:
         if self.operation == "create-table":
             pks = self.control.get("table-def", {}).get("primary-key")
             if pks:
                 self.primary_keys += pks
             # TODO: What about dropping tables first?
-            return f"CREATE TABLE IF NOT EXISTS {self.address.fqn} ({self.DATA_COLUMN} OBJECT(DYNAMIC));"
+            return SQLOperation(f"CREATE TABLE IF NOT EXISTS {self.address.fqn} ({self.DATA_COLUMN} OBJECT(DYNAMIC));")
 
         elif self.operation in ["load", "insert"]:
-            values_clause = self.record_to_values()
-            sql = f"INSERT INTO {self.address.fqn} ({self.DATA_COLUMN}) VALUES ('{values_clause}');"
+            record = self.record_to_values()
+            sql = f"INSERT INTO {self.address.fqn} ({self.DATA_COLUMN}) VALUES (:record);"
+            parameters = {"record": record}
 
         elif self.operation == "update":
-            values_clause = self.record_to_update()
+            record = self.record_to_values()
+            clause = self.update_clause()
             where_clause = self.keys_to_where()
-            sql = f"UPDATE {self.address.fqn} SET {values_clause} WHERE {where_clause};"
+            sql = f"UPDATE {self.address.fqn} SET {clause.set_clause} WHERE {where_clause};"
+            parameters = {"record": record}
 
         elif self.operation == "delete":
             where_clause = self.keys_to_where()
             sql = f"DELETE FROM {self.address.fqn} WHERE {where_clause};"
+            parameters = None
 
         else:
             message = f"Unknown CDC event operation: {self.operation}"
             logger.warning(message)
             raise UnknownOperationError(message, operation=self.operation, record=self.record)
 
-        return sql
+        return SQLOperation(sql, parameters)
 
-    def record_to_update(self) -> str:
+    def update_clause(self) -> SQLParameterizedClause:
         """
         Serializes an image to a comma-separated list of column/values pairs
         that can be used in the `SET` clause of an `UPDATE` statement.
@@ -106,17 +117,21 @@ def record_to_update(self) -> str:
         OUT
         data['age'] = '33', data['attributes'] = '{"foo": "bar"}', data['name'] = 'John'
         """
-        constraints: t.List[str] = []
-        for column_name, column_value in self.record["data"].items():
+        clause = SQLParameterizedClause()
+        for column, value in self.record["data"].items():
             # Skip primary key columns, they cannot be updated
-            if column_name in self.primary_keys:
+            if column in self.primary_keys:
                 continue
-            constraint = f"{self.DATA_COLUMN}['{column_name}'] = '{column_value}'"
-            constraints.append(constraint)
-        return ", ".join(constraints)
+
+            param_name = f":{column}"
+
+            clause.columns.append(f"{self.DATA_COLUMN}['{column}']")
+            clause.names.append(param_name)
+            clause.values.append(value)  # noqa: PD011
+
+        return clause
 
-    def record_to_values(self) -> str:
+    def record_to_values(self) -> t.Dict[str, t.Any]:
         """
         Apply type translations to record, and serialize to JSON.
 
@@ -133,7 +148,7 @@
             if column_type is ColumnType.MAP and isinstance(value, str):
                 value = json.loads(value)
             self.data[column_name] = value
-        return json.dumps(self.data)
+        return self.data
 
     def keys_to_where(self) -> str:
         """
@@ -169,7 +184,7 @@ def __init__(
         self.primary_keys = primary_keys or PrimaryKeyStore()
         self.column_types = column_types or ColumnTypeMapStore()
 
-    def to_sql(self, record: t.Dict[str, t.Any]) -> str:
+    def to_sql(self, record: t.Dict[str, t.Any]) -> SQLOperation:
         """
         Produce INSERT|UPDATE|DELETE SQL statement from load|insert|update|delete CDC event record.
         """
diff --git a/src/commons_codec/transform/dynamodb.py b/src/commons_codec/transform/dynamodb.py
index 93be70e..a20e965 100644
--- a/src/commons_codec/transform/dynamodb.py
+++ b/src/commons_codec/transform/dynamodb.py
@@ -6,9 +6,9 @@
 import logging
 import typing as t
 
-import simplejson as json
 import toolz
 
+from commons_codec.model import SQLOperation, SQLParameterizedClause
 from commons_codec.vendor.boto3.dynamodb.types import DYNAMODB_CONTEXT, TypeDeserializer
 
 logger = logging.getLogger(__name__)
@@ -36,17 +36,40 @@ def _deserialize_bs(self, value):
         return list(super()._deserialize_bs(value))
 
 
-class DynamoCDCTranslatorBase:
+class DynamoTranslatorBase:
     """
     Translate DynamoDB CDC events into different representations.
     """
 
-    def __init__(self):
+    # Define name of the column where CDC's record data will get materialized into.
+    DATA_COLUMN = "data"
+
+    def __init__(self, table_name: str):
+        super().__init__()
+        self.table_name = self.quote_table_name(table_name)
         self.deserializer = CrateDBTypeDeserializer()
 
-    def deserialize_item(self, item: t.Dict[str, t.Dict[str, str]]) -> t.Dict[str, str]:
+    @property
+    def sql_ddl(self):
+        """
+        Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events.
         """
-        Deserialize DynamoDB type-enriched nested JSON snippet into vanilla Python.
+        return f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));"
+
+    @staticmethod
+    def quote_table_name(name: str):
+        """
+        Poor man's table quoting.
+
+        TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable.
+        """
+        if '"' not in name:
+            name = f'"{name}"'
+        return name
+
+    def decode_record(self, item: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
+        """
+        Deserialize DynamoDB JSON record into vanilla Python.
 
         Example:
         {
@@ -74,7 +97,17 @@ def deserialize_item(self, item: t.Dict[str, t.Dict[str, str]]) -> t.Dict[str, s
         return toolz.valmap(self.deserializer.deserialize, item)
 
 
-class DynamoCDCTranslatorCrateDB(DynamoCDCTranslatorBase):
+class DynamoDBFullLoadTranslator(DynamoTranslatorBase):
+    def to_sql(self, record: t.Dict[str, t.Any]) -> SQLOperation:
+        """
+        Produce INSERT|UPDATE|DELETE SQL statement from INSERT|MODIFY|REMOVE CDC event record.
+        """
+        record = self.decode_record(record)
+        sql = f"INSERT INTO {self.table_name} ({self.DATA_COLUMN}) VALUES (:record);"
+        return SQLOperation(sql, {"record": record})
+
+
+class DynamoDBCDCTranslator(DynamoTranslatorBase):
     """
     Translate DynamoDB CDC events into CrateDB SQL statements that materialize them again.
 
@@ -85,74 +118,47 @@ class DynamoCDCTranslatorCrateDB(DynamoCDCTranslatorBase):
     https://www.singlestore.com/blog/cdc-data-from-dynamodb-to-singlestore-using-dynamodb-streams/
     """
 
-    # Define name of the column where CDC's record data will get materialized into.
-    DATA_COLUMN = "data"
-
-    def __init__(self, table_name: str):
-        super().__init__()
-        self.table_name = self.quote_table_name(table_name)
-
-    @property
-    def sql_ddl(self):
-        """
-        Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events.
-        """
-        return f"CREATE TABLE {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));"
-
-    def to_sql(self, record: t.Dict[str, t.Any]) -> str:
+    def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation:
         """
         Produce INSERT|UPDATE|DELETE SQL statement from INSERT|MODIFY|REMOVE CDC event record.
         """
-        event_source = record.get("eventSource")
-        event_name = record.get("eventName")
+        event_source = event.get("eventSource")
+        event_name = event.get("eventName")
 
         if event_source != "aws:dynamodb":
             raise ValueError(f"Unknown eventSource: {event_source}")
 
         if event_name == "INSERT":
-            values_clause = self.image_to_values(record["dynamodb"]["NewImage"])
-            sql = f"INSERT INTO {self.table_name} " f"({self.DATA_COLUMN}) " f"VALUES ('{values_clause}');"
+            record = self.decode_record(event["dynamodb"]["NewImage"])
+            sql = f"INSERT INTO {self.table_name} ({self.DATA_COLUMN}) VALUES (:record);"
+            parameters = {"record": record}
 
         elif event_name == "MODIFY":
-            new_image_cleaned = record["dynamodb"]["NewImage"]
+            new_image = event["dynamodb"]["NewImage"]
 
             # Drop primary key columns to not update them.
             # Primary key values should be identical (if chosen identical in DynamoDB and CrateDB),
-            # but CrateDB does not allow having themin an UPDATE's SET clause.
-            for key in record["dynamodb"]["Keys"]:
-                del new_image_cleaned[key]
+            # but CrateDB does not allow having them in an UPDATE's SET clause.
+            for key in event["dynamodb"]["Keys"]:
+                del new_image[key]
 
-            values_clause = self.values_to_update(new_image_cleaned)
+            record = self.decode_record(event["dynamodb"]["NewImage"])
+            clause = self.update_clause(record)
 
-            where_clause = self.keys_to_where(record["dynamodb"]["Keys"])
-            sql = f"UPDATE {self.table_name} " f"SET {values_clause} " f"WHERE {where_clause};"
+            where_clause = self.keys_to_where(event["dynamodb"]["Keys"])
+            sql = f"UPDATE {self.table_name} SET {clause.set_clause} WHERE {where_clause};"
+            parameters = record
 
         elif event_name == "REMOVE":
-            where_clause = self.keys_to_where(record["dynamodb"]["Keys"])
-            sql = f"DELETE FROM {self.table_name} " f"WHERE {where_clause};"
+            where_clause = self.keys_to_where(event["dynamodb"]["Keys"])
+            sql = f"DELETE FROM {self.table_name} WHERE {where_clause};"
+            parameters = None
 
         else:
             raise ValueError(f"Unknown CDC event name: {event_name}")
 
-        return sql
-
-    def image_to_values(self, image: t.Dict[str, t.Any]) -> str:
-        """
-        Serialize CDC event's "(New|Old)Image" representation to a `VALUES` clause in CrateDB SQL syntax.
+        return SQLOperation(sql, parameters)
 
-        IN (top-level stripped):
-        "NewImage": {
-            "humidity": {"N": "84.84"},
-            "temperature": {"N": "42.42"},
-            "device": {"S": "foo"},
-            "timestamp": {"S": "2024-07-12T01:17:42"},
-        }
-
-        OUT:
-        {"humidity": 84.84, "temperature": 42.42, "device": "foo", "timestamp": "2024-07-12T01:17:42"}
-        """
-        return json.dumps(self.deserialize_item(image))
-
-    def values_to_update(self, keys: t.Dict[str, t.Dict[str, str]]) -> str:
+    def update_clause(self, record: t.Dict[str, t.Any]) -> SQLParameterizedClause:
         """
         Serializes an image to a comma-separated list of column/values pairs
         that can be used in the `SET` clause of an `UPDATE` statement.
@@ -161,28 +167,25 @@ def values_to_update(self, keys: t.Dict[str, t.Dict[str, str]]) -> str:
         {'humidity': {'N': '84.84'}, 'temperature': {'N': '55.66'}}
 
         OUT:
-        data['humidity] = '84.84', temperature = '55.66'
+        data['humidity'] = '84.84', data['temperature'] = '55.66'
         """
-        values_clause = self.deserialize_item(keys)
-
-        constraints: t.List[str] = []
-        for key_name, key_value in values_clause.items():
-            if key_value is None:
-                key_value = "NULL"
-            elif isinstance(key_value, str):
-                key_value = "'" + str(key_value).replace("'", "''") + "'"
+        clause = SQLParameterizedClause()
+        for column, value in record.items():
+            param_name = f":{column}"
+            if value is None:
+                value = "NULL"
 
-            elif isinstance(key_value, dict):
-                # TODO: Does it also need escaping of inner TEXT values, like the above?
-                key_value = "'" + json.dumps(key_value) + "'::OBJECT"
+            elif isinstance(value, dict):
+                param_name = f"CAST({param_name} AS OBJECT)"
 
-            elif isinstance(key_value, list) and key_value and isinstance(key_value[0], dict):
-                key_value = "'" + json.dumps(key_value) + "'::OBJECT[]"
+            elif isinstance(value, list) and value and isinstance(value[0], dict):
+                param_name = f"CAST({param_name} AS OBJECT[])"
 
-            constraint = f"{self.DATA_COLUMN}['{key_name}'] = {key_value}"
-            constraints.append(constraint)
-        return ", ".join(constraints)
+            clause.columns.append(f"{self.DATA_COLUMN}['{column}']")
+            clause.names.append(param_name)
+            clause.values.append(value)  # noqa: PD011
+        return clause
 
     def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> str:
         """
@@ -204,14 +207,3 @@ def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> str:
             constraint = f"{self.DATA_COLUMN}['{key_name}'] = '{key_value}'"
             constraints.append(constraint)
         return " AND ".join(constraints)
-
-    @staticmethod
-    def quote_table_name(name: str):
-        """
-        Poor man's table quoting.
-
-        TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable.
-        """
-        if '"' not in name:
-            name = f'"{name}"'
-        return name
diff --git a/src/commons_codec/transform/mongodb.py b/src/commons_codec/transform/mongodb.py
index 752915c..aabbd72 100644
--- a/src/commons_codec/transform/mongodb.py
+++ b/src/commons_codec/transform/mongodb.py
@@ -6,9 +6,10 @@
 import logging
 import typing as t
 
-import simplejson as json
 from bson.json_util import _json_convert
 
+from commons_codec.model import SQLOperation
+
 logger = logging.getLogger(__name__)
 
 
@@ -28,9 +29,11 @@ class MongoDBCDCTranslatorBase:
     - https://www.mongodb.com/developer/languages/python/python-change-streams/
     """
 
-    def deserialize_item(self, item: t.Dict[str, t.Dict[str, str]]) -> t.Dict[str, str]:
+    def decode_bson(self, item: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
         """
-        Deserialize MongoDB type-enriched nested JSON snippet into vanilla Python.
+        Convert MongoDB Extended JSON to vanilla Python dictionary.
+
+        https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/
 
         Example:
         {
@@ -39,6 +42,21 @@ def deserialize_item(self, item: t.Dict[str, t.Dict[str, str]]) -> t.Dict[str, s
             "data": {"temperature": 42.42, "humidity": 84.84},
             "meta": {"timestamp": datetime.datetime(2024, 7, 11, 23, 17, 42), "device": "foo"},
         }
+
+        IN (top-level stripped):
+        "fullDocument": {
+            "_id": ObjectId("669683c2b0750b2c84893f3e"),
+            "id": "5F9E",
+            "data": {"temperature": 42.42, "humidity": 84.84},
+            "meta": {"timestamp": datetime.datetime(2024, 7, 11, 23, 17, 42), "device": "foo"},
+        }
+
+        OUT:
+        {"_id": {"$oid": "669683c2b0750b2c84893f3e"},
+         "id": "5F9E",
+         "data": {"temperature": 42.42, "humidity": 84.84},
+         "meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"},
+        }
         """
         return _json_convert(item)
 
@@ -94,52 +112,49 @@ def sql_ddl(self):
             f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.ID_COLUMN} TEXT, {self.DATA_COLUMN} OBJECT(DYNAMIC));"
         )
 
-    def to_sql(self, record: t.Dict[str, t.Any]) -> str:
+    def to_sql(self, event: t.Dict[str, t.Any]) -> t.Union[SQLOperation, None]:
         """
         Produce INSERT|UPDATE|DELETE SQL statement from insert|update|replace|delete CDC event record.
         """
-        if "operationType" in record and record["operationType"]:
-            operation_type: str = str(record["operationType"])
+        if "operationType" in event and event["operationType"]:
+            operation_type: str = str(event["operationType"])
         else:
-            raise ValueError(f"Operation Type missing or empty: {record}")
+            raise ValueError(f"Operation Type missing or empty: {event}")
 
         if operation_type == "insert":
-            oid: str = self.get_document_key(record)
-            full_document = self.get_full_document(record)
-            values_clause = self.full_document_to_values(full_document)
-            sql = (
-                f"INSERT INTO {self.table_name} "
-                f"({self.ID_COLUMN}, {self.DATA_COLUMN}) "
-                f"VALUES ('{oid}', '{values_clause}');"
-            )
+            oid: str = self.get_document_key(event)
+            record = self.decode_bson(self.get_full_document(event))
+            sql = f"INSERT INTO {self.table_name} " f"({self.ID_COLUMN}, {self.DATA_COLUMN}) " "VALUES (:oid, :record);"
+            parameters = {"oid": oid, "record": record}
 
         # In order to use "full document" representations from "update" events,
         # you need to use `watch(full_document="updateLookup")`.
        # https://www.mongodb.com/docs/manual/changeStreams/#lookup-full-document-for-update-operations
         elif operation_type in ["update", "replace"]:
-            full_document = self.get_full_document(record)
-            values_clause = self.full_document_to_values(full_document)
-            where_clause = self.where_clause(record)
-            sql = f"UPDATE {self.table_name} SET {self.DATA_COLUMN} = '{values_clause}' WHERE {where_clause};"
+            record = self.decode_bson(self.get_full_document(event))
+            where_clause = self.where_clause(event)
+            sql = f"UPDATE {self.table_name} " f"SET {self.DATA_COLUMN} = :record " f"WHERE {where_clause};"
+            parameters = {"record": record}
 
         elif operation_type == "delete":
-            where_clause = self.where_clause(record)
+            where_clause = self.where_clause(event)
             sql = f"DELETE FROM {self.table_name} WHERE {where_clause};"
+            parameters = None
 
         # TODO: Enable applying the "drop" operation conditionally when enabled.
         elif operation_type == "drop":
             logger.info("Received 'drop' operation, but skipping to apply 'DROP TABLE'")
-            sql = ""
+            return None
 
         elif operation_type == "invalidate":
             logger.info("Ignoring 'invalidate' CDC operation")
-            sql = ""
+            return None
 
         else:
             raise ValueError(f"Unknown CDC operation type: {operation_type}")
 
-        return sql
+        return SQLOperation(sql, parameters)
 
     @staticmethod
     def get_document_key(record: t.Dict[str, t.Any]) -> str:
@@ -157,27 +172,6 @@ def get_full_document(record: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
         """
         return t.cast(dict, record.get("fullDocument"))
 
-    def full_document_to_values(self, document: t.Dict[str, t.Any]) -> str:
-        """
-        Serialize CDC event's "fullDocument" representation to a `VALUES` clause in CrateDB SQL syntax.
-
-        IN (top-level stripped):
-        "fullDocument": {
-            "_id": ObjectId("669683c2b0750b2c84893f3e"),
-            "id": "5F9E",
-            "data": {"temperature": 42.42, "humidity": 84.84},
-            "meta": {"timestamp": datetime.datetime(2024, 7, 11, 23, 17, 42), "device": "foo"},
-        }
-
-        OUT:
-        {"_id": {"$oid": "669683c2b0750b2c84893f3e"},
-         "id": "5F9E",
-         "data": {"temperature": 42.42, "humidity": 84.84},
-         "meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"},
-        }
-        """
-        return json.dumps(self.deserialize_item(document))
-
     def where_clause(self, record: t.Dict[str, t.Any]) -> str:
         """
         When converging an oplog of a MongoDB collection, the primary key is always the MongoDB document OID.
diff --git a/tests/transform/test_aws_dms.py b/tests/transform/test_aws_dms.py
index 1b3d765..208b857 100644
--- a/tests/transform/test_aws_dms.py
+++ b/tests/transform/test_aws_dms.py
@@ -5,7 +5,7 @@
 import pytest
 
 from commons_codec.exception import MessageFormatError, UnknownOperationError
-from commons_codec.model import ColumnType, ColumnTypeMapStore, TableAddress
+from commons_codec.model import ColumnType, ColumnTypeMapStore, SQLOperation, TableAddress
 from commons_codec.transform.aws_dms import DMSTranslatorCrateDB
 
 RECORD_INSERT = {"age": 31, "attributes": {"baz": "qux"}, "id": 46, "name": "Jane"}
@@ -212,19 +212,20 @@ def test_decode_cdc_unknown_event(cdc):
 
 
 def test_decode_cdc_sql_ddl_regular(cdc):
-    assert cdc.to_sql(MSG_CONTROL_CREATE_TABLE) == 'CREATE TABLE IF NOT EXISTS "public"."foo" (data OBJECT(DYNAMIC));'
+    assert cdc.to_sql(MSG_CONTROL_CREATE_TABLE) == SQLOperation(
+        statement='CREATE TABLE IF NOT EXISTS "public"."foo" (data OBJECT(DYNAMIC));', parameters=None
+    )
 
 
 def test_decode_cdc_sql_ddl_awsdms(cdc):
-    assert (
-        cdc.to_sql(MSG_CONTROL_AWSDMS)
-        == 'CREATE TABLE IF NOT EXISTS "dms"."awsdms_apply_exceptions" (data OBJECT(DYNAMIC));'
+    assert cdc.to_sql(MSG_CONTROL_AWSDMS) == SQLOperation(
+        statement='CREATE TABLE IF NOT EXISTS "dms"."awsdms_apply_exceptions" (data OBJECT(DYNAMIC));', parameters=None
     )
 
 
 def test_decode_cdc_insert(cdc):
-    assert (
-        cdc.to_sql(MSG_DATA_INSERT) == 'INSERT INTO "public"."foo" (data) VALUES ' f"('{json.dumps(RECORD_INSERT)}');"
+    assert cdc.to_sql(MSG_DATA_INSERT) == SQLOperation(
+        statement='INSERT INTO "public"."foo" (data) VALUES (:record);', parameters={"record": RECORD_INSERT}
     )
 
 
@@ -236,10 +237,11 @@ def test_decode_cdc_update_success(cdc):
     cdc.to_sql(MSG_CONTROL_CREATE_TABLE)
 
     # Emulate an UPDATE operation.
-    assert (
-        cdc.to_sql(MSG_DATA_UPDATE_VALUE) == 'UPDATE "public"."foo" '
-        "SET data['age'] = '33', data['attributes'] = '{\"foo\": \"bar\"}', data['name'] = 'John' "
-        "WHERE data['id'] = '42';"
+    assert cdc.to_sql(MSG_DATA_UPDATE_VALUE) == SQLOperation(
+        statement='UPDATE "public"."foo" SET '
+        "data['age']=:age, data['attributes']=:attributes, data['name']=:name "
+        "WHERE data['id'] = '42';",
+        parameters={"record": RECORD_UPDATE},
     )
 
 
@@ -264,7 +266,9 @@ def test_decode_cdc_delete_success(cdc):
     cdc.to_sql(MSG_CONTROL_CREATE_TABLE)
 
     # Emulate a DELETE operation.
-    assert cdc.to_sql(MSG_DATA_DELETE) == 'DELETE FROM "public"."foo" ' "WHERE data['id'] = '45';"
+    assert cdc.to_sql(MSG_DATA_DELETE) == SQLOperation(
+        statement="DELETE FROM \"public\".\"foo\" WHERE data['id'] = '45';", parameters=None
+    )
 
 
 def test_decode_cdc_delete_failure(cdc):
diff --git a/tests/transform/test_dynamodb.py b/tests/transform/test_dynamodb_cdc.py
similarity index 68%
rename from tests/transform/test_dynamodb.py
rename to tests/transform/test_dynamodb_cdc.py
index d2e7642..8ed28c6 100644
--- a/tests/transform/test_dynamodb.py
+++ b/tests/transform/test_dynamodb_cdc.py
@@ -3,7 +3,8 @@
 
 import pytest
 
-from commons_codec.transform.dynamodb import CrateDBTypeDeserializer, DynamoCDCTranslatorCrateDB
+from commons_codec.model import SQLOperation
+from commons_codec.transform.dynamodb import CrateDBTypeDeserializer, DynamoDBCDCTranslator
 
 READING_BASIC = {"device": "foo", "temperature": 42.42, "humidity": 84.84}
 
@@ -174,69 +175,103 @@
 
 
 def test_decode_ddb_deserialize_type():
-    assert DynamoCDCTranslatorCrateDB(table_name="foo").deserialize_item({"foo": {"N": "84.84"}}) == {"foo": 84.84}
+    assert DynamoDBCDCTranslator(table_name="foo").decode_record({"foo": {"N": "84.84"}}) == {"foo": 84.84}
 
 
 def test_decode_cdc_sql_ddl():
-    assert DynamoCDCTranslatorCrateDB(table_name="foo").sql_ddl == 'CREATE TABLE "foo" (data OBJECT(DYNAMIC));'
+    assert DynamoDBCDCTranslator(table_name="foo").sql_ddl == 'CREATE TABLE IF NOT EXISTS "foo" (data OBJECT(DYNAMIC));'
 
 
 def test_decode_cdc_unknown_source():
     with pytest.raises(ValueError) as ex:
-        DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_UNKNOWN_SOURCE)
+        DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_UNKNOWN_SOURCE)
     assert ex.match("Unknown eventSource: foo:bar")
 
 
 def test_decode_cdc_unknown_event():
     with pytest.raises(ValueError) as ex:
-        DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_UNKNOWN_EVENT)
+        DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_UNKNOWN_EVENT)
     assert ex.match("Unknown CDC event name: FOOBAR")
 
 
 def test_decode_cdc_insert_basic():
-    assert (
-        DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INSERT_BASIC) == 'INSERT INTO "foo" (data) '
-        'VALUES (\'{"humidity": 84.84, "temperature": 42.42, "device": "foo", "timestamp": "2024-07-12T01:17:42",'
-        ' "string_set": ["location_1"], "number_set": [1.0, 2.0, 3.0, 4.0], "binary_set": ["U3Vubnk="]}\');'
+    assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_INSERT_BASIC) == SQLOperation(
+        statement='INSERT INTO "foo" (data) VALUES (:record);',
+        parameters={
+            "record": {
+                "humidity": 84.84,
+                "temperature": 42.42,
+                "device": "foo",
+                "timestamp": "2024-07-12T01:17:42",
+                "string_set": ["location_1"],
+                "number_set": [1.0, 2.0, 3.0, 4.0],
+                "binary_set": ["U3Vubnk="],
+            }
+        },
     )
 
 
 def test_decode_cdc_insert_nested():
-    assert (
-        DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INSERT_NESTED)
-        == 'INSERT INTO "foo" (data) VALUES (\'{"id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266", '
-        '"data": {"temperature": 42.42, "humidity": 84.84}, '
-        '"meta": {"timestamp": "2024-07-12T01:17:42", "device": "foo"},'
-        ' "string_set": ["location_1"], "number_set": [0.34, 1.0, 2.0, 3.0],'
-        ' "binary_set": ["U3Vubnk="], "somemap": {"test": 1.0, "test2": 2.0}}\');'
+    assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_INSERT_NESTED) == SQLOperation(
+        statement='INSERT INTO "foo" (data) VALUES (:record);',
+        parameters={
+            "record": {
+                "id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
+                "data": {"temperature": 42.42, "humidity": 84.84},
+                "meta": {"timestamp": "2024-07-12T01:17:42", "device": "foo"},
+                "string_set": ["location_1"],
+                "number_set": [0.34, 1.0, 2.0, 3.0],
+                "binary_set": ["U3Vubnk="],
+                "somemap": {"test": 1.0, "test2": 2.0},
+            }
+        },
    )
 
 
 def test_decode_cdc_modify_basic():
-    assert (
-        DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_MODIFY_BASIC) == 'UPDATE "foo" '
-        "SET data['humidity'] = 84.84, data['temperature'] = 55.66, data['location'] = 'Sydney', "
-        "data['string_set'] = ['location_1'], data['number_set'] = [0.34, 1.0, 2.0, 3.0],"
-        " data['binary_set'] = ['U3Vubnk='], data['empty_string'] = '', data['null_string'] = NULL"
-        " WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42';"
+    assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_MODIFY_BASIC) == SQLOperation(
+        statement='UPDATE "foo" SET '
+        "data['humidity']=:humidity, data['temperature']=:temperature, data['location']=:location, "
+        "data['string_set']=:string_set, data['number_set']=:number_set, data['binary_set']=:binary_set, "
+        "data['empty_string']=:empty_string, data['null_string']=:null_string "
+        "WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42';",
+        parameters={
+            "humidity": 84.84,
+            "temperature": 55.66,
+            "location": "Sydney",
+            "string_set": ["location_1"],
+            "number_set": [0.34, 1.0, 2.0, 3.0],
+            "binary_set": ["U3Vubnk="],
+            "empty_string": "",
+            "null_string": None,
+        },
    )
 
 
 def test_decode_cdc_modify_nested():
-    assert (
-        DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_MODIFY_NESTED) == 'UPDATE "foo" '
-        "SET data['tags'] = ['foo', 'bar'], data['empty_map'] = '{}'::OBJECT, data['empty_list'] = [],"
-        " data['string_set'] = ['location_1'], data['number_set'] = [0.34, 1.0, 2.0, 3.0],"
-        " data['binary_set'] = ['U3Vubnk='], data['somemap'] = '{\"test\": 1.0, \"test2\": 2.0}'::OBJECT,"
-        ' data[\'list_of_objects\'] = \'[{"foo": "bar"}, {"baz": "qux"}]\'::OBJECT[]'
-        " WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42';"
+    assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_MODIFY_NESTED) == SQLOperation(
+        statement='UPDATE "foo" SET '
+        "data['tags']=:tags, data['empty_map']=CAST(:empty_map AS OBJECT), data['empty_list']=:empty_list, "
+        "data['string_set']=:string_set, data['number_set']=:number_set, data['binary_set']=:binary_set, "
+        "data['somemap']=CAST(:somemap AS OBJECT), data['list_of_objects']=CAST(:list_of_objects AS OBJECT[]) "
+        "WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42';",
+        parameters={
+            "tags": ["foo", "bar"],
+            "empty_map": {},
+            "empty_list": [],
+            "string_set": ["location_1"],
+            "number_set": [0.34, 1.0, 2.0, 3.0],
+            "binary_set": ["U3Vubnk="],
+            "somemap": {"test": 1.0, "test2": 2.0},
+            "list_of_objects": [{"foo": "bar"}, {"baz": "qux"}],
+        },
    )
 
 
 def test_decode_cdc_remove():
-    assert (
-        DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_REMOVE) == 'DELETE FROM "foo" '
-        "WHERE data['device'] = 'bar' AND data['timestamp'] = '2024-07-12T01:17:42';"
+    assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_REMOVE) == SQLOperation(
+        statement="DELETE FROM \"foo\" WHERE data['device'] = 'bar' AND data['timestamp'] = '2024-07-12T01:17:42';",
+        parameters=None,
    )
diff --git a/tests/transform/test_dynamodb_full.py b/tests/transform/test_dynamodb_full.py
new file mode 100644
index 0000000..ca33738
--- /dev/null
+++ b/tests/transform/test_dynamodb_full.py
@@ -0,0 +1,41 @@
+from commons_codec.model import SQLOperation
+from commons_codec.transform.dynamodb import DynamoDBFullLoadTranslator
+
+RECORD = {
+    "id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"},
+    "data": {"M": {"temperature": {"N": "42.42"}, "humidity": {"N": "84.84"}}},
+    "meta": {"M": {"timestamp": {"S": "2024-07-12T01:17:42"}, "device": {"S": "foo"}}},
+    "string_set": {"SS": ["location_1"]},
+    "number_set": {"NS": [1, 2, 3, 0.34]},
+    "binary_set": {"BS": ["U3Vubnk="]},
+    "somemap": {
+        "M": {
+            "test": {"N": 1},
+            "test2": {"N": 2},
+        }
+    },
+}
+
+
+def test_sql_ddl():
+    assert (
+        DynamoDBFullLoadTranslator(table_name="foo").sql_ddl
+        == 'CREATE TABLE IF NOT EXISTS "foo" (data OBJECT(DYNAMIC));'
+    )
+
+
+def test_to_sql():
+    assert DynamoDBFullLoadTranslator(table_name="foo").to_sql(RECORD) == SQLOperation(
+        statement='INSERT INTO "foo" (data) VALUES (:record);',
+        parameters={
+            "record": {
+                "id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
+                "data": {"temperature": 42.42, "humidity": 84.84},
+                "meta": {"timestamp": "2024-07-12T01:17:42", "device": "foo"},
+                "string_set": ["location_1"],
+                "number_set": [0.34, 1.0, 2.0, 3.0],
+                "binary_set": ["U3Vubnk="],
+                "somemap": {"test": 1.0, "test2": 2.0},
+            }
+        },
+    )
diff --git a/tests/transform/test_mongodb.py b/tests/transform/test_mongodb.py
index d687f79..e2d6441 100644
--- a/tests/transform/test_mongodb.py
+++ b/tests/transform/test_mongodb.py
@@ -3,6 +3,8 @@
 
 import pytest
 
+from commons_codec.model import SQLOperation
+
 pytest.importorskip("pymongo")
 
 from bson import ObjectId, Timestamp
@@ -117,42 +119,50 @@ def test_decode_cdc_optype_empty():
 
 
 def test_decode_cdc_insert():
-    assert (
-        MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INSERT) == 'INSERT INTO "foo" (oid, data) '
-        'VALUES (\'669683c2b0750b2c84893f3e\', \'{"_id": {"$oid": "669683c2b0750b2c84893f3e"}, "id": "5F9E", '
-        '"data": {"temperature": 42.42, "humidity": 84.84}, '
-        '"meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"}}\');'
+    assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INSERT) == SQLOperation(
+        statement='INSERT INTO "foo" (oid, data) VALUES (:oid, :record);',
+        parameters={
+            "oid": "669683c2b0750b2c84893f3e",
+            "record": {
+                "_id": {"$oid": "669683c2b0750b2c84893f3e"},
+                "id": "5F9E",
+                "data": {"temperature": 42.42, "humidity": 84.84},
+                "meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"},
+            },
+        },
    )
 
 
 def test_decode_cdc_update():
-    assert (
-        MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_UPDATE)
-        == """UPDATE "foo" SET data = '{"_id": {"$oid": "669683c2b0750b2c84893f3e"}, "id": "5F9E", """
-        """"data": {"temperature": 42.5}, "meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"}}' """
-        "WHERE oid = '669683c2b0750b2c84893f3e';"
+    assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_UPDATE) == SQLOperation(
+        statement="UPDATE \"foo\" SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';",
+        parameters={
+            "record": {
+                "_id": {"$oid": "669683c2b0750b2c84893f3e"},
+                "id": "5F9E",
+                "data": {"temperature": 42.5},
+                "meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"},
+            }
+        },
    )
 
 
 def test_decode_cdc_replace():
-    assert (
-        MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_REPLACE)
-        == """UPDATE "foo" SET data = '{"_id": {"$oid": "669683c2b0750b2c84893f3e"}, """
-        """"tags": ["deleted"]}' """
-        "WHERE oid = '669683c2b0750b2c84893f3e';"
+    assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_REPLACE) == SQLOperation(
+        statement="UPDATE \"foo\" SET data = :record WHERE oid = '669683c2b0750b2c84893f3e';",
+        parameters={"record": {"_id": {"$oid": "669683c2b0750b2c84893f3e"}, "tags": ["deleted"]}},
    )
 
 
 def test_decode_cdc_delete():
-    assert (
-        MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_DELETE)
-        == """DELETE FROM "foo" WHERE oid = '669693c5002ef91ea9c7a562';"""
+    assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_DELETE) == SQLOperation(
+        statement="DELETE FROM \"foo\" WHERE oid = '669693c5002ef91ea9c7a562';", parameters=None
    )
 
 
 def test_decode_cdc_drop():
-    assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_DROP) == ""
+    assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_DROP) is None
 
 
 def test_decode_cdc_invalidate():
-    assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INVALIDATE) == ""
+    assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INVALIDATE) is None
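Usage sketch (supplementary, not part of the patch above): each translator now returns an `SQLOperation` that pairs a statement containing named placeholders such as `:record` with a parameters dictionary, so quoting and escaping are handled by the database driver rather than by string interpolation. A minimal way to execute such an operation, assuming a CrateDB SQLAlchemy dialect (for example `sqlalchemy-cratedb`) is installed and that the connection URL and sample event below are illustrative:

    # Sketch: execute an SQLOperation produced by DynamoDBCDCTranslator against CrateDB.
    import sqlalchemy as sa

    from commons_codec.transform.dynamodb import DynamoDBCDCTranslator

    # Illustrative DynamoDB CDC event (INSERT with a type-annotated NewImage).
    event = {
        "eventSource": "aws:dynamodb",
        "eventName": "INSERT",
        "dynamodb": {"NewImage": {"device": {"S": "foo"}, "temperature": {"N": "42.42"}}},
    }

    operation = DynamoDBCDCTranslator(table_name="foo").to_sql(event)
    print(operation.statement)  # INSERT INTO "foo" (data) VALUES (:record);

    engine = sa.create_engine("crate://localhost:4200")  # illustrative URL
    with engine.begin() as connection:
        # Named placeholders (e.g. :record) are bound from the parameters dictionary by the driver.
        connection.execute(sa.text(operation.statement), operation.parameters or {})

Because record values travel as bound parameters instead of being inlined into the statement, payload quoting is left to the driver, and the `CAST(:param AS OBJECT)` forms emitted for nested values keep the intended CrateDB type explicit.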