fix: Redshift batch export issues (#18697)
* fix: Redshift batch size matching

* fix: Do not export elements
tomasfarias authored Nov 16, 2023
1 parent de6bf8e commit 55356b4
Showing 2 changed files with 9 additions and 9 deletions.
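
Background on the first fix: insert_records_to_redshift seeded its batch list with the rendered INSERT prefix and interleaved b"," separators into the same list it compared against batch_size, so len(batch) grew by two per record and batches flushed at roughly half the intended size (the final flush also had to slice off a trailing separator with batch[:-1]). A minimal sketch of the miscounting, where batch_size and the byte-string literals are hypothetical stand-ins for cursor.mogrify output:

# Minimal sketch of the pre-fix counting bug. The batch_size value and the
# byte-string literals are hypothetical stand-ins, not PostHog's actual values.
batch_size = 4
batch = [b"INSERT INTO t VALUES "]  # query prefix counted as a batch element

for n, record in enumerate([b"(1)", b"(2)", b"(3)", b"(4)"], start=1):
    batch.append(record)
    if len(batch) < batch_size:
        batch.append(b",")  # separator also counted as a batch element
        continue
    print(f"flushed after {n} records, len(batch) = {len(batch)}")
    break

# Prints "flushed after 2 records, len(batch) = 4": the flush fires after
# only 2 of the intended 4 records.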
@@ -66,10 +66,9 @@ async def assert_events_in_redshift(connection, schema, table_name, events, excl
 
         raw_properties = event.get("properties", None)
         properties = remove_escaped_whitespace_recursive(raw_properties) if raw_properties else None
-        elements_chain = event.get("elements_chain", None)
         expected_event = {
             "distinct_id": event.get("distinct_id"),
-            "elements": json.dumps(elements_chain) if elements_chain else None,
+            "elements": "",
             "event": event_name,
             "ip": properties.get("$ip", None) if properties else None,
             "properties": json.dumps(properties, ensure_ascii=False) if properties else None,
posthog/temporal/workflows/redshift_batch_export.py (15 changes: 8 additions & 7 deletions)
@@ -129,27 +129,26 @@ async def insert_records_to_redshift(
     rows_exported = get_rows_exported_metric()
 
     async with async_client_cursor_from_connection(redshift_connection) as cursor:
-        batch = [pre_query.as_string(cursor).encode("utf-8")]
+        batch = []
+        pre_query_str = pre_query.as_string(cursor).encode("utf-8")
 
         async def flush_to_redshift(batch):
-            await cursor.execute(b"".join(batch))
+            await cursor.execute(pre_query_str + b",".join(batch))
             rows_exported.add(len(batch) - 1)
             # It would be nice to record BYTES_EXPORTED for Redshift, but it's not worth estimating
             # the byte size of each batch the way things are currently written. We can revisit this
             # in the future if we decide it's useful enough.
 
         for record in itertools.chain([first_record], records):
             batch.append(cursor.mogrify(template, record).encode("utf-8"))
-
             if len(batch) < batch_size:
-                batch.append(b",")
                 continue
 
             await flush_to_redshift(batch)
-            batch = [pre_query.as_string(cursor).encode("utf-8")]
+            batch = []
 
         if len(batch) > 0:
-            await flush_to_redshift(batch[:-1])
+            await flush_to_redshift(batch)
 
 
 @contextlib.asynccontextmanager
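
With the fix, the batch list holds only mogrified records: the INSERT prefix is rendered once into pre_query_str, separators are produced at flush time by b",".join, and len(batch) counts records directly against batch_size. Below is a self-contained sketch of the same pattern; the execute stub and the byte-string records are assumptions standing in for psycopg's cursor.execute and cursor.mogrify output, not the actual API calls:

import itertools


def execute(query: bytes) -> None:
    """Stand-in for cursor.execute; just prints the statement it would run."""
    print(query.decode())


def insert_in_batches(first_record: bytes, records, batch_size: int) -> None:
    pre_query_str = b"INSERT INTO events VALUES "  # rendered once, never counted
    batch: list[bytes] = []

    def flush(batch: list[bytes]) -> None:
        # Separators are only materialized here, so they never inflate len(batch).
        execute(pre_query_str + b",".join(batch))

    for record in itertools.chain([first_record], records):
        batch.append(record)
        if len(batch) < batch_size:
            continue
        flush(batch)
        batch = []

    if len(batch) > 0:  # flush the final partial batch
        flush(batch)


insert_in_batches(b"(1)", [b"(2)", b"(3)", b"(4)", b"(5)"], batch_size=2)
# INSERT INTO events VALUES (1),(2)
# INSERT INTO events VALUES (3),(4)
# INSERT INTO events VALUES (5)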
@@ -278,12 +277,14 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs):

     def map_to_record(row: dict) -> dict:
         """Map row to a record to insert to Redshift."""
-        return {
+        record = {
             key: json.dumps(remove_escaped_whitespace_recursive(row[key]), ensure_ascii=False)
             if key in json_columns and row[key] is not None
             else row[key]
             for key in schema_columns
         }
+        record["elements"] = ""
+        return record
 
     async with postgres_connection(inputs) as connection:
         await insert_records_to_redshift(
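
The map_to_record change pairs with the test update in the first file: JSON columns are still serialized with escaped whitespace removed, but the "elements" key is now overwritten with an empty string before the record is returned, so element chains are no longer exported. A runnable sketch; schema_columns, json_columns, the sample row, and the identity stand-in for remove_escaped_whitespace_recursive are assumptions for illustration:

import json

schema_columns = ["event", "properties", "elements"]  # assumed column set
json_columns = ("properties", "elements")


def remove_escaped_whitespace_recursive(value):
    return value  # identity stand-in; the real helper strips escaped whitespace


def map_to_record(row: dict) -> dict:
    record = {
        key: json.dumps(remove_escaped_whitespace_recursive(row[key]), ensure_ascii=False)
        if key in json_columns and row[key] is not None
        else row[key]
        for key in schema_columns
    }
    record["elements"] = ""  # elements are blanked instead of exported
    return record


print(map_to_record({"event": "$pageview", "properties": {"$ip": "127.0.0.1"}, "elements": [{"tag": "a"}]}))
# {'event': '$pageview', 'properties': '{"$ip": "127.0.0.1"}', 'elements': ''}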
