Skip to content

Commit

Permalink
DynamoDB: Use ON CONFLICT DO NOTHING clause on INSERT CDC operations
Browse files Browse the repository at this point in the history
... to mitigate errors when events are relayed redundantly from retries
after partially failed batches on the Lambda processor.
  • Loading branch information
amotl committed Oct 24, 2024
1 parent a1ffadc commit 928a99a
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 3 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased
- DynamoDB/Testing: Use CrateDB nightly again
- DynamoDB: Use `ON CONFLICT DO NOTHING` clause on CDC operations
of type `INSERT`, to mitigate errors when events are relayed
redundantly from retries after partially failed batches on the
Lambda processor.

## 2024/10/09 v0.0.21
- MongoDB: Fixed BSON decoding of `{"$date": 1180690093000}` timestamps
Expand Down
3 changes: 2 additions & 1 deletion src/commons_codec/transform/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation:
f") VALUES ("
f":pk, "
f":typed, "
f":untyped);"
f":untyped) "
f"ON CONFLICT DO NOTHING;"
)
parameters = record.to_dict()

Expand Down
4 changes: 2 additions & 2 deletions tests/transform/test_dynamodb_cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def test_decode_cdc_unknown_event(dynamodb_cdc_translator_foo):

def test_decode_cdc_insert_basic(dynamodb_cdc_translator_foo):
assert dynamodb_cdc_translator_foo.to_sql(MSG_INSERT_BASIC) == SQLOperation(
statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped);",
statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped) ON CONFLICT DO NOTHING;",
parameters={
"pk": {
"id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
Expand All @@ -221,7 +221,7 @@ def test_decode_cdc_insert_basic(dynamodb_cdc_translator_foo):

def test_decode_cdc_insert_nested(dynamodb_cdc_translator_foo):
assert dynamodb_cdc_translator_foo.to_sql(MSG_INSERT_NESTED) == SQLOperation(
statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped);",
statement="INSERT INTO foo (pk, data, aux) VALUES (:pk, :typed, :untyped) ON CONFLICT DO NOTHING;",
parameters={
"pk": {
"id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266",
Expand Down

0 comments on commit 928a99a

Please sign in to comment.