feat: Use a background task to set batch export status to running
This allows us to run the task concurrently instead of timing out on
it. It also has the benefit of removing a lot of nesting levels!
tomasfarias committed Jun 24, 2024
1 parent a8f5d09 commit 5bd140c
Showing 6 changed files with 328 additions and 242 deletions.
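For context, the new async with blocks in the hunks below pair Heartbeater() and get_client(...) with set_status_to_running_task(...), which replaces the blocking await try_set_batch_export_run_to_running(...) call by scheduling the status update as a background task. The implementation of set_status_to_running_task is not part of the hunks shown in this diff, so the following is only a minimal sketch under assumptions: the decorator-based context manager and the helper body are illustrative, not the repository's actual code.

import asyncio
import contextlib
import logging
from collections.abc import AsyncIterator


async def try_set_batch_export_run_to_running(run_id: str, logger: logging.Logger) -> None:
    # Illustrative stand-in: the real helper updates the batch export run's status in the database.
    logger.info("Setting batch export run %s to RUNNING", run_id)


@contextlib.asynccontextmanager
async def set_status_to_running_task(
    run_id: str | None, logger: logging.Logger
) -> AsyncIterator[asyncio.Task | None]:
    """Schedule the RUNNING status update concurrently so the activity never blocks on it."""
    if run_id is None:
        # Nothing to update; yield immediately so callers can use this unconditionally.
        yield None
        return

    task = asyncio.create_task(try_set_batch_export_run_to_running(run_id=run_id, logger=logger))
    try:
        yield task
    finally:
        # Make sure the background task finishes (or is cancelled) before the activity exits.
        if not task.done():
            task.cancel()
        await asyncio.gather(task, return_exceptions=True)

Grouping this context manager with Heartbeater() and get_client(...) in a single parenthesized async with statement is what removes the extra nesting levels mentioned in the commit message.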
114 changes: 90 additions & 24 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -214,9 +214,11 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
inputs.table_id,
)

async with Heartbeater() as heartbeater:
await try_set_batch_export_run_to_running(run_id=inputs.run_id, logger=logger)

async with (
Heartbeater() as heartbeater,
get_client(team_id=inputs.team_id) as client,
set_status_to_running_task(run_id=inputs.run_id, logger=logger),
):
should_resume, details = await should_resume_from_activity_heartbeat(activity, BigQueryHeartbeatDetails, logger)

if should_resume is True and details is not None:
@@ -226,17 +228,78 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
data_interval_start = inputs.data_interval_start
last_inserted_at = None

async with get_client(team_id=inputs.team_id) as client:
if not await client.is_alive():
raise ConnectionError("Cannot establish connection to ClickHouse")
if not await client.is_alive():
raise ConnectionError("Cannot establish connection to ClickHouse")

if inputs.batch_export_schema is None:
fields = bigquery_default_fields()
query_parameters = None

else:
fields = inputs.batch_export_schema["fields"]
query_parameters = inputs.batch_export_schema["values"]

records_iterator = iter_records(
client=client,
team_id=inputs.team_id,
interval_start=data_interval_start,
interval_end=inputs.data_interval_end,
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
fields=fields,
extra_query_parameters=query_parameters,
is_backfill=inputs.is_backfill,
)

first_record_batch, records_iterator = peek_first_and_rewind(records_iterator)
if first_record_batch is None:
return 0

bigquery_table = None
inserted_at = None

with bigquery_client(inputs) as bq_client, BatchExportTemporaryFile() as jsonl_file:
rows_exported = get_rows_exported_metric()
bytes_exported = get_bytes_exported_metric()

async def flush_to_bigquery(bigquery_table, table_schema):
logger.debug(
"Loading %s records of size %s bytes",
jsonl_file.records_since_last_reset,
jsonl_file.bytes_since_last_reset,
)
await load_jsonl_file_to_bigquery_table(jsonl_file, bigquery_table, table_schema, bq_client)

rows_exported.add(jsonl_file.records_since_last_reset)
bytes_exported.add(jsonl_file.bytes_since_last_reset)

if inputs.use_json_type is True:
json_type = "JSON"
json_columns = ["properties", "set", "set_once", "person_properties"]
else:
json_type = "STRING"
json_columns = []

if inputs.batch_export_schema is None:
fields = bigquery_default_fields()
query_parameters = None
schema = [
bigquery.SchemaField("uuid", "STRING"),
bigquery.SchemaField("event", "STRING"),
bigquery.SchemaField("properties", json_type),
bigquery.SchemaField("elements", "STRING"),
bigquery.SchemaField("set", json_type),
bigquery.SchemaField("set_once", json_type),
bigquery.SchemaField("distinct_id", "STRING"),
bigquery.SchemaField("team_id", "INT64"),
bigquery.SchemaField("ip", "STRING"),
bigquery.SchemaField("site_url", "STRING"),
bigquery.SchemaField("timestamp", "TIMESTAMP"),
bigquery.SchemaField("bq_ingested_timestamp", "TIMESTAMP"),
]

else:
fields = inputs.batch_export_schema["fields"]
query_parameters = inputs.batch_export_schema["values"]
column_names = [column for column in first_record_batch.schema.names if column != "_inserted_at"]
record_schema = first_record_batch.select(column_names).schema
schema = get_bigquery_fields_from_record_schema(record_schema, known_json_columns=json_columns)

records_iterator = iter_model_records(
client=client,
@@ -255,21 +318,16 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
if first_record_batch is None:
return 0

bigquery_table = None
inserted_at = None
for record_batch in records_iterator:
for record in record_batch.select(record_columns).to_pylist():
inserted_at = record.pop("_inserted_at")

with bigquery_client(inputs) as bq_client:
with BatchExportTemporaryFile() as jsonl_file:
rows_exported = get_rows_exported_metric()
bytes_exported = get_bytes_exported_metric()
for json_column in json_columns:
if json_column in record and (json_str := record.get(json_column, None)) is not None:
record[json_column] = json.loads(json_str)

async def flush_to_bigquery(bigquery_table, table_schema):
logger.debug(
"Loading %s records of size %s bytes",
jsonl_file.records_since_last_reset,
jsonl_file.bytes_since_last_reset,
)
await load_jsonl_file_to_bigquery_table(jsonl_file, bigquery_table, table_schema, bq_client)
# TODO: Parquet is a much more efficient format to send data to BigQuery.
jsonl_file.write_records_to_jsonl([record])

rows_exported.add(jsonl_file.records_since_last_reset)
bytes_exported.add(jsonl_file.bytes_since_last_reset)
@@ -342,7 +400,15 @@ async def flush_to_bigquery(bigquery_table, table_schema):

jsonl_file.reset()

return jsonl_file.records_total
if jsonl_file.tell() > 0 and inserted_at is not None:
await flush_to_bigquery(bigquery_table, schema)

last_inserted_at = inserted_at.isoformat()
heartbeater.details = (str(last_inserted_at),)

jsonl_file.reset()

return jsonl_file.records_total


@workflow.defn(name="bigquery-export")
136 changes: 65 additions & 71 deletions posthog/temporal/batch_exports/postgres_batch_export.py
@@ -255,20 +255,36 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> Records
inputs.table_name,
)

async with Heartbeater():
await try_set_batch_export_run_to_running(run_id=inputs.run_id, logger=logger)
async with (
Heartbeater(),
get_client(team_id=inputs.team_id) as client,
set_status_to_running_task(run_id=inputs.run_id, logger=logger),
):
if not await client.is_alive():
raise ConnectionError("Cannot establish connection to ClickHouse")

if inputs.batch_export_schema is None:
fields = postgres_default_fields()
query_parameters = None

async with get_client(team_id=inputs.team_id) as client:
if not await client.is_alive():
raise ConnectionError("Cannot establish connection to ClickHouse")

if inputs.batch_export_schema is None:
fields = postgres_default_fields()
query_parameters = None
else:
fields = inputs.batch_export_schema["fields"]
query_parameters = inputs.batch_export_schema["values"]

else:
fields = inputs.batch_export_schema["fields"]
query_parameters = inputs.batch_export_schema["values"]
record_iterator = iter_records(
client=client,
team_id=inputs.team_id,
interval_start=inputs.data_interval_start,
interval_end=inputs.data_interval_end,
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
fields=fields,
extra_query_parameters=query_parameters,
is_backfill=inputs.is_backfill,
)
first_record_batch, record_iterator = peek_first_and_rewind(record_iterator)
if first_record_batch is None:
return 0

record_iterator = iter_model_records(
client=client,
@@ -286,79 +302,57 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> Records
if first_record_batch is None:
return 0

if inputs.batch_export_schema is None:
table_fields = [
("uuid", "VARCHAR(200)"),
("event", "VARCHAR(200)"),
("properties", "JSONB"),
("elements", "JSONB"),
("set", "JSONB"),
("set_once", "JSONB"),
("distinct_id", "VARCHAR(200)"),
("team_id", "INTEGER"),
("ip", "VARCHAR(200)"),
("site_url", "VARCHAR(200)"),
("timestamp", "TIMESTAMP WITH TIME ZONE"),
]
async with postgres_connection(inputs) as connection:
await create_table_in_postgres(
connection,
schema=inputs.schema,
table_name=inputs.table_name,
fields=table_fields,
)

else:
column_names = [column for column in first_record_batch.schema.names if column != "_inserted_at"]
record_schema = first_record_batch.select(column_names).schema
table_fields = get_postgres_fields_from_record_schema(
record_schema, known_json_columns=["properties", "set", "set_once", "person_properties"]
)
schema_columns = [field[0] for field in table_fields]

rows_exported = get_rows_exported_metric()
bytes_exported = get_bytes_exported_metric()

with BatchExportTemporaryFile() as pg_file:
async with postgres_connection(inputs) as connection:
await create_table_in_postgres(
connection,
schema=inputs.schema,
table_name=inputs.table_name,
fields=table_fields,
)

schema_columns = [field[0] for field in table_fields]
async def flush_to_postgres():
logger.debug(
"Copying %s records of size %s bytes",
pg_file.records_since_last_reset,
pg_file.bytes_since_last_reset,
)
await copy_tsv_to_postgres(
pg_file,
connection,
inputs.schema,
inputs.table_name,
schema_columns,
)
rows_exported.add(pg_file.records_since_last_reset)
bytes_exported.add(pg_file.bytes_since_last_reset)

rows_exported = get_rows_exported_metric()
bytes_exported = get_bytes_exported_metric()
for record_batch in record_iterator:
for result in record_batch.select(schema_columns).to_pylist():
row = result

with BatchExportTemporaryFile() as pg_file:
async with postgres_connection(inputs) as connection:
if "elements" in row and inputs.batch_export_schema is None:
row["elements"] = json.dumps(row["elements"])

async def flush_to_postgres():
logger.debug(
"Copying %s records of size %s bytes",
pg_file.records_since_last_reset,
pg_file.bytes_since_last_reset,
pg_file.write_records_to_tsv(
[row], fieldnames=schema_columns, quoting=csv.QUOTE_MINIMAL, escapechar=None
)
await copy_tsv_to_postgres(
pg_file,
connection,
inputs.schema,
inputs.table_name,
schema_columns,
)
rows_exported.add(pg_file.records_since_last_reset)
bytes_exported.add(pg_file.bytes_since_last_reset)

async for record_batch in record_iterator:
for result in record_batch.select(schema_columns).to_pylist():
row = result

if "elements" in row and inputs.batch_export_schema is None:
row["elements"] = json.dumps(row["elements"])

pg_file.write_records_to_tsv(
[row], fieldnames=schema_columns, quoting=csv.QUOTE_MINIMAL, escapechar=None
)

if pg_file.tell() > settings.BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES:
await flush_to_postgres()
pg_file.reset()

if pg_file.tell() > 0:
await flush_to_postgres()
if pg_file.tell() > 0:
await flush_to_postgres()

return pg_file.records_total
return pg_file.records_total


@workflow.defn(name="postgres-export")
