Skip to content

Commit

Permalink
fix: More Redshift whitespace handling (#18688)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias authored Nov 16, 2023
1 parent 07c50e7 commit 0735621
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
RedshiftBatchExportWorkflow,
RedshiftInsertInputs,
insert_into_redshift_activity,
translate_whitespace_recursive,
remove_escaped_whitespace_recursive,
)

REQUIRED_ENV_VARS = (
Expand Down Expand Up @@ -65,7 +65,7 @@ async def assert_events_in_redshift(connection, schema, table_name, events, excl
continue

raw_properties = event.get("properties", None)
properties = translate_whitespace_recursive(raw_properties) if raw_properties else None
properties = remove_escaped_whitespace_recursive(raw_properties) if raw_properties else None
elements_chain = event.get("elements_chain", None)
expected_event = {
"distinct_id": event.get("distinct_id"),
Expand Down Expand Up @@ -116,7 +116,7 @@ def redshift_config():
return {
"user": user,
"password": password,
"database": "posthog_batch_exports_test",
"database": "posthog_batch_exports_test_2",
"schema": "exports_test_schema",
"host": host,
"port": int(port),
Expand Down Expand Up @@ -362,14 +362,14 @@ async def test_redshift_export_workflow(
"value,expected",
[
([1, 2, 3], [1, 2, 3]),
("\n\n", ""),
([["\t\n"]], [[""]]),
(("\t\n",), ("",)),
({"\t\n"}, {""}),
({"key": "\t\n"}, {"key": ""}),
({"key": ["\t\n"]}, {"key": [""]}),
("hi\t\n\r\f\bhi", "hi hi"),
([["\t\n\r\f\b"]], [[""]]),
(("\t\n\r\f\b",), ("",)),
({"\t\n\r\f\b"}, {""}),
({"key": "\t\n\r\f\b"}, {"key": ""}),
({"key": ["\t\n\r\f\b"]}, {"key": [""]}),
],
)
def test_translate_whitespace_recursive(value, expected):
"""Test we translate some whitespace values."""
assert translate_whitespace_recursive(value) == expected
def test_remove_escaped_whitespace_recursive(value, expected):
"""Test we remove some whitespace values."""
assert remove_escaped_whitespace_recursive(value) == expected
29 changes: 8 additions & 21 deletions posthog/temporal/workflows/redshift_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,9 @@
postgres_connection,
)

WHITESPACE_TRANSLATE = str.maketrans(
{
whitespace_char: None
for whitespace_char in (
"\n",
"\t",
"\f",
"\r",
"\b",
)
}
)


def translate_whitespace_recursive(value):
"""Translate all whitespace from given value using WHITESPACE_TRANSLATE.
def remove_escaped_whitespace_recursive(value):
"""Remove all escaped whitespace characters from given value.
PostgreSQL supports constant escaped strings by appending an E' to each string that
contains whitespace in them (amongst other characters). See:
Expand All @@ -61,22 +48,22 @@ def translate_whitespace_recursive(value):
"""
match value:
case str(s):
return s.translate(WHITESPACE_TRANSLATE)
return " ".join(s.replace("\b", " ").split())

case bytes(b):
return b.decode("utf-8").translate(WHITESPACE_TRANSLATE).encode("utf-8")
return remove_escaped_whitespace_recursive(b.decode("utf-8"))

case [*sequence]:
# mypy could be bugged as it's raising a Statement unreachable error.
# But we are definitely reaching this statement in tests; hence the ignore comment.
# Maybe: https://github.com/python/mypy/issues/16272.
return type(value)(translate_whitespace_recursive(sequence_value) for sequence_value in sequence) # type: ignore
return type(value)(remove_escaped_whitespace_recursive(sequence_value) for sequence_value in sequence) # type: ignore

case set(elements):
return set(translate_whitespace_recursive(element) for element in elements)
return set(remove_escaped_whitespace_recursive(element) for element in elements)

case {**mapping}:
return {k: translate_whitespace_recursive(v) for k, v in mapping.items()}
return {k: remove_escaped_whitespace_recursive(v) for k, v in mapping.items()}

case value:
return value
Expand Down Expand Up @@ -292,7 +279,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs):
def map_to_record(row: dict) -> dict:
"""Map row to a record to insert to Redshift."""
return {
key: json.dumps(translate_whitespace_recursive(row[key]))
key: json.dumps(remove_escaped_whitespace_recursive(row[key]))
if key in json_columns and row[key] is not None
else row[key]
for key in schema_columns
Expand Down

0 comments on commit 0735621

Please sign in to comment.