fix: Main insert batch loop
tomasfarias committed Nov 14, 2023
1 parent fdb5b6b commit 5f78c22
Showing 1 changed file with 2 additions and 6 deletions.
8 changes: 2 additions & 6 deletions posthog/temporal/workflows/redshift_batch_export.py
@@ -66,17 +66,14 @@ async def insert_records_to_redshift(
         fields=sql.SQL(", ").join(map(sql.Identifier, columns)),
     )
     template = sql.SQL("({})").format(sql.SQL(", ").join(map(sql.Placeholder, columns)))
-
-    redshift_connection.cursor_factory = psycopg.AsyncClientCursor
+    rows_exported = get_rows_exported_metric()

     async with async_client_cursor_from_connection(redshift_connection) as cursor:
         batch = [pre_query.as_string(cursor).encode("utf-8")]

-        rows_exported = get_rows_exported_metric()
-
         async def flush_to_redshift(batch):
             await cursor.execute(b"".join(batch))
-            rows_exported.add(len(batch))
+            rows_exported.add(len(batch) - 1)
             # It would be nice to record BYTES_EXPORTED for Redshift, but it's not worth estimating
             # the byte size of each batch the way things are currently written. We can revisit this
             # in the future if we decide it's useful enough.
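The hunk above drops the bare cursor_factory assignment in favour of the async_client_cursor_from_connection helper that the new code calls (defined or imported elsewhere in the file; its body is not part of this diff). A minimal sketch of how such a context manager could look with psycopg 3 — illustrative only, with assumed details, not the actual implementation:

import contextlib

import psycopg


@contextlib.asynccontextmanager
async def async_client_cursor_from_connection(connection: psycopg.AsyncConnection):
    """Yield an AsyncClientCursor without permanently mutating the connection.

    Sketch only: the real helper in redshift_batch_export.py may differ.
    """
    original_factory = connection.cursor_factory
    connection.cursor_factory = psycopg.AsyncClientCursor
    try:
        async with connection.cursor() as cursor:
            # A client-side-binding cursor keeps cursor.mogrify() available
            # where the INSERT batch is assembled below.
            yield cursor
    finally:
        # Restore the previous factory so other users of the connection are unaffected.
        connection.cursor_factory = original_factory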
@@ -88,7 +85,6 @@ async def flush_to_redshift(batch):
                 batch.append(b",")
                 continue

-        if len(batch) > 0:
             await flush_to_redshift(batch)
             batch = [pre_query.as_string(cursor).encode("utf-8")]

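Taken together, the intent of the two hunks appears to be that flush_to_redshift now runs inside the main insert loop whenever a batch fills up, and that the row counter no longer includes the INSERT-prefix element held at the start of the batch list. A self-contained sketch of that flush-inside-the-loop pattern, with hypothetical names and simplified bookkeeping relative to insert_records_to_redshift:

import psycopg
from psycopg import sql


async def insert_rows_in_batches(
    connection: psycopg.AsyncConnection,
    rows: list[dict],
    schema: str,
    table: str,
    batch_size: int = 100,
) -> None:
    """Illustrative sketch of batched INSERTs; not the actual PostHog function."""
    # Assumes rows is non-empty and every row has the same keys.
    columns = list(rows[0].keys())
    pre_query = sql.SQL("INSERT INTO {table} ({fields}) VALUES").format(
        table=sql.Identifier(schema, table),
        fields=sql.SQL(", ").join(map(sql.Identifier, columns)),
    )
    template = sql.SQL("({})").format(sql.SQL(", ").join(map(sql.Placeholder, columns)))

    async with psycopg.AsyncClientCursor(connection) as cursor:
        prefix = pre_query.as_string(cursor).encode("utf-8")
        values: list[bytes] = []

        async def flush(values: list[bytes]) -> None:
            # One INSERT per flush: prefix plus comma-separated VALUES tuples.
            await cursor.execute(prefix + b", ".join(values))

        for row in rows:
            # Client-side binding renders each VALUES tuple as bytes up front.
            values.append(cursor.mogrify(template, row).encode("utf-8"))
            if len(values) >= batch_size:
                await flush(values)
                values = []

        if values:
            # Flush the final partial batch after the loop.
            await flush(values)

The sketch keeps the VALUES tuples separate from the prefix, which sidesteps the len(batch) bookkeeping entirely; the real function keeps everything in one list that starts with the prefix and joins it with b"".join(batch) before executing.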

0 comments on commit 5f78c22
