From 71199faea1ec48f14cebfd85500fbb05ce29f7c9 Mon Sep 17 00:00:00 2001 From: Niklas Schmidtmer Date: Fri, 16 Aug 2024 13:58:10 +0200 Subject: [PATCH] DMS: Skip primary key columns in `SET` clauses for `UPDATE` statements CrateDB does not allow changing primary key columns, even when setting them to their current value. --- CHANGES.md | 2 ++ src/commons_codec/transform/aws_dms.py | 26 ++++++++++++++++++++++++-- tests/transform/test_aws_dms.py | 2 +- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 775acb2..7e60d75 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,8 @@ # Changelog ## Unreleased +- Changed `UPDATE` statements from DMS not to write the entire `data` + column. This allows defining primary keys on the sink table. ## 2024/08/16 v0.0.5 - Changed `UPDATE` statements from DynamoDB not to write the entire `data` diff --git a/src/commons_codec/transform/aws_dms.py b/src/commons_codec/transform/aws_dms.py index 05d63f0..b8e233d 100644 --- a/src/commons_codec/transform/aws_dms.py +++ b/src/commons_codec/transform/aws_dms.py @@ -79,9 +79,9 @@ def to_sql(self) -> str: sql = f"INSERT INTO {self.address.fqn} ({self.DATA_COLUMN}) VALUES ('{values_clause}');" elif self.operation == "update": - values_clause = self.record_to_values() + values_clause = self.record_to_update() where_clause = self.keys_to_where() - sql = f"UPDATE {self.address.fqn} SET {self.DATA_COLUMN} = '{values_clause}' WHERE {where_clause};" + sql = f"UPDATE {self.address.fqn} SET {values_clause} WHERE {where_clause};" elif self.operation == "delete": where_clause = self.keys_to_where() @@ -94,6 +94,28 @@ def to_sql(self) -> str: return sql + def record_to_update(self) -> str: + """ + Serializes an image to a comma-separated list of column/values pairs + that can be used in the `SET` clause of an `UPDATE` statement. + Primary key columns are skipped, since they cannot be updated. + + IN + {'age': 33, 'attributes': '{"foo": "bar"}', 'id': 42, 'name': 'John'} + + OUT + data['age'] = '33', data['attributes'] = '{"foo": "bar"}', data['name'] = 'John' + """ + constraints: t.List[str] = [] + for column_name, column_value in self.record['data'].items(): + # Skip primary key columns, they cannot be updated + if column_name in self.primary_keys: + continue + + constraint = f"{self.DATA_COLUMN}['{column_name}'] = '{column_value}'" + constraints.append(constraint) + return ", ".join(constraints) + def record_to_values(self) -> str: """ Apply type translations to record, and serialize to JSON. diff --git a/tests/transform/test_aws_dms.py b/tests/transform/test_aws_dms.py index ddf825e..cdec25c 100644 --- a/tests/transform/test_aws_dms.py +++ b/tests/transform/test_aws_dms.py @@ -237,7 +237,7 @@ def test_decode_cdc_update_success(cdc): # Emulate an UPDATE operation. assert ( cdc.to_sql(MSG_DATA_UPDATE_VALUE) == 'UPDATE "public"."foo" ' - f"SET data = '{json.dumps(RECORD_UPDATE)}' " + "SET data['age'] = '33', data['attributes'] = '{\"foo\": \"bar\"}', data['name'] = 'John' " "WHERE data['id'] = '42';" )