Skip to content

Commit

Permalink
Add transformer for DynamoDB CDC to CrateDB SQL conversion
Browse files Browse the repository at this point in the history
It vendorizes DynamoDB's `TypeDeserializer` Python implementation from
boto3, in order to optimally provide type support without needing to
pull in the full package as a dependency.
  • Loading branch information
amotl committed Jul 16, 2024
1 parent de911a7 commit 114b3c9
Show file tree
Hide file tree
Showing 11 changed files with 593 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@

## Unreleased
- Added decoders for Airrohr, Tasmota, and TTS/TTN from Kotori DAQ
- Added transformer for DynamoDB CDC to CrateDB SQL conversion

## 2024/07/15 0.0.1
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ To install the most recent version, run:
pip install --upgrade data-x
```

## License
The project uses the LGPLv3 license for the whole ensemble. However, individual
portions of the code base are vendorized from other Python packages, where
deviating licenses may apply. Please check for detailed license information
within the header sections of relevant files.

## Contributing
The `data-x` package is an open source project, and is
[managed on GitHub](https://github.com/daq-tools/data-x).
Expand Down
Empty file.
155 changes: 155 additions & 0 deletions src/data_x/transform/dynamodb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# Copyright (c) 2023-2024, The Kotori Developers and contributors.
# Distributed under the terms of the LGPLv3 license, see LICENSE.

# ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction

import logging
import typing as t

import simplejson as json
import toolz

from data_x.vendor.boto3.dynamodb.types import TypeDeserializer

logger = logging.getLogger(__name__)


class DynamoCDCTranslatorBase:
"""
Translate DynamoDB CDC events into different representations.
"""

def __init__(self):
self.deserializer = TypeDeserializer()

def deserialize_item(self, item: t.Dict[str, t.Dict[str, str]]) -> t.Dict[str, str]:
"""
Deserialize DynamoDB type-enriched nested JSON snippet into vanilla Python.
Example:
{
"humidity": {"N": "84.84"},
"temperature": {"N": "42.42"},
"device": {"S": "qux"},
"timestamp": {"S": "2024-07-12T01:17:42"},
}
A complete list of DynamoDB data type descriptors:
S – String
N – Number
B – Binary
BOOL – Boolean
NULL – Null
M – Map
L – List
SS – String Set
NS – Number Set
BS – Binary Set
-- https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypeDescriptors
"""
return toolz.valmap(self.deserializer.deserialize, item)


class DynamoCDCTranslatorCrateDB(DynamoCDCTranslatorBase):
"""
Translate DynamoDB CDC events into CrateDB SQL statements that materialize them again.
The SQL DDL schema for CrateDB:
CREATE TABLE <tablename> (data OBJECT(DYNAMIC));
Blueprint:
https://www.singlestore.com/blog/cdc-data-from-dynamodb-to-singlestore-using-dynamodb-streams/
"""

# Define name of the column where CDC's record data will get materialized into.
DATA_COLUMN = "data"

def __init__(self, table_name: str):
super().__init__()
self.table_name = self.quote_table_name(table_name)

@property
def sql_ddl(self):
"""
Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events.
"""
return f"CREATE TABLE {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));"

def to_sql(self, record: t.Dict[str, t.Any]) -> str:
"""
Produce INSERT|UPDATE|DELETE SQL statement from INSERT|MODIFY|REMOVE CDC event record.
"""
event_source = record.get("eventSource")
event_name = record.get("eventName")

if event_source != "aws:dynamodb":
raise ValueError(f"Unknown eventSource: {event_source}")

if event_name == "INSERT":
values_clause = self.image_to_values(record["dynamodb"]["NewImage"])
sql = f"INSERT INTO {self.table_name} " f"({self.DATA_COLUMN}) " f"VALUES ('{values_clause}');"

elif event_name == "MODIFY":
values_clause = self.image_to_values(record["dynamodb"]["NewImage"])
where_clause = self.keys_to_where(record["dynamodb"]["Keys"])
sql = f"UPDATE {self.table_name} " f"SET {self.DATA_COLUMN} = '{values_clause}' " f"WHERE {where_clause};"

elif event_name == "REMOVE":
where_clause = self.keys_to_where(record["dynamodb"]["Keys"])
sql = f"DELETE FROM {self.table_name} " f"WHERE {where_clause};"

else:
raise ValueError(f"Unknown CDC event name: {event_name}")

return sql

def image_to_values(self, image: t.Dict[str, t.Any]) -> str:
"""
Serialize CDC event's "(New|Old)Image" representation to a `VALUES` clause in CrateDB SQL syntax.
IN (top-level stripped):
"NewImage": {
"humidity": {"N": "84.84"},
"temperature": {"N": "42.42"},
"device": {"S": "foo"},
"timestamp": {"S": "2024-07-12T01:17:42"},
}
OUT:
{"humidity": 84.84, "temperature": 42.42, "device": "foo", "timestamp": "2024-07-12T01:17:42"}
"""
return json.dumps(self.deserialize_item(image))

def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> str:
"""
Serialize CDC event's "Keys" representation to an SQL `WHERE` clause in CrateDB SQL syntax.
IN (top-level stripped):
"Keys": {
"device": {"S": "foo"},
"timestamp": {"S": "2024-07-12T01:17:42"},
}
OUT:
WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42'
"""
constraints: t.List[str] = []
for key_name, key_value_raw in keys.items():
key_value = self.deserializer.deserialize(key_value_raw)
# FIXME: Does the quoting of the value on the right hand side need to take the data type into account?
constraint = f"{self.DATA_COLUMN}['{key_name}'] = '{key_value}'"
constraints.append(constraint)
return " AND ".join(constraints)

@staticmethod
def quote_table_name(name: str):
"""
Poor man's table quoting.
TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable.
"""
if '"' not in name:
name = f'"{name}"'
return name
9 changes: 9 additions & 0 deletions src/data_x/util/io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Copyright (c) 2016-2024, The Kotori Developers and contributors.
# Distributed under the terms of the LGPLv3 license, see LICENSE.
import json
import typing as t
from pathlib import Path


def read_jsonfile(name: t.Union[str, Path]) -> t.Dict[str, t.Any]:
return json.loads(Path(name).read_text())
Empty file added src/data_x/vendor/__init__.py
Empty file.
Empty file.
Empty file.
143 changes: 143 additions & 0 deletions src/data_x/vendor/boto3/dynamodb/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# https://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
# Patched: from boto3.compat import collections_abc
from decimal import (
Clamped,
Context,
Inexact,
Overflow,
Rounded,
Underflow,
)

STRING = "S"
NUMBER = "N"
BINARY = "B"
STRING_SET = "SS"
NUMBER_SET = "NS"
BINARY_SET = "BS"
NULL = "NULL"
BOOLEAN = "BOOL"
MAP = "M"
LIST = "L"


DYNAMODB_CONTEXT = Context(
Emin=-128,
Emax=126,
prec=38,
traps=[Clamped, Overflow, Inexact, Rounded, Underflow],
)


BINARY_TYPES = (bytearray, bytes)


class Binary:
"""A class for representing Binary in dynamodb
Especially for Python 2, use this class to explicitly specify
binary data for item in DynamoDB. It is essentially a wrapper around
binary. Unicode and Python 3 string types are not allowed.
"""

def __init__(self, value):
if not isinstance(value, BINARY_TYPES):
types = ", ".join([str(t) for t in BINARY_TYPES])
raise TypeError(f"Value must be of the following types: {types}")
self.value = value

def __eq__(self, other):
if isinstance(other, Binary):
return self.value == other.value
return self.value == other

def __ne__(self, other):
return not self.__eq__(other)

def __repr__(self):
return f"Binary({self.value!r})"

def __str__(self):
return self.value

def __bytes__(self):
return self.value

def __hash__(self):
return hash(self.value)


class TypeDeserializer:
"""This class deserializes DynamoDB types to Python types."""

def deserialize(self, value):
"""The method to deserialize the DynamoDB data types.
:param value: A DynamoDB value to be deserialized to a pythonic value.
Here are the various conversions:
DynamoDB Python
-------- ------
{'NULL': True} None
{'BOOL': True/False} True/False
{'N': str(value)} Decimal(str(value))
{'S': string} string
{'B': bytes} Binary(bytes)
{'NS': [str(value)]} set([Decimal(str(value))])
{'SS': [string]} set([string])
{'BS': [bytes]} set([bytes])
{'L': list} list
{'M': dict} dict
:returns: The pythonic value of the DynamoDB type.
"""

if not value:
raise TypeError("Value must be a nonempty dictionary whose key is a valid dynamodb type.")
dynamodb_type = list(value.keys())[0]
try:
deserializer = getattr(self, f"_deserialize_{dynamodb_type}".lower())
except AttributeError as ex:
raise TypeError(f"Dynamodb type {dynamodb_type} is not supported") from ex
return deserializer(value[dynamodb_type])

def _deserialize_null(self, value):
return None

def _deserialize_bool(self, value):
return value

def _deserialize_n(self, value):
return DYNAMODB_CONTEXT.create_decimal(value)

def _deserialize_s(self, value):
return value

def _deserialize_b(self, value):
return Binary(value)

def _deserialize_ns(self, value):
return set(map(self._deserialize_n, value))

def _deserialize_ss(self, value):
return set(map(self._deserialize_s, value))

def _deserialize_bs(self, value):
return set(map(self._deserialize_b, value))

def _deserialize_l(self, value):
return [self.deserialize(v) for v in value]

def _deserialize_m(self, value):
return {k: self.deserialize(v) for k, v in value.items()}
Loading

0 comments on commit 114b3c9

Please sign in to comment.