Commit f0a4c12

feat: Start tracking records exported

tomasfarias committed Mar 19, 2024
1 parent 1778471 commit f0a4c12

Showing 8 changed files with 36 additions and 12 deletions.
6 changes: 4 additions & 2 deletions posthog/batch_exports/service.py

@@ -438,14 +438,16 @@ def create_batch_export_run(
     return run


-def update_batch_export_run_status(run_id: UUID, status: str, latest_error: str | None) -> BatchExportRun:
+def update_batch_export_run_status(
+    run_id: UUID, status: str, latest_error: str | None, records_completed: int = 0
+) -> BatchExportRun:
     """Update the status of an BatchExportRun with given id.

     Arguments:
         id: The id of the BatchExportRun to update.
     """
     model = BatchExportRun.objects.filter(id=run_id)
-    updated = model.update(status=status, latest_error=latest_error)
+    updated = model.update(status=status, latest_error=latest_error, records_completed=records_completed)

     if not updated:
         raise ValueError(f"BatchExportRun with id {run_id} not found.")
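Since records_completed defaults to 0, existing callers are unaffected; a caller that knows the count can pass it when closing out a run. A minimal usage sketch (the run id and count below are illustrative, not values from this commit):

from uuid import UUID

from posthog.batch_exports.service import update_batch_export_run_status

# Hypothetical values, for illustration only.
run = update_batch_export_run_status(
    run_id=UUID("00000000-0000-0000-0000-000000000000"),
    status="Completed",
    latest_error=None,
    records_completed=100,  # omit to keep the default of 0
)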
6 changes: 5 additions & 1 deletion posthog/temporal/batch_exports/batch_exports.py

@@ -534,6 +534,7 @@ class UpdateBatchExportRunStatusInputs:
     status: str
     team_id: int
     latest_error: str | None = None
+    records_completed: int = 0


 @activity.defn
@@ -545,6 +546,7 @@ async def update_export_run_status(inputs: UpdateBatchExportRunStatusInputs) ->
         run_id=uuid.UUID(inputs.id),
         status=inputs.status,
         latest_error=inputs.latest_error,
+        records_completed=inputs.records_completed,
     )

     if batch_export_run.status in (BatchExportRun.Status.FAILED, BatchExportRun.Status.FAILED_RETRYABLE):
@@ -664,7 +666,7 @@ async def execute_batch_export_insert_activity(
     )

     try:
-        await workflow.execute_activity(
+        records_completed = await workflow.execute_activity(
             activity,
             inputs,
             start_to_close_timeout=dt.timedelta(seconds=start_to_close_timeout_seconds),
@@ -690,6 +692,8 @@ async def execute_batch_export_insert_activity(

     finally:
         get_export_finished_metric(status=update_inputs.status.lower()).add(1)
+
+        update_inputs.records_completed = records_completed
         await workflow.execute_activity(
             update_export_run_status,
             update_inputs,
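This file is where the commit's plumbing lives: the insert activity's return value is captured by execute_batch_export_insert_activity and forwarded to update_export_run_status through the new records_completed field. A stripped-down sketch of that flow against the temporalio API, with InsertInputs, my_insert_activity and MyExportWorkflow as made-up stand-ins for the per-destination code:

import datetime as dt
from dataclasses import dataclass

from temporalio import activity, workflow


@dataclass
class InsertInputs:
    team_id: int


@activity.defn
async def my_insert_activity(inputs: InsertInputs) -> int:
    # A real insert activity streams rows to the destination and counts them
    # as it flushes; returning 0 mirrors the "nothing to export" early return.
    return 0


@workflow.defn
class MyExportWorkflow:
    @workflow.run
    async def run(self, inputs: InsertInputs) -> None:
        # The integer returned by the activity is what ends up in
        # BatchExportRun.records_completed via UpdateBatchExportRunStatusInputs.
        records_completed = await workflow.execute_activity(
            my_insert_activity,
            inputs,
            start_to_close_timeout=dt.timedelta(hours=1),
        )
        workflow.logger.info("Exported %d records", records_completed)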
6 changes: 4 additions & 2 deletions posthog/temporal/batch_exports/bigquery_batch_export.py

@@ -193,7 +193,7 @@ def bigquery_default_fields() -> list[BatchExportField]:


 @activity.defn
-async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
+async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> int:
     """Activity streams data from ClickHouse to BigQuery."""
     logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="BigQuery")
     logger.info(
@@ -230,7 +230,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
                 inputs.data_interval_start,
                 inputs.data_interval_end,
             )
-            return
+            return 0

         logger.info("BatchExporting %s rows", count)

@@ -354,6 +354,8 @@ async def flush_to_bigquery(bigquery_table, table_schema):

                 jsonl_file.reset()

+        return jsonl_file.records_total
+

 @workflow.defn(name="bigquery-export")
 class BigQueryBatchExportWorkflow(PostHogWorkflow):
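The BigQuery, Postgres, S3 and Snowflake activities all read their count from the temporary file they buffer rows into (jsonl_file.records_total above). That class is not part of this diff; the sketch below is a hypothetical counting buffer that shows the idea, not the real implementation behind jsonl_file:

import tempfile


class CountingJSONLBuffer:
    """Illustrative stand-in for a temporary file that counts records as they are written."""

    def __init__(self) -> None:
        self._file = tempfile.TemporaryFile(mode="w+b")
        self.records_total = 0  # running total the activity can return at the end
        self.records_since_last_reset = 0

    def write_record(self, line: bytes) -> None:
        self._file.write(line + b"\n")
        self.records_total += 1
        self.records_since_last_reset += 1

    def tell(self) -> int:
        return self._file.tell()

    def reset(self) -> None:
        # Called after each flush: empty the buffer but keep the running total.
        self._file.seek(0)
        self._file.truncate()
        self.records_since_last_reset = 0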
6 changes: 4 additions & 2 deletions posthog/temporal/batch_exports/postgres_batch_export.py

@@ -234,7 +234,7 @@ class PostgresInsertInputs:


 @activity.defn
-async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
+async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> int:
     """Activity streams data from ClickHouse to Postgres."""
     logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="PostgreSQL")
     logger.info(
@@ -262,7 +262,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
                 inputs.data_interval_start,
                 inputs.data_interval_end,
             )
-            return
+            return 0

         logger.info("BatchExporting %s rows", count)

@@ -359,6 +359,8 @@ async def flush_to_postgres():
         if pg_file.tell() > 0:
             await flush_to_postgres()

+        return pg_file.records_total
+

 @workflow.defn(name="postgres-export")
 class PostgresBatchExportWorkflow(PostHogWorkflow):
9 changes: 8 additions & 1 deletion posthog/temporal/batch_exports/redshift_batch_export.py

@@ -171,7 +171,7 @@ async def insert_records_to_redshift(
     schema: str | None,
     table: str,
     batch_size: int = 100,
-):
+) -> int:
     """Execute an INSERT query with given Redshift connection.

     The recommended way to insert multiple values into Redshift is using a COPY statement (see:
@@ -206,15 +206,20 @@ async def insert_records_to_redshift(
     template = sql.SQL("({})").format(sql.SQL(", ").join(map(sql.Placeholder, columns)))
     rows_exported = get_rows_exported_metric()

+    total_rows_exported = 0
+
     async with async_client_cursor_from_connection(redshift_connection) as cursor:
         batch = []
         pre_query_str = pre_query.as_string(cursor).encode("utf-8")

         async def flush_to_redshift(batch):
+            nonlocal total_rows_exported
+
             values = b",".join(batch).replace(b" E'", b" '")

             await cursor.execute(pre_query_str + values)
             rows_exported.add(len(batch))
+            total_rows_exported += len(batch)
             # It would be nice to record BYTES_EXPORTED for Redshift, but it's not worth estimating
             # the byte size of each batch the way things are currently written. We can revisit this
             # in the future if we decide it's useful enough.
@@ -230,6 +235,8 @@ async def flush_to_redshift(batch):
         if len(batch) > 0:
             await flush_to_redshift(batch)

+    return total_rows_exported
+

 @contextlib.asynccontextmanager
 async def async_client_cursor_from_connection(
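Redshift is the odd one out: it inserts in batches rather than staging a file, so the activity keeps a running total in a closure variable that the nested flush coroutine updates via nonlocal. A self-contained sketch of that accumulator pattern (the flush below only simulates the INSERT; it is not the Redshift code):

import asyncio


async def insert_in_batches(records: list[bytes], batch_size: int = 100) -> int:
    """Flush records in batches and return how many were sent in total."""
    total_rows_exported = 0

    async def flush(batch: list[bytes]) -> None:
        nonlocal total_rows_exported
        # A real implementation would execute an INSERT statement here.
        await asyncio.sleep(0)
        total_rows_exported += len(batch)

    batch: list[bytes] = []
    for record in records:
        batch.append(record)
        if len(batch) >= batch_size:
            await flush(batch)
            batch = []

    if batch:
        await flush(batch)

    return total_rows_exported


if __name__ == "__main__":
    print(asyncio.run(insert_in_batches([b"a", b"b", b"c"], batch_size=2)))  # prints 3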
6 changes: 4 additions & 2 deletions posthog/temporal/batch_exports/s3_batch_export.py

@@ -382,7 +382,7 @@ def s3_default_fields() -> list[BatchExportField]:


 @activity.defn
-async def insert_into_s3_activity(inputs: S3InsertInputs):
+async def insert_into_s3_activity(inputs: S3InsertInputs) -> int:
     """Activity to batch export data from PostHog's ClickHouse to S3.

     It currently only creates a single file per run, and uploads as a multipart upload.
@@ -418,7 +418,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs):
                 inputs.data_interval_start,
                 inputs.data_interval_end,
             )
-            return
+            return 0

         logger.info("BatchExporting %s rows to S3", count)

@@ -503,6 +503,8 @@ async def flush_to_s3(last_uploaded_part_timestamp: str, last=False):

         await s3_upload.complete()

+        return local_results_file.records_total
+

 @workflow.defn(name="s3-export")
 class S3BatchExportWorkflow(PostHogWorkflow):
6 changes: 4 additions & 2 deletions posthog/temporal/batch_exports/snowflake_batch_export.py

@@ -388,7 +388,7 @@ async def copy_loaded_files_to_snowflake_table(


 @activity.defn
-async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs):
+async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs) -> int:
     """Activity streams data from ClickHouse to Snowflake.

     TODO: We're using JSON here, it's not the most efficient way to do this.
@@ -430,7 +430,7 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs):
                 inputs.data_interval_start,
                 inputs.data_interval_end,
             )
-            return
+            return 0

         logger.info("BatchExporting %s rows", count)

@@ -553,6 +553,8 @@ async def worker_shutdown_handler():

         await copy_loaded_files_to_snowflake_table(connection, inputs.table_name)

+        return local_results_file.records_total
+

 @workflow.defn(name="snowflake-export")
 class SnowflakeBatchExportWorkflow(PostHogWorkflow):
3 changes: 3 additions & 0 deletions (S3 batch export workflow test module; file path not captured on the page)

@@ -805,6 +805,7 @@ async def test_s3_export_workflow_defaults_to_timestamp_on_null_inserted_at(

     run = runs[0]
     assert run.status == "Completed"
+    assert run.records_completed == 100

     await assert_clickhouse_records_in_s3(
         s3_compatible_client=minio_client,
@@ -893,6 +894,7 @@ async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix(

     run = runs[0]
     assert run.status == "Completed"
+    assert run.records_completed == 100

     expected_key_prefix = s3_key_prefix.format(
         table="events",
@@ -968,6 +970,7 @@ async def insert_into_s3_activity_mocked(_: S3InsertInputs) -> str:
     run = runs[0]
     assert run.status == "FailedRetryable"
     assert run.latest_error == "ValueError: A useful error message"
+    assert run.records_completed == 0


 async def test_s3_export_workflow_handles_insert_activity_non_retryable_errors(ateam, s3_batch_export, interval):
