diff --git a/CHANGES.md b/CHANGES.md index 85a7ffc..555f439 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,5 +3,6 @@ ## Unreleased - Added decoders for Airrohr, Tasmota, and TTS/TTN from Kotori DAQ +- Added transformer for DynamoDB CDC to CrateDB SQL conversion ## 2024/07/15 0.0.1 diff --git a/README.md b/README.md index 1080bc7..7939966 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,12 @@ To install the most recent version, run: pip install --upgrade data-x ``` +## License +The project uses the LGPLv3 license for the whole ensemble. However, individual +portions of the code base are vendorized from other Python packages, where +deviating licenses may apply. Please check for detailed license information +within the header sections of relevant files. + ## Contributing The `data-x` package is an open source project, and is [managed on GitHub](https://github.com/daq-tools/data-x). diff --git a/src/data_x/transform/__init__.py b/src/data_x/transform/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/data_x/transform/dynamodb.py b/src/data_x/transform/dynamodb.py new file mode 100644 index 0000000..7669d0b --- /dev/null +++ b/src/data_x/transform/dynamodb.py @@ -0,0 +1,155 @@ +# Copyright (c) 2023-2024, The Kotori Developers and contributors. +# Distributed under the terms of the LGPLv3 license, see LICENSE. + +# ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction + +import logging +import typing as t + +import simplejson as json +import toolz + +from data_x.vendor.boto3.dynamodb.types import TypeDeserializer + +logger = logging.getLogger(__name__) + + +class DynamoCDCTranslatorBase: + """ + Translate DynamoDB CDC events into different representations. + """ + + def __init__(self): + self.deserializer = TypeDeserializer() + + def deserialize_item(self, item: t.Dict[str, t.Dict[str, str]]) -> t.Dict[str, str]: + """ + Deserialize DynamoDB type-enriched nested JSON snippet into vanilla Python. + + Example: + { + "humidity": {"N": "84.84"}, + "temperature": {"N": "42.42"}, + "device": {"S": "qux"}, + "timestamp": {"S": "2024-07-12T01:17:42"}, + } + + A complete list of DynamoDB data type descriptors: + + S – String + N – Number + B – Binary + BOOL – Boolean + NULL – Null + M – Map + L – List + SS – String Set + NS – Number Set + BS – Binary Set + + -- https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypeDescriptors + """ + return toolz.valmap(self.deserializer.deserialize, item) + + +class DynamoCDCTranslatorCrateDB(DynamoCDCTranslatorBase): + """ + Translate DynamoDB CDC events into CrateDB SQL statements that materialize them again. + + The SQL DDL schema for CrateDB: + CREATE TABLE (data OBJECT(DYNAMIC)); + + Blueprint: + https://www.singlestore.com/blog/cdc-data-from-dynamodb-to-singlestore-using-dynamodb-streams/ + """ + + # Define name of the column where CDC's record data will get materialized into. + DATA_COLUMN = "data" + + def __init__(self, table_name: str): + super().__init__() + self.table_name = self.quote_table_name(table_name) + + @property + def sql_ddl(self): + """ + Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events. + """ + return f"CREATE TABLE {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));" + + def to_sql(self, record: t.Dict[str, t.Any]) -> str: + """ + Produce INSERT|UPDATE|DELETE SQL statement from INSERT|MODIFY|REMOVE CDC event record. + """ + event_source = record.get("eventSource") + event_name = record.get("eventName") + + if event_source != "aws:dynamodb": + raise ValueError(f"Unknown eventSource: {event_source}") + + if event_name == "INSERT": + values_clause = self.image_to_values(record["dynamodb"]["NewImage"]) + sql = f"INSERT INTO {self.table_name} " f"({self.DATA_COLUMN}) " f"VALUES ('{values_clause}');" + + elif event_name == "MODIFY": + values_clause = self.image_to_values(record["dynamodb"]["NewImage"]) + where_clause = self.keys_to_where(record["dynamodb"]["Keys"]) + sql = f"UPDATE {self.table_name} " f"SET {self.DATA_COLUMN} = '{values_clause}' " f"WHERE {where_clause};" + + elif event_name == "REMOVE": + where_clause = self.keys_to_where(record["dynamodb"]["Keys"]) + sql = f"DELETE FROM {self.table_name} " f"WHERE {where_clause};" + + else: + raise ValueError(f"Unknown CDC event name: {event_name}") + + return sql + + def image_to_values(self, image: t.Dict[str, t.Any]) -> str: + """ + Serialize CDC event's "(New|Old)Image" representation to a `VALUES` clause in CrateDB SQL syntax. + + IN (top-level stripped): + "NewImage": { + "humidity": {"N": "84.84"}, + "temperature": {"N": "42.42"}, + "device": {"S": "foo"}, + "timestamp": {"S": "2024-07-12T01:17:42"}, + } + + OUT: + {"humidity": 84.84, "temperature": 42.42, "device": "foo", "timestamp": "2024-07-12T01:17:42"} + """ + return json.dumps(self.deserialize_item(image)) + + def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> str: + """ + Serialize CDC event's "Keys" representation to an SQL `WHERE` clause in CrateDB SQL syntax. + + IN (top-level stripped): + "Keys": { + "device": {"S": "foo"}, + "timestamp": {"S": "2024-07-12T01:17:42"}, + } + + OUT: + WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42' + """ + constraints: t.List[str] = [] + for key_name, key_value_raw in keys.items(): + key_value = self.deserializer.deserialize(key_value_raw) + # FIXME: Does the quoting of the value on the right hand side need to take the data type into account? + constraint = f"{self.DATA_COLUMN}['{key_name}'] = '{key_value}'" + constraints.append(constraint) + return " AND ".join(constraints) + + @staticmethod + def quote_table_name(name: str): + """ + Poor man's table quoting. + + TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable. + """ + if '"' not in name: + name = f'"{name}"' + return name diff --git a/src/data_x/util/io.py b/src/data_x/util/io.py new file mode 100644 index 0000000..2d83cf1 --- /dev/null +++ b/src/data_x/util/io.py @@ -0,0 +1,9 @@ +# Copyright (c) 2016-2024, The Kotori Developers and contributors. +# Distributed under the terms of the LGPLv3 license, see LICENSE. +import json +import typing as t +from pathlib import Path + + +def read_jsonfile(name: t.Union[str, Path]) -> t.Dict[str, t.Any]: + return json.loads(Path(name).read_text()) diff --git a/src/data_x/vendor/__init__.py b/src/data_x/vendor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/data_x/vendor/boto3/__init__.py b/src/data_x/vendor/boto3/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/data_x/vendor/boto3/dynamodb/__init__.py b/src/data_x/vendor/boto3/dynamodb/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/data_x/vendor/boto3/dynamodb/types.py b/src/data_x/vendor/boto3/dynamodb/types.py new file mode 100644 index 0000000..285ea08 --- /dev/null +++ b/src/data_x/vendor/boto3/dynamodb/types.py @@ -0,0 +1,143 @@ +# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# https://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +# Patched: from boto3.compat import collections_abc +from decimal import ( + Clamped, + Context, + Inexact, + Overflow, + Rounded, + Underflow, +) + +STRING = "S" +NUMBER = "N" +BINARY = "B" +STRING_SET = "SS" +NUMBER_SET = "NS" +BINARY_SET = "BS" +NULL = "NULL" +BOOLEAN = "BOOL" +MAP = "M" +LIST = "L" + + +DYNAMODB_CONTEXT = Context( + Emin=-128, + Emax=126, + prec=38, + traps=[Clamped, Overflow, Inexact, Rounded, Underflow], +) + + +BINARY_TYPES = (bytearray, bytes) + + +class Binary: + """A class for representing Binary in dynamodb + + Especially for Python 2, use this class to explicitly specify + binary data for item in DynamoDB. It is essentially a wrapper around + binary. Unicode and Python 3 string types are not allowed. + """ + + def __init__(self, value): + if not isinstance(value, BINARY_TYPES): + types = ", ".join([str(t) for t in BINARY_TYPES]) + raise TypeError(f"Value must be of the following types: {types}") + self.value = value + + def __eq__(self, other): + if isinstance(other, Binary): + return self.value == other.value + return self.value == other + + def __ne__(self, other): + return not self.__eq__(other) + + def __repr__(self): + return f"Binary({self.value!r})" + + def __str__(self): + return self.value + + def __bytes__(self): + return self.value + + def __hash__(self): + return hash(self.value) + + +class TypeDeserializer: + """This class deserializes DynamoDB types to Python types.""" + + def deserialize(self, value): + """The method to deserialize the DynamoDB data types. + + :param value: A DynamoDB value to be deserialized to a pythonic value. + Here are the various conversions: + + DynamoDB Python + -------- ------ + {'NULL': True} None + {'BOOL': True/False} True/False + {'N': str(value)} Decimal(str(value)) + {'S': string} string + {'B': bytes} Binary(bytes) + {'NS': [str(value)]} set([Decimal(str(value))]) + {'SS': [string]} set([string]) + {'BS': [bytes]} set([bytes]) + {'L': list} list + {'M': dict} dict + + :returns: The pythonic value of the DynamoDB type. + """ + + if not value: + raise TypeError("Value must be a nonempty dictionary whose key is a valid dynamodb type.") + dynamodb_type = list(value.keys())[0] + try: + deserializer = getattr(self, f"_deserialize_{dynamodb_type}".lower()) + except AttributeError as ex: + raise TypeError(f"Dynamodb type {dynamodb_type} is not supported") from ex + return deserializer(value[dynamodb_type]) + + def _deserialize_null(self, value): + return None + + def _deserialize_bool(self, value): + return value + + def _deserialize_n(self, value): + return DYNAMODB_CONTEXT.create_decimal(value) + + def _deserialize_s(self, value): + return value + + def _deserialize_b(self, value): + return Binary(value) + + def _deserialize_ns(self, value): + return set(map(self._deserialize_n, value)) + + def _deserialize_ss(self, value): + return set(map(self._deserialize_s, value)) + + def _deserialize_bs(self, value): + return set(map(self._deserialize_b, value)) + + def _deserialize_l(self, value): + return [self.deserialize(v) for v in value] + + def _deserialize_m(self, value): + return {k: self.deserialize(v) for k, v in value.items()} diff --git a/tests/transform/test_dynamodb.py b/tests/transform/test_dynamodb.py new file mode 100644 index 0000000..4975625 --- /dev/null +++ b/tests/transform/test_dynamodb.py @@ -0,0 +1,158 @@ +import decimal + +import pytest +from data_x.transform.dynamodb import DynamoCDCTranslatorCrateDB + +READING_BASIC = {"device": "foo", "temperature": 42.42, "humidity": 84.84} + +MSG_UNKNOWN_SOURCE = { + "eventSource": "foo:bar", +} +MSG_UNKNOWN_EVENT = { + "eventSource": "aws:dynamodb", + "eventName": "FOOBAR", +} + +MSG_INSERT_BASIC = { + "awsRegion": "us-east-1", + "eventID": "b015b5f0-c095-4b50-8ad0-4279aa3d88c6", + "eventName": "INSERT", + "userIdentity": None, + "recordFormat": "application/json", + "tableName": "foo", + "dynamodb": { + "ApproximateCreationDateTime": 1720740233012995, + "Keys": {"device": {"S": "foo"}, "timestamp": {"S": "2024-07-12T01:17:42"}}, + "NewImage": { + "humidity": {"N": "84.84"}, + "temperature": {"N": "42.42"}, + "device": {"S": "foo"}, + "timestamp": {"S": "2024-07-12T01:17:42"}, + }, + "SizeBytes": 99, + "ApproximateCreationDateTimePrecision": "MICROSECOND", + }, + "eventSource": "aws:dynamodb", +} +MSG_INSERT_NESTED = { + "awsRegion": "us-east-1", + "eventID": "b581c2dc-9d97-44ed-94f7-cb77e4fdb740", + "eventName": "INSERT", + "userIdentity": None, + "recordFormat": "application/json", + "tableName": "table-testdrive-nested", + "dynamodb": { + "ApproximateCreationDateTime": 1720800199717446, + "Keys": {"id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"}}, + "NewImage": { + "id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"}, + "data": {"M": {"temperature": {"N": "42.42"}, "humidity": {"N": "84.84"}}}, + "meta": {"M": {"timestamp": {"S": "2024-07-12T01:17:42"}, "device": {"S": "foo"}}}, + }, + "SizeBytes": 156, + "ApproximateCreationDateTimePrecision": "MICROSECOND", + }, + "eventSource": "aws:dynamodb", +} +MSG_MODIFY = { + "awsRegion": "us-east-1", + "eventID": "24757579-ebfd-480a-956d-a1287d2ef707", + "eventName": "MODIFY", + "userIdentity": None, + "recordFormat": "application/json", + "tableName": "foo", + "dynamodb": { + "ApproximateCreationDateTime": 1720742302233719, + "Keys": {"device": {"S": "foo"}, "timestamp": {"S": "2024-07-12T01:17:42"}}, + "NewImage": { + "humidity": {"N": "84.84"}, + "temperature": {"N": "55.66"}, + "device": {"S": "bar"}, + "timestamp": {"S": "2024-07-12T01:17:42"}, + }, + "OldImage": { + "humidity": {"N": "84.84"}, + "temperature": {"N": "42.42"}, + "device": {"S": "foo"}, + "timestamp": {"S": "2024-07-12T01:17:42"}, + }, + "SizeBytes": 161, + "ApproximateCreationDateTimePrecision": "MICROSECOND", + }, + "eventSource": "aws:dynamodb", +} +MSG_REMOVE = { + "awsRegion": "us-east-1", + "eventID": "ff4e68ab-0820-4a0c-80b2-38753e8e00e5", + "eventName": "REMOVE", + "userIdentity": None, + "recordFormat": "application/json", + "tableName": "foo", + "dynamodb": { + "ApproximateCreationDateTime": 1720742321848352, + "Keys": {"device": {"S": "bar"}, "timestamp": {"S": "2024-07-12T01:17:42"}}, + "OldImage": { + "humidity": {"N": "84.84"}, + "temperature": {"N": "55.66"}, + "device": {"S": "bar"}, + "timestamp": {"S": "2024-07-12T01:17:42"}, + }, + "SizeBytes": 99, + "ApproximateCreationDateTimePrecision": "MICROSECOND", + }, + "eventSource": "aws:dynamodb", +} + + +def test_decode_ddb_deserialize_type(): + assert DynamoCDCTranslatorCrateDB(table_name="foo").deserialize_item({"foo": {"N": "84.84"}}) == { + "foo": decimal.Decimal("84.84") + } + + +def test_decode_cdc_sql_ddl(): + assert DynamoCDCTranslatorCrateDB(table_name="foo").sql_ddl == 'CREATE TABLE "foo" (data OBJECT(DYNAMIC));' + + +def test_decode_cdc_unknown_source(): + with pytest.raises(ValueError) as ex: + DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_UNKNOWN_SOURCE) + assert ex.match("Unknown eventSource: foo:bar") + + +def test_decode_cdc_unknown_event(): + with pytest.raises(ValueError) as ex: + DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_UNKNOWN_EVENT) + assert ex.match("Unknown CDC event name: FOOBAR") + + +def test_decode_cdc_insert_basic(): + assert ( + DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INSERT_BASIC) == 'INSERT INTO "foo" (data) ' + 'VALUES (\'{"humidity": 84.84, "temperature": 42.42, "device": "foo", "timestamp": "2024-07-12T01:17:42"}\');' + ) + + +def test_decode_cdc_insert_nested(): + assert ( + DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INSERT_NESTED) + == 'INSERT INTO "foo" (data) VALUES (\'{"id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266", ' + '"data": {"temperature": 42.42, "humidity": 84.84}, ' + '"meta": {"timestamp": "2024-07-12T01:17:42", "device": "foo"}}\');' + ) + + +def test_decode_cdc_modify(): + assert ( + DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_MODIFY) == 'UPDATE "foo" ' + 'SET data = \'{"humidity": 84.84, "temperature": 55.66, ' + '"device": "bar", "timestamp": "2024-07-12T01:17:42"}\' ' + "WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42';" + ) + + +def test_decode_cdc_remove(): + assert ( + DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_REMOVE) == 'DELETE FROM "foo" ' + "WHERE data['device'] = 'bar' AND data['timestamp'] = '2024-07-12T01:17:42';" + ) diff --git a/tests/transform/test_dynamodb_types.py b/tests/transform/test_dynamodb_types.py new file mode 100644 index 0000000..c818fd9 --- /dev/null +++ b/tests/transform/test_dynamodb_types.py @@ -0,0 +1,121 @@ +# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the 'License'). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# https://aws.amazon.com/apache2.0/ +# +# or in the 'license' file accompanying this file. This file is +# distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +import unittest +from decimal import Decimal + +import pytest +from data_x.vendor.boto3.dynamodb.types import Binary, TypeDeserializer + + +class TestBinary(unittest.TestCase): + def test_bytes_input(self): + data = Binary(b"\x01") + assert b"\x01" == data + assert b"\x01" == data.value + + def test_non_ascii_bytes_input(self): + # Binary data that is out of ASCII range + data = Binary(b"\x88") + assert b"\x88" == data + assert b"\x88" == data.value + + def test_bytearray_input(self): + data = Binary(bytearray([1])) + assert b"\x01" == data + assert b"\x01" == data.value + + def test_unicode_throws_error(self): + with pytest.raises(TypeError): + Binary("\u00e9") + + def test_integer_throws_error(self): + with pytest.raises(TypeError): + Binary(1) + + def test_not_equal(self): + assert Binary(b"\x01") != b"\x02" + + def test_str(self): + assert Binary(b"\x01").__str__() == b"\x01" + + def test_bytes(self): + self.assertEqual(bytes(Binary(b"\x01")), b"\x01") + + def test_repr(self): + assert "Binary" in repr(Binary(b"1")) + + +class TestDeserializer(unittest.TestCase): + def setUp(self): + self.deserializer = TypeDeserializer() + + def test_deserialize_invalid_type(self): + with pytest.raises(TypeError, match=r"FOO is not supported"): + self.deserializer.deserialize({"FOO": "bar"}) + + def test_deserialize_empty_structure(self): + with pytest.raises(TypeError, match=r"Value must be a nonempty"): + self.assertEqual(self.deserializer.deserialize({}), {}) + + def test_deserialize_null(self): + assert self.deserializer.deserialize({"NULL": True}) is None + + def test_deserialize_boolean(self): + assert self.deserializer.deserialize({"BOOL": False}) is False + + def test_deserialize_integer(self): + assert self.deserializer.deserialize({"N": "1"}) == Decimal("1") + + def test_deserialize_decimal(self): + assert self.deserializer.deserialize({"N": "1.25"}) == Decimal("1.25") + + def test_deserialize_string(self): + assert self.deserializer.deserialize({"S": "foo"}) == "foo" + + def test_deserialize_binary(self): + assert self.deserializer.deserialize({"B": b"\x00"}) == Binary(b"\x00") + + def test_deserialize_number_set(self): + assert self.deserializer.deserialize({"NS": ["1", "1.25"]}) == { + Decimal("1"), + Decimal("1.25"), + } + + def test_deserialize_string_set(self): + assert self.deserializer.deserialize({"SS": ["foo", "bar"]}) == { + "foo", + "bar", + } + + def test_deserialize_binary_set(self): + assert self.deserializer.deserialize({"BS": [b"\x00", b"\x01"]}) == { + Binary(b"\x00"), + Binary(b"\x01"), + } + + def test_deserialize_list(self): + assert self.deserializer.deserialize({"L": [{"N": "1"}, {"S": "foo"}, {"L": [{"N": "1.25"}]}]}) == [ + Decimal("1"), + "foo", + [Decimal("1.25")], + ] + + def test_deserialize_map(self): + assert self.deserializer.deserialize( + { + "M": { + "foo": {"S": "mystring"}, + "bar": {"M": {"baz": {"N": "1"}}}, + } + } + ) == {"foo": "mystring", "bar": {"baz": Decimal("1")}}