Skip to content

Commit

Permalink
DMS: Skip primary key columns in SET clauses for UPDATE statements
Browse files Browse the repository at this point in the history
CrateDB does not allow changing primary key columns,
even when setting them to their current value.
  • Loading branch information
hammerhead committed Aug 16, 2024
1 parent 3ddb9df commit 71199fa
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## Unreleased
- Changed `UPDATE` statements from DMS not to write the entire `data`
column. This allows defining primary keys on the sink table.

## 2024/08/16 v0.0.5
- Changed `UPDATE` statements from DynamoDB not to write the entire `data`
Expand Down
26 changes: 24 additions & 2 deletions src/commons_codec/transform/aws_dms.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ def to_sql(self) -> str:
sql = f"INSERT INTO {self.address.fqn} ({self.DATA_COLUMN}) VALUES ('{values_clause}');"

elif self.operation == "update":
values_clause = self.record_to_values()
values_clause = self.record_to_update()
where_clause = self.keys_to_where()
sql = f"UPDATE {self.address.fqn} SET {self.DATA_COLUMN} = '{values_clause}' WHERE {where_clause};"
sql = f"UPDATE {self.address.fqn} SET {values_clause} WHERE {where_clause};"

elif self.operation == "delete":
where_clause = self.keys_to_where()
Expand All @@ -94,6 +94,28 @@ def to_sql(self) -> str:

return sql

def record_to_update(self) -> str:
"""
Serializes an image to a comma-separated list of column/values pairs
that can be used in the `SET` clause of an `UPDATE` statement.
Primary key columns are skipped, since they cannot be updated.
IN
{'age': 33, 'attributes': '{"foo": "bar"}', 'id': 42, 'name': 'John'}
OUT
data['age'] = '33', data['attributes'] = '{"foo": "bar"}', data['name'] = 'John'
"""
constraints: t.List[str] = []
for column_name, column_value in self.record['data'].items():
# Skip primary key columns, they cannot be updated
if column_name in self.primary_keys:
continue

constraint = f"{self.DATA_COLUMN}['{column_name}'] = '{column_value}'"
constraints.append(constraint)
return ", ".join(constraints)

def record_to_values(self) -> str:
"""
Apply type translations to record, and serialize to JSON.
Expand Down
2 changes: 1 addition & 1 deletion tests/transform/test_aws_dms.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def test_decode_cdc_update_success(cdc):
# Emulate an UPDATE operation.
assert (
cdc.to_sql(MSG_DATA_UPDATE_VALUE) == 'UPDATE "public"."foo" '
f"SET data = '{json.dumps(RECORD_UPDATE)}' "
"SET data['age'] = '33', data['attributes'] = '{\"foo\": \"bar\"}', data['name'] = 'John' "
"WHERE data['id'] = '42';"
)

Expand Down

0 comments on commit 71199fa

Please sign in to comment.