From 928a99aac9c1c6febc0f4e2662406edb8dce0d30 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 24 Oct 2024 09:45:38 +0200 Subject: [PATCH] DynamoDB: Use `ON CONFLICT DO NOTHING` clause on `INSERT` CDC operations ... to mitigate errors when events are relayed redundantly from retries after partially failed batches on the Lambda processor. --- CHANGES.md | 4 ++++ src/commons_codec/transform/dynamodb.py | 3 ++- tests/transform/test_dynamodb_cdc.py | 4 ++-- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 0ddf928..af1a7ae 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,10 @@ ## Unreleased - DynamoDB/Testing: Use CrateDB nightly again +- DynamoDB: Use `ON CONFLICT DO NOTHING` clause on CDC operations + of type `INSERT`, to mitigate errors when events are relayed + redundantly from retries after partially failed batches on the + Lambda processor. ## 2024/10/09 v0.0.21 - MongoDB: Fixed BSON decoding of `{"$date": 1180690093000}` timestamps diff --git a/src/commons_codec/transform/dynamodb.py b/src/commons_codec/transform/dynamodb.py index 2814b78..7bd8408 100644 --- a/src/commons_codec/transform/dynamodb.py +++ b/src/commons_codec/transform/dynamodb.py @@ -201,7 +201,8 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation: f") VALUES (" f":pk, " f":typed, " - f":untyped);" + f":untyped) " + f"ON CONFLICT DO NOTHING;" ) parameters = record.to_dict() diff --git a/tests/transform/test_dynamodb_cdc.py b/tests/transform/test_dynamodb_cdc.py index a4d7add..a75875a 100644 --- a/tests/transform/test_dynamodb_cdc.py +++ b/tests/transform/test_dynamodb_cdc.py @@ -200,7 +200,7 @@ def test_decode_cdc_unknown_event(dynamodb_cdc_translator_foo): def test_decode_cdc_insert_basic(dynamodb_cdc_translator_foo): assert dynamodb_cdc_translator_foo.to_sql(MSG_INSERT_BASIC) == SQLOperation( - statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped);", + statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped) ON CONFLICT DO NOTHING;", parameters={ "pk": { "id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266", @@ -221,7 +221,7 @@ def test_decode_cdc_insert_basic(dynamodb_cdc_translator_foo): def test_decode_cdc_insert_nested(dynamodb_cdc_translator_foo): assert dynamodb_cdc_translator_foo.to_sql(MSG_INSERT_NESTED) == SQLOperation( - statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped);", + statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped) ON CONFLICT DO NOTHING;", parameters={ "pk": { "id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",