diff --git a/posthog/temporal/workflows/redshift_batch_export.py b/posthog/temporal/workflows/redshift_batch_export.py
index eab68d5eeb2aeb..06843289aee5e9 100644
--- a/posthog/temporal/workflows/redshift_batch_export.py
+++ b/posthog/temporal/workflows/redshift_batch_export.py
@@ -66,17 +66,14 @@ async def insert_records_to_redshift(
         fields=sql.SQL(", ").join(map(sql.Identifier, columns)),
     )
     template = sql.SQL("({})").format(sql.SQL(", ").join(map(sql.Placeholder, columns)))
-
-    redshift_connection.cursor_factory = psycopg.AsyncClientCursor
+    rows_exported = get_rows_exported_metric()
 
     async with async_client_cursor_from_connection(redshift_connection) as cursor:
         batch = [pre_query.as_string(cursor).encode("utf-8")]
 
-        rows_exported = get_rows_exported_metric()
-
         async def flush_to_redshift(batch):
             await cursor.execute(b"".join(batch))
-            rows_exported.add(len(batch))
+            rows_exported.add(len(batch) - 1)
             # It would be nice to record BYTES_EXPORTED for Redshift, but it's not worth estimating
             # the byte size of each batch the way things are currently written. We can revisit this
             # in the future if we decide it's useful enough.
@@ -88,7 +85,6 @@ async def flush_to_redshift(batch):
                 batch.append(b",")
                 continue
 
-        if len(batch) > 0:
             await flush_to_redshift(batch)
             batch = [pre_query.as_string(cursor).encode("utf-8")]
 
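For context, the `async_client_cursor_from_connection` helper the hunk relies on is referenced but not defined in this diff. Below is a minimal sketch of what such a helper could look like, assuming it simply swaps the connection's `cursor_factory` to `psycopg.AsyncClientCursor` for the duration of the block and restores it afterwards; the signature and docstring here are illustrative, not the repository's actual definition.

```python
import contextlib
import typing

import psycopg


@contextlib.asynccontextmanager
async def async_client_cursor_from_connection(
    psycopg_connection: psycopg.AsyncConnection,
) -> typing.AsyncIterator[psycopg.AsyncClientCursor]:
    """Yield an AsyncClientCursor from the given connection.

    Temporarily overrides the connection's cursor_factory so that queries run
    through this cursor use client-side binding, then restores the original
    factory on exit. Sketch only; the real helper may differ.
    """
    current_factory = psycopg_connection.cursor_factory
    psycopg_connection.cursor_factory = psycopg.AsyncClientCursor

    try:
        async with psycopg_connection.cursor() as cursor:
            # The factory guarantees this; the assert narrows the type for callers.
            assert isinstance(cursor, psycopg.AsyncClientCursor)
            yield cursor
    finally:
        psycopg_connection.cursor_factory = current_factory
```

If the real helper differs in detail, the relevant point for this diff is only that it owns cursor-factory management, which is why the standalone `redshift_connection.cursor_factory = psycopg.AsyncClientCursor` assignment removed in the first hunk is redundant.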