Skip to content

Commit

Permalink
DMS/DynamoDB/MongoDB: Use SQL with parameters instead of inlining values
Browse files Browse the repository at this point in the history
- `SQLOperation` bundles data about an SQL operation, including
  statement and parameters.
- `DynamoDBFullLoadTranslator` supports full-load data transfers.
  • Loading branch information
Andreas Motl authored and amotl committed Aug 26, 2024
1 parent 00a3308 commit c9e195c
Show file tree
Hide file tree
Showing 9 changed files with 332 additions and 209 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## Unreleased
- DMS/DynamoDB/MongoDB: Use SQL with parameters instead of inlining values

## 2024/08/23 v0.0.11
- DynamoDB: Fix serializing OBJECT and ARRAY representations to CrateDB
Expand Down
35 changes: 33 additions & 2 deletions src/commons_codec/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
import typing as t
from enum import auto

from attr import Factory
from attrs import define

if sys.version_info >= (3, 11):
from enum import StrEnum
else:
from backports.strenum import StrEnum # pragma: no cover

from attrs import define


@define(frozen=True)
class TableAddress:
Expand Down Expand Up @@ -81,3 +82,33 @@ def from_json(cls, payload: str) -> t.Union["ColumnTypeMapStore", None]:
if not payload:
return None
return cls.from_dict(json.loads(payload))


@define
class SQLOperation:
"""
Bundle data about an SQL operation, including statement and parameters.
Parameters can be a single dictionary or a list of dictionaries.
"""

statement: str
parameters: t.Optional[t.Union[t.Mapping[str, t.Any], t.List[t.Mapping[str, t.Any]]]] = None


@define
class SQLParameterizedClause:
"""
Manage details about a SQL parameterized clause, including column names, parameter names, and values.
"""

columns: t.List[str] = Factory(list)
names: t.List[str] = Factory(list)
values: t.List[str] = Factory(list)

@property
def set_clause(self):
"""
Render a SET clause of an SQL statement.
"""
return ", ".join([f"{key}={value}" for key, value in zip(self.columns, self.names)])
51 changes: 33 additions & 18 deletions src/commons_codec/transform/aws_dms.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@
import simplejson as json

from commons_codec.exception import MessageFormatError, UnknownOperationError
from commons_codec.model import ColumnType, ColumnTypeMapStore, PrimaryKeyStore, TableAddress
from commons_codec.model import (
ColumnType,
ColumnTypeMapStore,
PrimaryKeyStore,
SQLOperation,
SQLParameterizedClause,
TableAddress,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -66,35 +73,39 @@ def __init__(
self.primary_keys: t.List[str] = self.container.primary_keys[self.address]
self.column_types: t.Dict[str, ColumnType] = self.container.column_types[self.address]

def to_sql(self) -> str:
def to_sql(self) -> SQLOperation:
if self.operation == "create-table":
pks = self.control.get("table-def", {}).get("primary-key")
if pks:
self.primary_keys += pks
# TODO: What about dropping tables first?
return f"CREATE TABLE IF NOT EXISTS {self.address.fqn} ({self.DATA_COLUMN} OBJECT(DYNAMIC));"
return SQLOperation(f"CREATE TABLE IF NOT EXISTS {self.address.fqn} ({self.DATA_COLUMN} OBJECT(DYNAMIC));")

elif self.operation in ["load", "insert"]:
values_clause = self.record_to_values()
sql = f"INSERT INTO {self.address.fqn} ({self.DATA_COLUMN}) VALUES ('{values_clause}');"
record = self.record_to_values()
sql = f"INSERT INTO {self.address.fqn} ({self.DATA_COLUMN}) VALUES (:record);"
parameters = {"record": record}

elif self.operation == "update":
values_clause = self.record_to_update()
record = self.record_to_values()
clause = self.update_clause()
where_clause = self.keys_to_where()
sql = f"UPDATE {self.address.fqn} SET {values_clause} WHERE {where_clause};"
sql = f"UPDATE {self.address.fqn} SET {clause.set_clause} WHERE {where_clause};"
parameters = {"record": record}

elif self.operation == "delete":
where_clause = self.keys_to_where()
sql = f"DELETE FROM {self.address.fqn} WHERE {where_clause};"
parameters = None

else:
message = f"Unknown CDC event operation: {self.operation}"
logger.warning(message)
raise UnknownOperationError(message, operation=self.operation, record=self.record)

return sql
return SQLOperation(sql, parameters)

def record_to_update(self) -> str:
def update_clause(self) -> SQLParameterizedClause:
"""
Serializes an image to a comma-separated list of column/values pairs
that can be used in the `SET` clause of an `UPDATE` statement.
Expand All @@ -106,17 +117,21 @@ def record_to_update(self) -> str:
OUT
data['age'] = '33', data['attributes'] = '{"foo": "bar"}', data['name'] = 'John'
"""
constraints: t.List[str] = []
for column_name, column_value in self.record["data"].items():
clause = SQLParameterizedClause()
for column, value in self.record["data"].items():
# Skip primary key columns, they cannot be updated
if column_name in self.primary_keys:
if column in self.primary_keys:
continue

constraint = f"{self.DATA_COLUMN}['{column_name}'] = '{column_value}'"
constraints.append(constraint)
return ", ".join(constraints)
param_name = f":{column}"

clause.columns.append(f"{self.DATA_COLUMN}['{column}']")
clause.names.append(param_name)
clause.values.append(value) # noqa: PD011

return clause

def record_to_values(self) -> str:
def record_to_values(self) -> t.Dict[str, t.Any]:
"""
Apply type translations to record, and serialize to JSON.
Expand All @@ -133,7 +148,7 @@ def record_to_values(self) -> str:
if column_type is ColumnType.MAP and isinstance(value, str):
value = json.loads(value)
self.data[column_name] = value
return json.dumps(self.data)
return self.data

def keys_to_where(self) -> str:
"""
Expand Down Expand Up @@ -169,7 +184,7 @@ def __init__(
self.primary_keys = primary_keys or PrimaryKeyStore()
self.column_types = column_types or ColumnTypeMapStore()

def to_sql(self, record: t.Dict[str, t.Any]) -> str:
def to_sql(self, record: t.Dict[str, t.Any]) -> SQLOperation:
"""
Produce INSERT|UPDATE|DELETE SQL statement from load|insert|update|delete CDC event record.
"""
Expand Down
154 changes: 73 additions & 81 deletions src/commons_codec/transform/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import logging
import typing as t

import simplejson as json
import toolz

from commons_codec.model import SQLOperation, SQLParameterizedClause
from commons_codec.vendor.boto3.dynamodb.types import DYNAMODB_CONTEXT, TypeDeserializer

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -36,17 +36,40 @@ def _deserialize_bs(self, value):
return list(super()._deserialize_bs(value))


class DynamoCDCTranslatorBase:
class DynamoTranslatorBase:
"""
Translate DynamoDB CDC events into different representations.
"""

def __init__(self):
# Define name of the column where CDC's record data will get materialized into.
DATA_COLUMN = "data"

def __init__(self, table_name: str):
super().__init__()
self.table_name = self.quote_table_name(table_name)
self.deserializer = CrateDBTypeDeserializer()

def deserialize_item(self, item: t.Dict[str, t.Dict[str, str]]) -> t.Dict[str, str]:
@property
def sql_ddl(self):
"""`
Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events.
"""
return f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));"

@staticmethod
def quote_table_name(name: str):
"""
Poor man's table quoting.
TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable.
"""
if '"' not in name and "." not in name:
name = f'"{name}"'
return name

def decode_record(self, item: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
"""
Deserialize DynamoDB type-enriched nested JSON snippet into vanilla Python.
Deserialize DynamoDB JSON record into vanilla Python.
Example:
{
Expand Down Expand Up @@ -74,7 +97,17 @@ def deserialize_item(self, item: t.Dict[str, t.Dict[str, str]]) -> t.Dict[str, s
return toolz.valmap(self.deserializer.deserialize, item)


class DynamoCDCTranslatorCrateDB(DynamoCDCTranslatorBase):
class DynamoDBFullLoadTranslator(DynamoTranslatorBase):
def to_sql(self, record: t.Dict[str, t.Any]) -> SQLOperation:
"""
Produce INSERT|UPDATE|DELETE SQL statement from INSERT|MODIFY|REMOVE CDC event record.
"""
record = self.decode_record(record)
sql = f"INSERT INTO {self.table_name} ({self.DATA_COLUMN}) VALUES (:record);"
return SQLOperation(sql, {"record": record})


class DynamoDBCDCTranslator(DynamoTranslatorBase):
"""
Translate DynamoDB CDC events into CrateDB SQL statements that materialize them again.
Expand All @@ -85,74 +118,47 @@ class DynamoCDCTranslatorCrateDB(DynamoCDCTranslatorBase):
https://www.singlestore.com/blog/cdc-data-from-dynamodb-to-singlestore-using-dynamodb-streams/
"""

# Define name of the column where CDC's record data will get materialized into.
DATA_COLUMN = "data"

def __init__(self, table_name: str):
super().__init__()
self.table_name = self.quote_table_name(table_name)

@property
def sql_ddl(self):
"""
Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events.
"""
return f"CREATE TABLE {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));"

def to_sql(self, record: t.Dict[str, t.Any]) -> str:
def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation:
"""
Produce INSERT|UPDATE|DELETE SQL statement from INSERT|MODIFY|REMOVE CDC event record.
"""
event_source = record.get("eventSource")
event_name = record.get("eventName")
event_source = event.get("eventSource")
event_name = event.get("eventName")

if event_source != "aws:dynamodb":
raise ValueError(f"Unknown eventSource: {event_source}")

if event_name == "INSERT":
values_clause = self.image_to_values(record["dynamodb"]["NewImage"])
sql = f"INSERT INTO {self.table_name} " f"({self.DATA_COLUMN}) " f"VALUES ('{values_clause}');"
record = self.decode_record(event["dynamodb"]["NewImage"])
sql = f"INSERT INTO {self.table_name} ({self.DATA_COLUMN}) VALUES (:record);"
parameters = {"record": record}

elif event_name == "MODIFY":
new_image_cleaned = record["dynamodb"]["NewImage"]
new_image = event["dynamodb"]["NewImage"]
# Drop primary key columns to not update them.
# Primary key values should be identical (if chosen identical in DynamoDB and CrateDB),
# but CrateDB does not allow having themin an UPDATE's SET clause.
for key in record["dynamodb"]["Keys"]:
del new_image_cleaned[key]
# but CrateDB does not allow having them in an UPDATE's SET clause.
for key in event["dynamodb"]["Keys"]:
del new_image[key]

values_clause = self.values_to_update(new_image_cleaned)
record = self.decode_record(event["dynamodb"]["NewImage"])
clause = self.update_clause(record)

where_clause = self.keys_to_where(record["dynamodb"]["Keys"])
sql = f"UPDATE {self.table_name} " f"SET {values_clause} " f"WHERE {where_clause};"
where_clause = self.keys_to_where(event["dynamodb"]["Keys"])
sql = f"UPDATE {self.table_name} SET {clause.set_clause} WHERE {where_clause};"
parameters = record

elif event_name == "REMOVE":
where_clause = self.keys_to_where(record["dynamodb"]["Keys"])
sql = f"DELETE FROM {self.table_name} " f"WHERE {where_clause};"
where_clause = self.keys_to_where(event["dynamodb"]["Keys"])
sql = f"DELETE FROM {self.table_name} WHERE {where_clause};"
parameters = None

else:
raise ValueError(f"Unknown CDC event name: {event_name}")

return sql

def image_to_values(self, image: t.Dict[str, t.Any]) -> str:
"""
Serialize CDC event's "(New|Old)Image" representation to a `VALUES` clause in CrateDB SQL syntax.
IN (top-level stripped):
"NewImage": {
"humidity": {"N": "84.84"},
"temperature": {"N": "42.42"},
"device": {"S": "foo"},
"timestamp": {"S": "2024-07-12T01:17:42"},
}
OUT:
{"humidity": 84.84, "temperature": 42.42, "device": "foo", "timestamp": "2024-07-12T01:17:42"}
"""
return json.dumps(self.deserialize_item(image))
return SQLOperation(sql, parameters)

def values_to_update(self, keys: t.Dict[str, t.Dict[str, str]]) -> str:
def update_clause(self, record: t.Dict[str, t.Any]) -> SQLParameterizedClause:
"""
Serializes an image to a comma-separated list of column/values pairs
that can be used in the `SET` clause of an `UPDATE` statement.
Expand All @@ -161,28 +167,25 @@ def values_to_update(self, keys: t.Dict[str, t.Dict[str, str]]) -> str:
{'humidity': {'N': '84.84'}, 'temperature': {'N': '55.66'}}
OUT:
data['humidity] = '84.84', temperature = '55.66'
data['humidity'] = '84.84', data['temperature'] = '55.66'
"""
values_clause = self.deserialize_item(keys)

constraints: t.List[str] = []
for key_name, key_value in values_clause.items():
if key_value is None:
key_value = "NULL"

elif isinstance(key_value, str):
key_value = "'" + str(key_value).replace("'", "''") + "'"
clause = SQLParameterizedClause()
for column, value in record.items():
param_name = f":{column}"
if value is None:
value = "NULL"

elif isinstance(key_value, dict):
# TODO: Does it also need escaping of inner TEXT values, like the above?
key_value = "'" + json.dumps(key_value) + "'::OBJECT"
elif isinstance(value, dict):
param_name = f"CAST({param_name} AS OBJECT)"

elif isinstance(key_value, list) and key_value and isinstance(key_value[0], dict):
key_value = "'" + json.dumps(key_value) + "'::OBJECT[]"
elif isinstance(value, list) and value and isinstance(value[0], dict):
param_name = f"CAST({param_name} AS OBJECT[])"

constraint = f"{self.DATA_COLUMN}['{key_name}'] = {key_value}"
constraints.append(constraint)
return ", ".join(constraints)
clause.columns.append(f"{self.DATA_COLUMN}['{column}']")
clause.names.append(param_name)
clause.values.append(value) # noqa: PD011
return clause

def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> str:
"""
Expand All @@ -204,14 +207,3 @@ def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> str:
constraint = f"{self.DATA_COLUMN}['{key_name}'] = '{key_value}'"
constraints.append(constraint)
return " AND ".join(constraints)

@staticmethod
def quote_table_name(name: str):
"""
Poor man's table quoting.
TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable.
"""
if '"' not in name:
name = f'"{name}"'
return name
Loading

0 comments on commit c9e195c

Please sign in to comment.