Skip to content

Commit

Permalink
DynamoDB: Skip primary key columns in SET clauses for UPDATE stat…
Browse files Browse the repository at this point in the history
…ements

CrateDB does not allow changing primary key columns,
even when setting them to their current value.
  • Loading branch information
hammerhead authored and amotl committed Aug 16, 2024
1 parent 4fdde50 commit 74d1738
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 4 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- Changed `UPDATE` statements from DynamoDB not to write the entire `data`
column. This allows defining primary keys on the sink table.

## 2024/08/14 v0.0.4
- Added `BucketTransformation`, a minimal transformation engine
based on JSON Pointer (RFC 6901).
Expand Down
31 changes: 29 additions & 2 deletions src/commons_codec/transform/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,17 @@ def to_sql(self, record: t.Dict[str, t.Any]) -> str:
sql = f"INSERT INTO {self.table_name} " f"({self.DATA_COLUMN}) " f"VALUES ('{values_clause}');"

elif event_name == "MODIFY":
values_clause = self.image_to_values(record["dynamodb"]["NewImage"])
new_image_cleaned = record["dynamodb"]["NewImage"]
# Drop primary key columns to not update them.
# Primary key values should be identical (if chosen identical in DynamoDB and CrateDB),
# but CrateDB does not allow having themin an UPDATE's SET clause.
for key in record["dynamodb"]["Keys"]:
del new_image_cleaned[key]

values_clause = self.values_to_update(new_image_cleaned)

where_clause = self.keys_to_where(record["dynamodb"]["Keys"])
sql = f"UPDATE {self.table_name} " f"SET {self.DATA_COLUMN} = '{values_clause}' " f"WHERE {where_clause};"
sql = f"UPDATE {self.table_name} " f"SET {values_clause} " f"WHERE {where_clause};"

elif event_name == "REMOVE":
where_clause = self.keys_to_where(record["dynamodb"]["Keys"])
Expand Down Expand Up @@ -122,6 +130,25 @@ def image_to_values(self, image: t.Dict[str, t.Any]) -> str:
"""
return json.dumps(self.deserialize_item(image))

def values_to_update(self, keys: t.Dict[str, t.Dict[str, str]]) -> str:
"""
Serializes an image to a comma-separated list of column/values pairs
that can be used in the `SET` clause of an `UPDATE` statement.
IN:
{'humidity': {'N': '84.84'}, 'temperature': {'N': '55.66'}}
OUT:
data['humidity] = 84.84, temperature = 55.66
"""
values_clause = self.deserialize_item(keys)

constraints: t.List[str] = []
for key_name, key_value in values_clause.items():
constraint = f"{self.DATA_COLUMN}['{key_name}'] = {key_value}"
constraints.append(constraint)
return ", ".join(constraints)

def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> str:
"""
Serialize CDC event's "Keys" representation to an SQL `WHERE` clause in CrateDB SQL syntax.
Expand Down
3 changes: 1 addition & 2 deletions tests/transform/test_dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ def test_decode_cdc_insert_nested():
def test_decode_cdc_modify():
assert (
DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_MODIFY) == 'UPDATE "foo" '
'SET data = \'{"humidity": 84.84, "temperature": 55.66, '
'"device": "bar", "timestamp": "2024-07-12T01:17:42"}\' '
"SET data['humidity'] = 84.84, data['temperature'] = 55.66 "
"WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42';"
)

Expand Down

0 comments on commit 74d1738

Please sign in to comment.