From cd5f8efebda6c6b2417e728b62c713f6139d9d11 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?=
Date: Thu, 16 Nov 2023 23:13:34 +0100
Subject: [PATCH 1/2] fix: Redshift batch size matching

---
 posthog/temporal/workflows/redshift_batch_export.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/posthog/temporal/workflows/redshift_batch_export.py b/posthog/temporal/workflows/redshift_batch_export.py
index bbe42ef4890b6..78f5741c3c68f 100644
--- a/posthog/temporal/workflows/redshift_batch_export.py
+++ b/posthog/temporal/workflows/redshift_batch_export.py
@@ -129,10 +129,11 @@ async def insert_records_to_redshift(
     rows_exported = get_rows_exported_metric()
 
     async with async_client_cursor_from_connection(redshift_connection) as cursor:
-        batch = [pre_query.as_string(cursor).encode("utf-8")]
+        batch = []
+        pre_query_str = pre_query.as_string(cursor).encode("utf-8")
 
         async def flush_to_redshift(batch):
-            await cursor.execute(b"".join(batch))
+            await cursor.execute(pre_query_str + b",".join(batch))
             rows_exported.add(len(batch) - 1)
             # It would be nice to record BYTES_EXPORTED for Redshift, but it's not worth estimating
             # the byte size of each batch the way things are currently written. We can revisit this
@@ -142,14 +143,13 @@ async def flush_to_redshift(batch):
             batch.append(cursor.mogrify(template, record).encode("utf-8"))
 
             if len(batch) < batch_size:
-                batch.append(b",")
                 continue
 
             await flush_to_redshift(batch)
-            batch = [pre_query.as_string(cursor).encode("utf-8")]
+            batch = []
 
         if len(batch) > 0:
-            await flush_to_redshift(batch[:-1])
+            await flush_to_redshift(batch)
 
 
 @contextlib.asynccontextmanager

From 32591756e8e5bb36972db029f4fd843bde8078ce Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?=
Date: Thu, 16 Nov 2023 23:30:39 +0100
Subject: [PATCH 2/2] fix: Do not export elements

---
 .../batch_exports/test_redshift_batch_export_workflow.py | 3 +--
 posthog/temporal/workflows/redshift_batch_export.py      | 5 +++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py
index 835e8731a8fcc..cea71a458013f 100644
--- a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py
@@ -66,10 +66,9 @@ async def assert_events_in_redshift(connection, schema, table_name, events, excl
         raw_properties = event.get("properties", None)
         properties = remove_escaped_whitespace_recursive(raw_properties) if raw_properties else None
 
-        elements_chain = event.get("elements_chain", None)
         expected_event = {
             "distinct_id": event.get("distinct_id"),
-            "elements": json.dumps(elements_chain) if elements_chain else None,
+            "elements": "",
             "event": event_name,
             "ip": properties.get("$ip", None) if properties else None,
             "properties": json.dumps(properties, ensure_ascii=False) if properties else None,
diff --git a/posthog/temporal/workflows/redshift_batch_export.py b/posthog/temporal/workflows/redshift_batch_export.py
index 78f5741c3c68f..520c8e0f4b12a 100644
--- a/posthog/temporal/workflows/redshift_batch_export.py
+++ b/posthog/temporal/workflows/redshift_batch_export.py
@@ -141,7 +141,6 @@ async def flush_to_redshift(batch):
 
         for record in itertools.chain([first_record], records):
             batch.append(cursor.mogrify(template, record).encode("utf-8"))
-
             if len(batch) < batch_size:
                 continue
 
@@ -278,12 +277,14 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs):
 
     def map_to_record(row: dict) -> dict:
         """Map row to a record to insert to Redshift."""
-        return {
+        record = {
             key: json.dumps(remove_escaped_whitespace_recursive(row[key]), ensure_ascii=False)
             if key in json_columns and row[key] is not None
             else row[key]
             for key in schema_columns
         }
+        record["elements"] = ""
+        return record
 
     async with postgres_connection(inputs) as connection:
         await insert_records_to_redshift(
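
Illustrative note (not part of the patches above): the sketch below approximates the batching pattern that PATCH 1/2 moves to, where the batch holds only encoded value tuples and the INSERT prefix plus comma separators are assembled at flush time, so len(batch) compares directly against batch_size. The helpers mogrify and execute_query are hypothetical stand-ins for cursor.mogrify(...) and cursor.execute(...) in the real activity; this is a minimal, synchronous approximation of the flushing logic, not PostHog code.

def insert_in_batches(records, mogrify, execute_query, pre_query_str: bytes, batch_size: int = 100) -> None:
    """Flush encoded records in groups of exactly `batch_size` rows."""
    batch: list[bytes] = []
    for record in records:
        # Only encoded value tuples go into the batch, so its length equals the record count.
        batch.append(mogrify(record))
        if len(batch) < batch_size:
            continue
        # The INSERT prefix and the comma separators are only added when flushing.
        execute_query(pre_query_str + b",".join(batch))
        batch = []
    if len(batch) > 0:
        # Flush whatever remains after the last full batch.
        execute_query(pre_query_str + b",".join(batch))

# Example usage: 250 fake records flush as batches of 100, 100 and 50 rows.
insert_in_batches(
    ({"event": "event_{}".format(i)} for i in range(250)),
    mogrify=lambda r: "('{}')".format(r["event"]).encode("utf-8"),
    execute_query=lambda q: print(q[:40] + b"...", len(q), "bytes"),
    pre_query_str=b"INSERT INTO events (event) VALUES ",
    batch_size=100,
)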