Skip to content

Commit

Permalink
DynamoDB: Add special decoding for varied lists
Browse files Browse the repository at this point in the history
Store them into a separate `OBJECT(IGNORED)` column in CrateDB.
  • Loading branch information
Andreas Motl authored and amotl committed Aug 31, 2024
1 parent 664a5f7 commit 4ca7b87
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 36 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
## Unreleased
- Replace poor man's relation name quoting with implementation
`quote_relation_name` from `sqlalchemy-cratedb` package.
- DynamoDB: Add special decoding for varied lists (lists whose elements have
differing types), storing them into a separate `OBJECT(IGNORED)` column in CrateDB.

## 2024/08/27 v0.0.13
- DMS/DynamoDB: Use parameterized SQL WHERE clauses instead of inlining values
Expand Down
11 changes: 11 additions & 0 deletions src/commons_codec/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,14 @@ def to_sql(self):
Render a WHERE clause of an SQL statement.
"""
return self.render(" AND ")


@define
class DualRecord:
    """
    Manage two halves of a record.

    One bucket stores the typed fields, the other stores the untyped ones.
    Both buckets are plain dictionaries keyed by field name.
    """

    # Fields that belong to the typed half of the record.
    typed: t.Dict[str, t.Any]
    # Fields that belong to the untyped half of the record.
    untyped: t.Dict[str, t.Any]
89 changes: 69 additions & 20 deletions src/commons_codec/transform/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
from sqlalchemy_cratedb.support import quote_relation_name

from commons_codec.model import (
DualRecord,
SQLOperation,
SQLParameterizedSetClause,
SQLParameterizedWhereClause,
)
from commons_codec.util.data import TaggableList
from commons_codec.vendor.boto3.dynamodb.types import DYNAMODB_CONTEXT, TypeDeserializer

logger = logging.getLogger(__name__)
Expand All @@ -38,14 +40,45 @@ def _deserialize_ss(self, value):
def _deserialize_bs(self, value):
return list(super()._deserialize_bs(value))

def _deserialize_l(self, value):
    """
    Deserialize a DynamoDB list, and tag the result when its element types vary.

    CrateDB can't store varied lists in an OBJECT(DYNAMIC) column, so the returned
    list carries a `varied` tag, setting the stage to break such lists apart in
    order to store them in an OBJECT(IGNORED) column.
    https://github.com/crate/commons-codec/issues/28
    """

    # Convert all elements up front; the list content itself is left as-is.
    items = TaggableList([self.deserialize(element) for element in value])

    # Variation is detected on the DynamoDB data structure, not on the
    # converted values: each element is keyed by its single/dual-letter type
    # identifier, and every identifier is compared against the first one.
    # An empty list is never considered varied.
    is_varied = False
    if value:
        reference_type = list(value[0].keys())[0]
        is_varied = any(list(element.keys())[0] != reference_type for element in value)

    items.set_tag("varied", is_varied)
    return items


class DynamoTranslatorBase:
"""
Translate DynamoDB CDC events into different representations.
Translate DynamoDB records into a different representation.
"""

# Define name of the column where CDC's record data will get materialized into.
DATA_COLUMN = "data"
# Define name of the column where typed DynamoDB fields will get materialized into.
# This column uses the `OBJECT(DYNAMIC)` data type.
TYPED_COLUMN = "data"

# Define name of the column where untyped DynamoDB fields will get materialized into.
# This column uses the `OBJECT(IGNORED)` data type.
UNTYPED_COLUMN = "aux"

def __init__(self, table_name: str):
super().__init__()
Expand All @@ -57,9 +90,12 @@ def sql_ddl(self):
"""`
Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events.
"""
return f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));"
return (
f"CREATE TABLE IF NOT EXISTS {self.table_name} "
f"({self.TYPED_COLUMN} OBJECT(DYNAMIC), {self.UNTYPED_COLUMN} OBJECT(IGNORED));"
)

def decode_record(self, item: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
def decode_record(self, item: t.Dict[str, t.Any]) -> DualRecord:
"""
Deserialize DynamoDB JSON record into vanilla Python.
Expand All @@ -86,17 +122,23 @@ def decode_record(self, item: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
-- https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypeDescriptors
"""
return toolz.valmap(self.deserializer.deserialize, item)
record = toolz.valmap(self.deserializer.deserialize, item)
untyped = {}
for key, value in record.items():
if isinstance(value, TaggableList) and value.get_tag("varied", False):
untyped[key] = value
record = toolz.dissoc(record, *untyped.keys())
return DualRecord(typed=record, untyped=untyped)


class DynamoDBFullLoadTranslator(DynamoTranslatorBase):
def to_sql(self, record: t.Dict[str, t.Any]) -> SQLOperation:
"""
Produce INSERT|UPDATE|DELETE SQL statement from INSERT|MODIFY|REMOVE CDC event record.
"""
record = self.decode_record(record)
sql = f"INSERT INTO {self.table_name} ({self.DATA_COLUMN}) VALUES (:record);"
return SQLOperation(sql, {"record": record})
dual_record = self.decode_record(record)
sql = f"INSERT INTO {self.table_name} ({self.TYPED_COLUMN}, {self.UNTYPED_COLUMN}) VALUES (:typed, :untyped);"
return SQLOperation(sql, {"typed": dual_record.typed, "untyped": dual_record.untyped})


class DynamoDBCDCTranslator(DynamoTranslatorBase):
Expand All @@ -121,9 +163,11 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation:
raise ValueError(f"Unknown eventSource: {event_source}")

if event_name == "INSERT":
record = self.decode_record(event["dynamodb"]["NewImage"])
sql = f"INSERT INTO {self.table_name} ({self.DATA_COLUMN}) VALUES (:record);"
parameters = {"record": record}
dual_record = self.decode_record(event["dynamodb"]["NewImage"])
sql = (
f"INSERT INTO {self.table_name} ({self.TYPED_COLUMN}, {self.UNTYPED_COLUMN}) VALUES (:typed, :untyped);"
)
parameters = {"typed": dual_record.typed, "untyped": dual_record.untyped}

elif event_name == "MODIFY":
new_image = event["dynamodb"]["NewImage"]
Expand All @@ -133,8 +177,8 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation:
for key in event["dynamodb"]["Keys"]:
del new_image[key]

record = self.decode_record(event["dynamodb"]["NewImage"])
set_clause = self.update_clause(record)
dual_record = self.decode_record(event["dynamodb"]["NewImage"])
set_clause = self.update_clause(dual_record)

where_clause = self.keys_to_where(event["dynamodb"]["Keys"])
sql = f"UPDATE {self.table_name} SET {set_clause.to_sql()} WHERE {where_clause.to_sql()};"
Expand All @@ -151,7 +195,7 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation:

return SQLOperation(sql, parameters)

def update_clause(self, record: t.Dict[str, t.Any]) -> SQLParameterizedSetClause:
def update_clause(self, dual_record: DualRecord) -> SQLParameterizedSetClause:
"""
Serializes an image to a comma-separated list of column/values pairs
that can be used in the `SET` clause of an `UPDATE` statement.
Expand All @@ -164,6 +208,12 @@ def update_clause(self, record: t.Dict[str, t.Any]) -> SQLParameterizedSetClause
"""

clause = SQLParameterizedSetClause()
self.record_to_set_clause(dual_record.typed, self.TYPED_COLUMN, clause)
self.record_to_set_clause(dual_record.untyped, self.UNTYPED_COLUMN, clause)
return clause

@staticmethod
def record_to_set_clause(record: t.Dict[str, t.Any], container_column: str, clause: SQLParameterizedSetClause):
for column, value in record.items():
rval = None
if isinstance(value, dict):
Expand All @@ -172,8 +222,7 @@ def update_clause(self, record: t.Dict[str, t.Any]) -> SQLParameterizedSetClause
elif isinstance(value, list) and value and isinstance(value[0], dict):
rval = f"CAST(:{column} AS OBJECT[])"

clause.add(lval=f"{self.DATA_COLUMN}['{column}']", name=column, value=value, rval=rval)
return clause
clause.add(lval=f"{container_column}['{column}']", name=column, value=value, rval=rval)

def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> SQLParameterizedWhereClause:
"""
Expand All @@ -188,8 +237,8 @@ def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> SQLParameterized
OUT:
WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42'
"""
keys = self.decode_record(keys)
dual_record = self.decode_record(keys)
clause = SQLParameterizedWhereClause()
for key_name, key_value in keys.items():
clause.add(lval=f"{self.DATA_COLUMN}['{key_name}']", name=key_name, value=key_value)
for key_name, key_value in dual_record.typed.items():
clause.add(lval=f"{self.TYPED_COLUMN}['{key_name}']", name=key_name, value=key_value)
return clause
12 changes: 12 additions & 0 deletions src/commons_codec/util/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,15 @@ def is_number(s):
pass

return False


class TaggableList(list):
    """
    Just like a list, but with some extra methods to be able to add meta-information.

    A tag named `key` is kept in the instance dictionary under the name
    `__<key>__`. Tags are read from and written to the instance dictionary
    directly, so a tag name never resolves to, or tries to overwrite, a
    special attribute inherited from `list` (e.g. `doc`, `len`, or `class`).
    """

    def set_tag(self, key, value):
        """
        Attach `value` to this list under the tag name `key`, replacing any previous value.
        """
        # Write to the instance dictionary instead of using `setattr`, which
        # would raise for names backed by a data descriptor, like `__class__`.
        self.__dict__[f"__{key}__"] = value

    def get_tag(self, key, default=None):
        """
        Return the value tagged as `key`, or `default` when no such tag has been set.
        """
        # Read from the instance dictionary instead of using `getattr`, which
        # would fall back to class attributes like `__doc__` or `__len__`.
        return self.__dict__.get(f"__{key}__", default)
25 changes: 16 additions & 9 deletions tests/transform/test_dynamodb_cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pytest

from commons_codec.model import SQLOperation
from commons_codec.model import DualRecord, SQLOperation
from commons_codec.transform.dynamodb import CrateDBTypeDeserializer, DynamoDBCDCTranslator

READING_BASIC = {"device": "foo", "temperature": 42.42, "humidity": 84.84}
Expand Down Expand Up @@ -175,11 +175,16 @@


def test_decode_ddb_deserialize_type():
assert DynamoDBCDCTranslator(table_name="foo").decode_record({"foo": {"N": "84.84"}}) == {"foo": 84.84}
assert DynamoDBCDCTranslator(table_name="foo").decode_record({"foo": {"N": "84.84"}}) == DualRecord(
typed={"foo": 84.84}, untyped={}
)


def test_decode_cdc_sql_ddl():
assert DynamoDBCDCTranslator(table_name="foo").sql_ddl == "CREATE TABLE IF NOT EXISTS foo (data OBJECT(DYNAMIC));"
assert (
DynamoDBCDCTranslator(table_name="foo").sql_ddl
== "CREATE TABLE IF NOT EXISTS foo (data OBJECT(DYNAMIC), aux OBJECT(IGNORED));"
)


def test_decode_cdc_unknown_source():
Expand All @@ -196,34 +201,36 @@ def test_decode_cdc_unknown_event():

def test_decode_cdc_insert_basic():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_INSERT_BASIC) == SQLOperation(
statement="INSERT INTO foo (data) VALUES (:record);",
statement="INSERT INTO foo (data, aux) VALUES (:typed, :untyped);",
parameters={
"record": {
"typed": {
"humidity": 84.84,
"temperature": 42.42,
"device": "foo",
"timestamp": "2024-07-12T01:17:42",
"string_set": ["location_1"],
"number_set": [1.0, 2.0, 3.0, 4.0],
"binary_set": ["U3Vubnk="],
}
},
"untyped": {},
},
)


def test_decode_cdc_insert_nested():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_INSERT_NESTED) == SQLOperation(
statement="INSERT INTO foo (data) VALUES (:record);",
statement="INSERT INTO foo (data, aux) VALUES (:typed, :untyped);",
parameters={
"record": {
"typed": {
"id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
"data": {"temperature": 42.42, "humidity": 84.84},
"meta": {"timestamp": "2024-07-12T01:17:42", "device": "foo"},
"string_set": ["location_1"],
"number_set": [0.34, 1.0, 2.0, 3.0],
"binary_set": ["U3Vubnk="],
"somemap": {"test": 1.0, "test2": 2.0},
}
},
"untyped": {},
},
)

Expand Down
24 changes: 17 additions & 7 deletions tests/transform/test_dynamodb_full.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"test2": {"N": 2},
}
},
"list_varied": {"L": [{"M": {"a": {"N": 1}}}, {"N": 2}, {"S": "Three"}]},
}

RECORD_UTM = {
Expand All @@ -35,32 +36,40 @@

def test_sql_ddl():
assert (
DynamoDBFullLoadTranslator(table_name="foo").sql_ddl == "CREATE TABLE IF NOT EXISTS foo (data OBJECT(DYNAMIC));"
DynamoDBFullLoadTranslator(table_name="foo").sql_ddl
== "CREATE TABLE IF NOT EXISTS foo (data OBJECT(DYNAMIC), aux OBJECT(IGNORED));"
)


def test_to_sql_all_types():
assert DynamoDBFullLoadTranslator(table_name="foo").to_sql(RECORD_ALL_TYPES) == SQLOperation(
statement="INSERT INTO foo (data) VALUES (:record);",
statement="INSERT INTO foo (data, aux) VALUES (:typed, :untyped);",
parameters={
"record": {
"typed": {
"id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
"data": {"temperature": 42.42, "humidity": 84.84},
"meta": {"timestamp": "2024-07-12T01:17:42", "device": "foo"},
"string_set": ["location_1"],
"number_set": [0.34, 1.0, 2.0, 3.0],
"binary_set": ["U3Vubnk="],
"somemap": {"test": 1.0, "test2": 2.0},
}
},
"untyped": {
"list_varied": [
{"a": 1.0},
2.0,
"Three",
],
},
},
)


def test_to_sql_list_of_objects():
assert DynamoDBFullLoadTranslator(table_name="foo").to_sql(RECORD_UTM) == SQLOperation(
statement="INSERT INTO foo (data) VALUES (:record);",
statement="INSERT INTO foo (data, aux) VALUES (:typed, :untyped);",
parameters={
"record": {
"typed": {
"utmTags": [
{
"date": "2024-08-28T20:05:42.603Z",
Expand All @@ -70,6 +79,7 @@ def test_to_sql_list_of_objects():
"utm_source": "google",
}
]
}
},
"untyped": {},
},
)
23 changes: 23 additions & 0 deletions tests/transform/test_dynamodb_types_cratedb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import unittest
from decimal import Decimal

from commons_codec.model import DualRecord
from commons_codec.transform.dynamodb import CrateDBTypeDeserializer, DynamoDBCDCTranslator


class TestDeserializer(unittest.TestCase):
    """
    Exercise `CrateDBTypeDeserializer.deserialize` directly, without a translator.
    """

    def setUp(self):
        # Provide a fresh deserializer instance to each test method.
        self.deserializer = CrateDBTypeDeserializer()

    def test_deserialize_list(self):
        """
        A DynamoDB list mixing number, string, and nested list elements
        compares equal to the corresponding plain Python list.
        """
        assert self.deserializer.deserialize({"L": [{"N": "1"}, {"S": "foo"}, {"L": [{"N": "1.25"}]}]}) == [
            Decimal("1"),
            "foo",
            [Decimal("1.25")],
        ]


def test_decode_typed_untyped():
    """
    `decode_record` returns a `DualRecord`: the number field `foo` lands in `typed`,
    while the list `bar`, mixing a number and a string, lands in `untyped`.
    """
    assert DynamoDBCDCTranslator(table_name="foo").decode_record(
        {"foo": {"N": "84.84"}, "bar": {"L": [{"N": "1"}, {"S": "foo"}]}}
    ) == DualRecord(typed={"foo": 84.84}, untyped={"bar": [1.0, "foo"]})
File renamed without changes.

0 comments on commit 4ca7b87

Please sign in to comment.