Skip to content

Commit

Permalink
DynamoDB: Improve to_sql() to accept list of records, for bulk support
Browse files: browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 1, 2024
1 parent d621aaa commit d8e0677
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
`quote_relation_name` from `sqlalchemy-cratedb` package.
- DynamoDB: Add special decoding for varied lists, storing them into a separate
`OBJECT(IGNORED)` column in CrateDB
- DynamoDB: Improve `to_sql()` to accept list of records

## 2024/08/27 v0.0.13
- DMS/DynamoDB: Use parameterized SQL WHERE clauses instead of inlining values
Expand Down
3 changes: 3 additions & 0 deletions src/commons_codec/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,6 @@ class DualRecord:

typed: t.Dict[str, t.Any]
untyped: t.Dict[str, t.Any]

def to_dict(self):
return {"typed": self.typed, "untyped": self.untyped}
12 changes: 8 additions & 4 deletions src/commons_codec/transform/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
# Inhibit Rounded Exceptions
DYNAMODB_CONTEXT.traps[decimal.Rounded] = False

RecordType = t.Dict[str, t.Any]


class CrateDBTypeDeserializer(TypeDeserializer):
def _deserialize_n(self, value):
Expand Down Expand Up @@ -132,13 +134,15 @@ def decode_record(self, item: t.Dict[str, t.Any]) -> DualRecord:


class DynamoDBFullLoadTranslator(DynamoTranslatorBase):
def to_sql(self, record: t.Dict[str, t.Any]) -> SQLOperation:
def to_sql(self, data: t.Union[RecordType, t.List[RecordType]]) -> SQLOperation:
"""
Produce INSERT|UPDATE|DELETE SQL statement from INSERT|MODIFY|REMOVE CDC event record.
Produce INSERT SQL operations (SQL statement and parameters) from DynamoDB record(s).
"""
dual_record = self.decode_record(record)
sql = f"INSERT INTO {self.table_name} ({self.TYPED_COLUMN}, {self.UNTYPED_COLUMN}) VALUES (:typed, :untyped);"
return SQLOperation(sql, {"typed": dual_record.typed, "untyped": dual_record.untyped})
if not isinstance(data, list):
data = [data]
parameters = [self.decode_record(record).to_dict() for record in data]
return SQLOperation(sql, parameters)


class DynamoDBCDCTranslator(DynamoTranslatorBase):
Expand Down
12 changes: 7 additions & 5 deletions tests/transform/test_dynamodb_full.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,12 @@ def test_to_sql_operation():
"""
assert DynamoDBFullLoadTranslator(table_name="foo").to_sql(RECORD_IN) == SQLOperation(
statement="INSERT INTO foo (data, aux) VALUES (:typed, :untyped);",
parameters={
"typed": RECORD_OUT_DATA,
"untyped": RECORD_OUT_AUX,
},
parameters=[
{
"typed": RECORD_OUT_DATA,
"untyped": RECORD_OUT_AUX,
}
],
)


Expand All @@ -103,7 +105,7 @@ def test_to_sql_cratedb(caplog, cratedb):

# Compute CrateDB operation (SQL+parameters) from DynamoDB record.
translator = DynamoDBFullLoadTranslator(table_name="from.dynamodb")
operation = translator.to_sql(record=RECORD_IN)
operation = translator.to_sql(RECORD_IN)

# Insert into CrateDB.
cratedb.database.run_sql(translator.sql_ddl)
Expand Down

0 comments on commit d8e0677

Please sign in to comment.