Skip to content

Commit

Permalink
DMS/DynamoDB: Use parameterized SQL WHERE clauses instead of inlining
Browse files Browse the repository at this point in the history
  • Loading branch information
Andreas Motl authored and amotl committed Aug 27, 2024
1 parent 2ddef5e commit 19a2775
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 80 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## Unreleased
- DMS/DynamoDB: Use parameterized SQL WHERE clauses instead of inlining values

## 2024/08/26 v0.0.12
- DMS/DynamoDB/MongoDB: Use SQL with parameters instead of inlining values
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ lint.extend-ignore = [
"RET505",
# Probable insecure usage of temporary file or directory
"S108",
# Possible SQL injection vector through string-based query construction
"S608",
]

lint.per-file-ignores."examples/*" = [
Expand Down
37 changes: 31 additions & 6 deletions src/commons_codec/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,38 @@ class SQLParameterizedClause:
Manage details about a SQL parameterized clause, including column names, parameter names, and values.
"""

columns: t.List[str] = Factory(list)
names: t.List[str] = Factory(list)
values: t.List[str] = Factory(list)
lvals: t.List[str] = Factory(list)
rvals: t.List[str] = Factory(list)
values: t.Dict[str, t.Any] = Factory(dict)

def add(self, lval: str, value: t.Any, name: str, rval: str = None):
self.lvals.append(lval)
if rval is None:
self.rvals.append(f":{name}")
else:
self.rvals.append(rval)
self.values[name] = value

def render(self, delimiter: str) -> str:
"""
Render a clause of an SQL statement.
"""
return delimiter.join([f"{lval}={rval}" for lval, rval in zip(self.lvals, self.rvals)])

@property
def set_clause(self):

@define
class SQLParameterizedSetClause(SQLParameterizedClause):
def to_sql(self):
"""
Render a SET clause of an SQL statement.
"""
return ", ".join([f"{key}={value}" for key, value in zip(self.columns, self.names)])
return self.render(", ")


@define
class SQLParameterizedWhereClause(SQLParameterizedClause):
def to_sql(self):
"""
Render a WHERE clause of an SQL statement.
"""
return self.render(" AND ")
67 changes: 29 additions & 38 deletions src/commons_codec/transform/aws_dms.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
# Copyright (c) 2023-2024, The Kotori Developers and contributors.
# Copyright (c) 2021-2024, Crate.io Inc.
# Distributed under the terms of the LGPLv3 license, see LICENSE.

# ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction

import logging
import typing as t

Expand All @@ -14,7 +12,8 @@
ColumnTypeMapStore,
PrimaryKeyStore,
SQLOperation,
SQLParameterizedClause,
SQLParameterizedSetClause,
SQLParameterizedWhereClause,
TableAddress,
)

Expand All @@ -31,15 +30,15 @@ class DMSTranslatorCrateDBRecord:

def __init__(
self,
record: t.Dict[str, t.Any],
event: t.Dict[str, t.Any],
container: "DMSTranslatorCrateDB",
):
self.record = record
self.event = event
self.container = container

self.metadata: t.Dict[str, t.Any] = self.record.get("metadata", {})
self.control: t.Dict[str, t.Any] = self.record.get("control", {})
self.data: t.Dict[str, t.Any] = self.record.get("data", {})
self.metadata: t.Dict[str, t.Any] = self.event.get("metadata", {})
self.control: t.Dict[str, t.Any] = self.event.get("control", {})
self.data: t.Dict[str, t.Any] = self.event.get("data", {})

self.operation: t.Union[str, None] = self.metadata.get("operation")

Expand Down Expand Up @@ -82,30 +81,31 @@ def to_sql(self) -> SQLOperation:
return SQLOperation(f"CREATE TABLE IF NOT EXISTS {self.address.fqn} ({self.DATA_COLUMN} OBJECT(DYNAMIC));")

elif self.operation in ["load", "insert"]:
record = self.record_to_values()
self.decode_data()
sql = f"INSERT INTO {self.address.fqn} ({self.DATA_COLUMN}) VALUES (:record);"
parameters = {"record": record}
parameters = {"record": self.data}

elif self.operation == "update":
record = self.record_to_values()
clause = self.update_clause()
self.decode_data()
set_clause = self.update_clause()
where_clause = self.keys_to_where()
sql = f"UPDATE {self.address.fqn} SET {clause.set_clause} WHERE {where_clause};"
parameters = {"record": record}
sql = f"UPDATE {self.address.fqn} SET {set_clause.to_sql()} WHERE {where_clause.to_sql()};"
parameters = set_clause.values # noqa: PD011
parameters.update(where_clause.values)

elif self.operation == "delete":
where_clause = self.keys_to_where()
sql = f"DELETE FROM {self.address.fqn} WHERE {where_clause};"
parameters = None
sql = f"DELETE FROM {self.address.fqn} WHERE {where_clause.to_sql()};"
parameters = where_clause.values # noqa: PD011

else:
message = f"Unknown CDC event operation: {self.operation}"
logger.warning(message)
raise UnknownOperationError(message, operation=self.operation, record=self.record)
raise UnknownOperationError(message, operation=self.operation, record=self.event)

return SQLOperation(sql, parameters)

def update_clause(self) -> SQLParameterizedClause:
def update_clause(self) -> SQLParameterizedSetClause:
"""
Serializes an image to a comma-separated list of column/values pairs
that can be used in the `SET` clause of an `UPDATE` statement.
Expand All @@ -117,21 +117,15 @@ def update_clause(self) -> SQLParameterizedClause:
OUT
data['age'] = '33', data['attributes'] = '{"foo": "bar"}', data['name'] = 'John'
"""
clause = SQLParameterizedClause()
for column, value in self.record["data"].items():
# Skip primary key columns, they cannot be updated
clause = SQLParameterizedSetClause()
for column, value in self.event["data"].items():
# Skip primary key columns, they cannot be updated.
if column in self.primary_keys:
continue

param_name = f":{column}"

clause.columns.append(f"{self.DATA_COLUMN}['{column}']")
clause.names.append(param_name)
clause.values.append(value) # noqa: PD011

clause.add(lval=f"{self.DATA_COLUMN}['{column}']", value=value, name=column)
return clause

def record_to_values(self) -> t.Dict[str, t.Any]:
def decode_data(self):
"""
Apply type translations to record, and serialize to JSON.
Expand All @@ -148,21 +142,18 @@ def record_to_values(self) -> t.Dict[str, t.Any]:
if column_type is ColumnType.MAP and isinstance(value, str):
value = json.loads(value)
self.data[column_name] = value
return self.data

def keys_to_where(self) -> str:
def keys_to_where(self) -> SQLParameterizedWhereClause:
"""
Produce an SQL WHERE clause based on primary key definition and current record's data.
"""
if not self.primary_keys:
raise ValueError("Unable to invoke DML operation without primary key information")
constraints: t.List[str] = []
clause = SQLParameterizedWhereClause()
for key_name in self.primary_keys:
key_value = self.data.get(key_name)
# FIXME: Does the quoting of the value on the right hand side need to take the data type into account?
constraint = f"{self.DATA_COLUMN}['{key_name}'] = '{key_value}'"
constraints.append(constraint)
return " AND ".join(constraints)
clause.add(lval=f"{self.DATA_COLUMN}['{key_name}']", value=key_value, name=key_name)
return clause


class DMSTranslatorCrateDB:
Expand All @@ -188,5 +179,5 @@ def to_sql(self, record: t.Dict[str, t.Any]) -> SQLOperation:
"""
Produce INSERT|UPDATE|DELETE SQL statement from load|insert|update|delete CDC event record.
"""
record_decoded = DMSTranslatorCrateDBRecord(record=record, container=self)
record_decoded = DMSTranslatorCrateDBRecord(event=record, container=self)
return record_decoded.to_sql()
54 changes: 25 additions & 29 deletions src/commons_codec/transform/dynamodb.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
# Copyright (c) 2023-2024, The Kotori Developers and contributors.
# Copyright (c) 2021-2024, Crate.io Inc.
# Distributed under the terms of the LGPLv3 license, see LICENSE.
import decimal

# ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction
import logging
import typing as t

import toolz

from commons_codec.model import SQLOperation, SQLParameterizedClause
from commons_codec.model import (
SQLOperation,
SQLParameterizedSetClause,
SQLParameterizedWhereClause,
)
from commons_codec.vendor.boto3.dynamodb.types import DYNAMODB_CONTEXT, TypeDeserializer

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -142,23 +144,24 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation:
del new_image[key]

record = self.decode_record(event["dynamodb"]["NewImage"])
clause = self.update_clause(record)
set_clause = self.update_clause(record)

where_clause = self.keys_to_where(event["dynamodb"]["Keys"])
sql = f"UPDATE {self.table_name} SET {clause.set_clause} WHERE {where_clause};"
parameters = record
sql = f"UPDATE {self.table_name} SET {set_clause.to_sql()} WHERE {where_clause.to_sql()};"
parameters = set_clause.values # noqa: PD011
parameters.update(where_clause.values)

elif event_name == "REMOVE":
where_clause = self.keys_to_where(event["dynamodb"]["Keys"])
sql = f"DELETE FROM {self.table_name} WHERE {where_clause};"
parameters = None
sql = f"DELETE FROM {self.table_name} WHERE {where_clause.to_sql()};"
parameters = where_clause.values # noqa: PD011

else:
raise ValueError(f"Unknown CDC event name: {event_name}")

return SQLOperation(sql, parameters)

def update_clause(self, record: t.Dict[str, t.Any]) -> SQLParameterizedClause:
def update_clause(self, record: t.Dict[str, t.Any]) -> SQLParameterizedSetClause:
"""
Serializes an image to a comma-separated list of column/values pairs
that can be used in the `SET` clause of an `UPDATE` statement.
Expand All @@ -170,24 +173,19 @@ def update_clause(self, record: t.Dict[str, t.Any]) -> SQLParameterizedClause:
data['humidity'] = '84.84', data['temperature'] = '55.66'
"""

clause = SQLParameterizedClause()
clause = SQLParameterizedSetClause()
for column, value in record.items():
param_name = f":{column}"
if value is None:
value = "NULL"

elif isinstance(value, dict):
param_name = f"CAST({param_name} AS OBJECT)"
rval = None
if isinstance(value, dict):
rval = f"CAST(:{column} AS OBJECT)"

elif isinstance(value, list) and value and isinstance(value[0], dict):
param_name = f"CAST({param_name} AS OBJECT[])"
rval = f"CAST(:{column} AS OBJECT[])"

clause.columns.append(f"{self.DATA_COLUMN}['{column}']")
clause.names.append(param_name)
clause.values.append(value) # noqa: PD011
clause.add(lval=f"{self.DATA_COLUMN}['{column}']", name=column, value=value, rval=rval)
return clause

def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> str:
def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> SQLParameterizedWhereClause:
"""
Serialize CDC event's "Keys" representation to an SQL `WHERE` clause in CrateDB SQL syntax.
Expand All @@ -200,10 +198,8 @@ def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> str:
OUT:
WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42'
"""
constraints: t.List[str] = []
for key_name, key_value_raw in keys.items():
key_value = self.deserializer.deserialize(key_value_raw)
# FIXME: Does the quoting of the value on the right hand side need to take the data type into account?
constraint = f"{self.DATA_COLUMN}['{key_name}'] = '{key_value}'"
constraints.append(constraint)
return " AND ".join(constraints)
keys = self.decode_record(keys)
clause = SQLParameterizedWhereClause()
for key_name, key_value in keys.items():
clause.add(lval=f"{self.DATA_COLUMN}['{key_name}']", name=key_name, value=key_value)
return clause
6 changes: 3 additions & 3 deletions tests/transform/test_aws_dms.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ def test_decode_cdc_update_success(cdc):
assert cdc.to_sql(MSG_DATA_UPDATE_VALUE) == SQLOperation(
statement='UPDATE "public"."foo" SET '
"data['age']=:age, data['attributes']=:attributes, data['name']=:name "
"WHERE data['id'] = '42';",
parameters={"record": RECORD_UPDATE},
"WHERE data['id']=:id;",
parameters=RECORD_UPDATE,
)


Expand All @@ -267,7 +267,7 @@ def test_decode_cdc_delete_success(cdc):

# Emulate a DELETE operation.
assert cdc.to_sql(MSG_DATA_DELETE) == SQLOperation(
statement="DELETE FROM \"public\".\"foo\" WHERE data['id'] = '45';", parameters=None
statement='DELETE FROM "public"."foo" WHERE data[\'id\']=:id;', parameters={"id": 45}
)


Expand Down
15 changes: 11 additions & 4 deletions tests/transform/test_dynamodb_cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,10 @@ def test_decode_cdc_modify_basic():
"data['humidity']=:humidity, data['temperature']=:temperature, data['location']=:location, "
"data['string_set']=:string_set, data['number_set']=:number_set, data['binary_set']=:binary_set, "
"data['empty_string']=:empty_string, data['null_string']=:null_string "
"WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42';",
"WHERE data['device']=:device AND data['timestamp']=:timestamp;",
parameters={
"device": "foo",
"timestamp": "2024-07-12T01:17:42",
"humidity": 84.84,
"temperature": 55.66,
"location": "Sydney",
Expand All @@ -254,8 +256,10 @@ def test_decode_cdc_modify_nested():
"data['tags']=:tags, data['empty_map']=CAST(:empty_map AS OBJECT), data['empty_list']=:empty_list, "
"data['string_set']=:string_set, data['number_set']=:number_set, data['binary_set']=:binary_set, "
"data['somemap']=CAST(:somemap AS OBJECT), data['list_of_objects']=CAST(:list_of_objects AS OBJECT[]) "
"WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42';",
"WHERE data['device']=:device AND data['timestamp']=:timestamp;",
parameters={
"device": "foo",
"timestamp": "2024-07-12T01:17:42",
"tags": ["foo", "bar"],
"empty_map": {},
"empty_list": [],
Expand All @@ -270,8 +274,11 @@ def test_decode_cdc_modify_nested():

def test_decode_cdc_remove():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_REMOVE) == SQLOperation(
statement="DELETE FROM \"foo\" WHERE data['device'] = 'bar' AND data['timestamp'] = '2024-07-12T01:17:42';",
parameters=None,
statement="DELETE FROM \"foo\" WHERE data['device']=:device AND data['timestamp']=:timestamp;",
parameters={
"device": "bar",
"timestamp": "2024-07-12T01:17:42",
},
)


Expand Down

0 comments on commit 19a2775

Please sign in to comment.