diff --git a/CHANGES.md b/CHANGES.md index e4ac18a..77c0c96 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,6 +3,8 @@ ## Unreleased - Replace poor man's relation name quoting with implementation `quote_relation_name` from `sqlalchemy-cratedb` package. +- DynamoDB: Add special decoding for varied lists, storing them into a separate + `OBJECT(IGNORED)` column in CrateDB ## 2024/08/27 v0.0.13 - DMS/DynamoDB: Use parameterized SQL WHERE clauses instead of inlining values diff --git a/src/commons_codec/model.py b/src/commons_codec/model.py index 79ae163..f731f56 100644 --- a/src/commons_codec/model.py +++ b/src/commons_codec/model.py @@ -128,3 +128,14 @@ def to_sql(self): Render a WHERE clause of an SQL statement. """ return self.render(" AND ") + + +@define +class DualRecord: + """ + Manage two halves of a record. + One bucket stores the typed fields, the other stores the untyped ones. + """ + + typed: t.Dict[str, t.Any] + untyped: t.Dict[str, t.Any] diff --git a/src/commons_codec/transform/dynamodb.py b/src/commons_codec/transform/dynamodb.py index ca6dde7..6f613bb 100644 --- a/src/commons_codec/transform/dynamodb.py +++ b/src/commons_codec/transform/dynamodb.py @@ -8,10 +8,12 @@ from sqlalchemy_cratedb.support import quote_relation_name from commons_codec.model import ( + DualRecord, SQLOperation, SQLParameterizedSetClause, SQLParameterizedWhereClause, ) +from commons_codec.util.data import TaggableList from commons_codec.vendor.boto3.dynamodb.types import DYNAMODB_CONTEXT, TypeDeserializer logger = logging.getLogger(__name__) @@ -38,14 +40,45 @@ def _deserialize_ss(self, value): def _deserialize_bs(self, value): return list(super()._deserialize_bs(value)) + def _deserialize_l(self, value): + """ + CrateDB can't store varied lists in an OBJECT(DYNAMIC) column, so set the + stage to break them apart in order to store them in an OBJECT(IGNORED) column. + + https://github.com/crate/commons-codec/issues/28 + """ + + # Deserialize list as-is. + result = TaggableList([self.deserialize(v) for v in value]) + result.set_tag("varied", False) + + # If it's not an empty list, check if inner types are varying. + # If so, tag the result list accordingly. + # It doesn't work on the result list itself, but on the DynamoDB + # data structure instead, comparing the single/dual-letter type + # identifiers. + if value: + dynamodb_type_first = list(value[0].keys())[0] + for v in value: + dynamodb_type_current = list(v.keys())[0] + if dynamodb_type_current != dynamodb_type_first: + result.set_tag("varied", True) + break + return result + class DynamoTranslatorBase: """ - Translate DynamoDB CDC events into different representations. + Translate DynamoDB records into a different representation. """ - # Define name of the column where CDC's record data will get materialized into. - DATA_COLUMN = "data" + # Define name of the column where typed DynamoDB fields will get materialized into. + # This column uses the `OBJECT(DYNAMIC)` data type. + TYPED_COLUMN = "data" + + # Define name of the column where untyped DynamoDB fields will get materialized into. + # This column uses the `OBJECT(IGNORED)` data type. + UNTYPED_COLUMN = "aux" def __init__(self, table_name: str): super().__init__() @@ -57,9 +90,12 @@ def sql_ddl(self): """` Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events. """ - return f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));" + return ( + f"CREATE TABLE IF NOT EXISTS {self.table_name} " + f"({self.TYPED_COLUMN} OBJECT(DYNAMIC), {self.UNTYPED_COLUMN} OBJECT(IGNORED));" + ) - def decode_record(self, item: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: + def decode_record(self, item: t.Dict[str, t.Any]) -> DualRecord: """ Deserialize DynamoDB JSON record into vanilla Python. @@ -86,7 +122,13 @@ def decode_record(self, item: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: -- https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypeDescriptors """ - return toolz.valmap(self.deserializer.deserialize, item) + record = toolz.valmap(self.deserializer.deserialize, item) + untyped = {} + for key, value in record.items(): + if isinstance(value, TaggableList) and value.get_tag("varied", False): + untyped[key] = value + record = toolz.dissoc(record, *untyped.keys()) + return DualRecord(typed=record, untyped=untyped) class DynamoDBFullLoadTranslator(DynamoTranslatorBase): @@ -94,9 +136,9 @@ def to_sql(self, record: t.Dict[str, t.Any]) -> SQLOperation: """ Produce INSERT|UPDATE|DELETE SQL statement from INSERT|MODIFY|REMOVE CDC event record. """ - record = self.decode_record(record) - sql = f"INSERT INTO {self.table_name} ({self.DATA_COLUMN}) VALUES (:record);" - return SQLOperation(sql, {"record": record}) + dual_record = self.decode_record(record) + sql = f"INSERT INTO {self.table_name} ({self.TYPED_COLUMN}, {self.UNTYPED_COLUMN}) VALUES (:typed, :untyped);" + return SQLOperation(sql, {"typed": dual_record.typed, "untyped": dual_record.untyped}) class DynamoDBCDCTranslator(DynamoTranslatorBase): @@ -121,9 +163,11 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation: raise ValueError(f"Unknown eventSource: {event_source}") if event_name == "INSERT": - record = self.decode_record(event["dynamodb"]["NewImage"]) - sql = f"INSERT INTO {self.table_name} ({self.DATA_COLUMN}) VALUES (:record);" - parameters = {"record": record} + dual_record = self.decode_record(event["dynamodb"]["NewImage"]) + sql = ( + f"INSERT INTO {self.table_name} ({self.TYPED_COLUMN}, {self.UNTYPED_COLUMN}) VALUES (:typed, :untyped);" + ) + parameters = {"typed": dual_record.typed, "untyped": dual_record.untyped} elif event_name == "MODIFY": new_image = event["dynamodb"]["NewImage"] @@ -133,8 +177,8 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation: for key in event["dynamodb"]["Keys"]: del new_image[key] - record = self.decode_record(event["dynamodb"]["NewImage"]) - set_clause = self.update_clause(record) + dual_record = self.decode_record(event["dynamodb"]["NewImage"]) + set_clause = self.update_clause(dual_record) where_clause = self.keys_to_where(event["dynamodb"]["Keys"]) sql = f"UPDATE {self.table_name} SET {set_clause.to_sql()} WHERE {where_clause.to_sql()};" @@ -151,7 +195,7 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation: return SQLOperation(sql, parameters) - def update_clause(self, record: t.Dict[str, t.Any]) -> SQLParameterizedSetClause: + def update_clause(self, dual_record: DualRecord) -> SQLParameterizedSetClause: """ Serializes an image to a comma-separated list of column/values pairs that can be used in the `SET` clause of an `UPDATE` statement. @@ -164,6 +208,12 @@ def update_clause(self, record: t.Dict[str, t.Any]) -> SQLParameterizedSetClause """ clause = SQLParameterizedSetClause() + self.record_to_set_clause(dual_record.typed, self.TYPED_COLUMN, clause) + self.record_to_set_clause(dual_record.untyped, self.UNTYPED_COLUMN, clause) + return clause + + @staticmethod + def record_to_set_clause(record: t.Dict[str, t.Any], container_column: str, clause: SQLParameterizedSetClause): for column, value in record.items(): rval = None if isinstance(value, dict): @@ -172,8 +222,7 @@ def update_clause(self, record: t.Dict[str, t.Any]) -> SQLParameterizedSetClause elif isinstance(value, list) and value and isinstance(value[0], dict): rval = f"CAST(:{column} AS OBJECT[])" - clause.add(lval=f"{self.DATA_COLUMN}['{column}']", name=column, value=value, rval=rval) - return clause + clause.add(lval=f"{container_column}['{column}']", name=column, value=value, rval=rval) def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> SQLParameterizedWhereClause: """ @@ -188,8 +237,8 @@ def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> SQLParameterized OUT: WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42' """ - keys = self.decode_record(keys) + dual_record = self.decode_record(keys) clause = SQLParameterizedWhereClause() - for key_name, key_value in keys.items(): - clause.add(lval=f"{self.DATA_COLUMN}['{key_name}']", name=key_name, value=key_value) + for key_name, key_value in dual_record.typed.items(): + clause.add(lval=f"{self.TYPED_COLUMN}['{key_name}']", name=key_name, value=key_value) return clause diff --git a/src/commons_codec/util/data.py b/src/commons_codec/util/data.py index d3930af..9d4235b 100644 --- a/src/commons_codec/util/data.py +++ b/src/commons_codec/util/data.py @@ -30,3 +30,15 @@ def is_number(s): pass return False + + +class TaggableList(list): + """ + Just like a list, but with some extra methods to be able to add meta-information. + """ + + def set_tag(self, key, value): + setattr(self, f"__{key}__", value) + + def get_tag(self, key, default): + return getattr(self, f"__{key}__", default) diff --git a/tests/transform/test_dynamodb_cdc.py b/tests/transform/test_dynamodb_cdc.py index c27b7f0..4699b41 100644 --- a/tests/transform/test_dynamodb_cdc.py +++ b/tests/transform/test_dynamodb_cdc.py @@ -3,7 +3,7 @@ import pytest -from commons_codec.model import SQLOperation +from commons_codec.model import DualRecord, SQLOperation from commons_codec.transform.dynamodb import CrateDBTypeDeserializer, DynamoDBCDCTranslator READING_BASIC = {"device": "foo", "temperature": 42.42, "humidity": 84.84} @@ -175,11 +175,16 @@ def test_decode_ddb_deserialize_type(): - assert DynamoDBCDCTranslator(table_name="foo").decode_record({"foo": {"N": "84.84"}}) == {"foo": 84.84} + assert DynamoDBCDCTranslator(table_name="foo").decode_record({"foo": {"N": "84.84"}}) == DualRecord( + typed={"foo": 84.84}, untyped={} + ) def test_decode_cdc_sql_ddl(): - assert DynamoDBCDCTranslator(table_name="foo").sql_ddl == "CREATE TABLE IF NOT EXISTS foo (data OBJECT(DYNAMIC));" + assert ( + DynamoDBCDCTranslator(table_name="foo").sql_ddl + == "CREATE TABLE IF NOT EXISTS foo (data OBJECT(DYNAMIC), aux OBJECT(IGNORED));" + ) def test_decode_cdc_unknown_source(): @@ -196,9 +201,9 @@ def test_decode_cdc_unknown_event(): def test_decode_cdc_insert_basic(): assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_INSERT_BASIC) == SQLOperation( - statement="INSERT INTO foo (data) VALUES (:record);", + statement="INSERT INTO foo (data, aux) VALUES (:typed, :untyped);", parameters={ - "record": { + "typed": { "humidity": 84.84, "temperature": 42.42, "device": "foo", @@ -206,16 +211,17 @@ def test_decode_cdc_insert_basic(): "string_set": ["location_1"], "number_set": [1.0, 2.0, 3.0, 4.0], "binary_set": ["U3Vubnk="], - } + }, + "untyped": {}, }, ) def test_decode_cdc_insert_nested(): assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_INSERT_NESTED) == SQLOperation( - statement="INSERT INTO foo (data) VALUES (:record);", + statement="INSERT INTO foo (data, aux) VALUES (:typed, :untyped);", parameters={ - "record": { + "typed": { "id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266", "data": {"temperature": 42.42, "humidity": 84.84}, "meta": {"timestamp": "2024-07-12T01:17:42", "device": "foo"}, @@ -223,7 +229,8 @@ def test_decode_cdc_insert_nested(): "number_set": [0.34, 1.0, 2.0, 3.0], "binary_set": ["U3Vubnk="], "somemap": {"test": 1.0, "test2": 2.0}, - } + }, + "untyped": {}, }, ) diff --git a/tests/transform/test_dynamodb_full.py b/tests/transform/test_dynamodb_full.py index bb1887e..a2cfb9c 100644 --- a/tests/transform/test_dynamodb_full.py +++ b/tests/transform/test_dynamodb_full.py @@ -14,6 +14,7 @@ "test2": {"N": 2}, } }, + "list_varied": {"L": [{"M": {"a": {"N": 1}}}, {"N": 2}, {"S": "Three"}]}, } RECORD_UTM = { @@ -35,15 +36,16 @@ def test_sql_ddl(): assert ( - DynamoDBFullLoadTranslator(table_name="foo").sql_ddl == "CREATE TABLE IF NOT EXISTS foo (data OBJECT(DYNAMIC));" + DynamoDBFullLoadTranslator(table_name="foo").sql_ddl + == "CREATE TABLE IF NOT EXISTS foo (data OBJECT(DYNAMIC), aux OBJECT(IGNORED));" ) def test_to_sql_all_types(): assert DynamoDBFullLoadTranslator(table_name="foo").to_sql(RECORD_ALL_TYPES) == SQLOperation( - statement="INSERT INTO foo (data) VALUES (:record);", + statement="INSERT INTO foo (data, aux) VALUES (:typed, :untyped);", parameters={ - "record": { + "typed": { "id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266", "data": {"temperature": 42.42, "humidity": 84.84}, "meta": {"timestamp": "2024-07-12T01:17:42", "device": "foo"}, @@ -51,16 +53,23 @@ def test_to_sql_all_types(): "number_set": [0.34, 1.0, 2.0, 3.0], "binary_set": ["U3Vubnk="], "somemap": {"test": 1.0, "test2": 2.0}, - } + }, + "untyped": { + "list_varied": [ + {"a": 1.0}, + 2.0, + "Three", + ], + }, }, ) def test_to_sql_list_of_objects(): assert DynamoDBFullLoadTranslator(table_name="foo").to_sql(RECORD_UTM) == SQLOperation( - statement="INSERT INTO foo (data) VALUES (:record);", + statement="INSERT INTO foo (data, aux) VALUES (:typed, :untyped);", parameters={ - "record": { + "typed": { "utmTags": [ { "date": "2024-08-28T20:05:42.603Z", @@ -70,6 +79,7 @@ def test_to_sql_list_of_objects(): "utm_source": "google", } ] - } + }, + "untyped": {}, }, ) diff --git a/tests/transform/test_dynamodb_types_cratedb.py b/tests/transform/test_dynamodb_types_cratedb.py new file mode 100644 index 0000000..4501354 --- /dev/null +++ b/tests/transform/test_dynamodb_types_cratedb.py @@ -0,0 +1,23 @@ +import unittest +from decimal import Decimal + +from commons_codec.model import DualRecord +from commons_codec.transform.dynamodb import CrateDBTypeDeserializer, DynamoDBCDCTranslator + + +class TestDeserializer(unittest.TestCase): + def setUp(self): + self.deserializer = CrateDBTypeDeserializer() + + def test_deserialize_list(self): + assert self.deserializer.deserialize({"L": [{"N": "1"}, {"S": "foo"}, {"L": [{"N": "1.25"}]}]}) == [ + Decimal("1"), + "foo", + [Decimal("1.25")], + ] + + +def test_decode_typed_untyped(): + assert DynamoDBCDCTranslator(table_name="foo").decode_record( + {"foo": {"N": "84.84"}, "bar": {"L": [{"N": "1"}, {"S": "foo"}]}} + ) == DualRecord(typed={"foo": 84.84}, untyped={"bar": [1.0, "foo"]}) diff --git a/tests/transform/test_dynamodb_types.py b/tests/transform/test_dynamodb_types_vanilla.py similarity index 100% rename from tests/transform/test_dynamodb_types.py rename to tests/transform/test_dynamodb_types_vanilla.py