-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DMS/DynamoDB/MongoDB: Use SQL with parameters instead of inlining values #243
Conversation
bdb0428
to
ed5a465
Compare
tests/io/test_processor.py
Outdated
# Define two CDC events: INSERT and UPDATE. | ||
# They have to be conveyed separately because CrateDB needs a | ||
# `REFRESH TABLE` operation between them. | ||
event1 = { | ||
"Records": [ | ||
wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED), | ||
] | ||
} | ||
event2 = { | ||
"Records": [ | ||
wrap_kinesis(DYNAMODB_CDC_MODIFY_NESTED), | ||
] | ||
} | ||
|
||
# Run transfer command. | ||
handler(event1, None) | ||
cratedb.database.refresh_table(table_name) | ||
handler(event2, None) | ||
cratedb.database.refresh_table(table_name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By adding and exercising this test case, I observed that an INSERT operation followed by an UPDATE operation on the same document/record indeed requires a REFRESH operation in between.
Before, the test case processed both CDC events in direct succession, and the outcome in CrateDB lacks corresponding fields added by the second operation (UPDATE), which adjusts data in the same record (CrateDB's data
column).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This section has been amended in favor of adding the REFRESH TABLE operation to the handler itself. See below.
# Process record. | ||
sql = cdc.to_sql(record_data) | ||
connection.execute(sa.text(sql)) | ||
operation = cdc.to_sql(record_data) | ||
connection.execute(sa.text(operation.statement), parameters=operation.parameters) | ||
|
||
# Processing alternating CDC events requires write synchronization. | ||
# FIXME: Needs proper table name quoting. | ||
connection.execute(sa.text(f"REFRESH TABLE {CRATEDB_TABLE}")) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hereby, I am adding a REFRESH TABLE operation again. Please share your objections if you have any.
/cc @wierdvanderhaar, @hlcianfagna, @hammerhead, @proddata, @widmogrod
pyproject.toml
Outdated
@@ -139,7 +139,7 @@ docs = [ | |||
] | |||
dynamodb = [ | |||
"boto3", | |||
"commons-codec", | |||
"commons-codec @ git+https://github.com/crate/commons-codec.git@sql-with-params", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure enough, this and other relevant spots need to be amended after running another release of commons-codec.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commons-codec v0.0.12 has been released.
16df9f6
to
52bcc50
Compare
- `SQLOperation` bundles data about an SQL operation, including statement and parameters. - Also, refactor `DynamoDBFullLoadTranslator` to `commons-codec`.
52bcc50
to
8436e6b
Compare
About
This patch adjusts for an improvement to commons-codec where the
SQLOperation
class has been introduced to bundle together an SQL statement and its parameters.References