feat: Start tracking records exported (#21008)
* feat: Start tracking records exported

* fix: Move records_completed inside try

* test: Add more destination tests

* fix: Redshift returns int

* fix: Also return records_total from HTTP

* Update query snapshots

* Update query snapshots

* Update query snapshots

* Update query snapshots

* fix: Return inside context manager

Co-authored-by: Brett Hoerner <[email protected]>

* fix: Return inside context manager

Co-authored-by: Brett Hoerner <[email protected]>

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Brett Hoerner <[email protected]>
3 people authored Mar 20, 2024
1 parent 7076cd4 commit 53355af
Showing 14 changed files with 55 additions and 17 deletions.
6 changes: 4 additions & 2 deletions posthog/batch_exports/service.py
@@ -439,14 +439,16 @@ def create_batch_export_run(
return run


def update_batch_export_run_status(run_id: UUID, status: str, latest_error: str | None) -> BatchExportRun:
def update_batch_export_run_status(
run_id: UUID, status: str, latest_error: str | None, records_completed: int = 0
) -> BatchExportRun:
"""Update the status of an BatchExportRun with given id.
Arguments:
id: The id of the BatchExportRun to update.
"""
model = BatchExportRun.objects.filter(id=run_id)
updated = model.update(status=status, latest_error=latest_error)
updated = model.update(status=status, latest_error=latest_error, records_completed=records_completed)

if not updated:
raise ValueError(f"BatchExportRun with id {run_id} not found.")
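
The new records_completed argument defaults to 0 and is always written by model.update, so a caller that omits it (for example, a failure path) resets the count to zero rather than leaving a stale value behind. A minimal sketch of the new signature against an in-memory stand-in for the model, since the real function goes through the Django ORM (FakeBatchExportRun and _RUNS below are illustrative, not part of the codebase):

from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass
class FakeBatchExportRun:
    # Illustrative in-memory stand-in for the BatchExportRun Django model.
    id: UUID
    status: str = "Starting"
    latest_error: str | None = None
    records_completed: int = 0


_RUNS: dict[UUID, FakeBatchExportRun] = {}


def update_batch_export_run_status(
    run_id: UUID, status: str, latest_error: str | None, records_completed: int = 0
) -> FakeBatchExportRun:
    """Mirror of the updated service function, with a dict lookup standing in for
    BatchExportRun.objects.filter(id=run_id).update(...)."""
    run = _RUNS.get(run_id)
    if run is None:
        raise ValueError(f"BatchExportRun with id {run_id} not found.")
    run.status = status
    run.latest_error = latest_error
    run.records_completed = records_completed
    return run


run_id = uuid4()
_RUNS[run_id] = FakeBatchExportRun(id=run_id)
print(update_batch_export_run_status(run_id, "Completed", None, records_completed=100).records_completed)  # 100
print(update_batch_export_run_status(run_id, "Failed", "SomeError: boom").records_completed)  # back to 0
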
6 changes: 5 additions & 1 deletion posthog/temporal/batch_exports/batch_exports.py
@@ -534,6 +534,7 @@ class UpdateBatchExportRunStatusInputs:
status: str
team_id: int
latest_error: str | None = None
records_completed: int = 0


@activity.defn
@@ -545,6 +546,7 @@ async def update_export_run_status(inputs: UpdateBatchExportRunStatusInputs) ->
run_id=uuid.UUID(inputs.id),
status=inputs.status,
latest_error=inputs.latest_error,
records_completed=inputs.records_completed,
)

if batch_export_run.status in (BatchExportRun.Status.FAILED, BatchExportRun.Status.FAILED_RETRYABLE):
@@ -664,13 +666,14 @@ async def execute_batch_export_insert_activity(
)

try:
await workflow.execute_activity(
records_completed = await workflow.execute_activity(
activity,
inputs,
start_to_close_timeout=dt.timedelta(seconds=start_to_close_timeout_seconds),
heartbeat_timeout=dt.timedelta(seconds=heartbeat_timeout_seconds) if heartbeat_timeout_seconds else None,
retry_policy=retry_policy,
)
update_inputs.records_completed = records_completed

except exceptions.ActivityError as e:
if isinstance(e.cause, exceptions.CancelledError):
@@ -690,6 +693,7 @@

finally:
get_export_finished_metric(status=update_inputs.status.lower()).add(1)

await workflow.execute_activity(
update_export_run_status,
update_inputs,
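
The shape of the control flow matters here: the activity's return value is assigned to update_inputs.records_completed only inside the try block, so any ActivityError leaves the field at its default of 0, and the finally block still reports that value through update_export_run_status. A condensed sketch of that flow without the Temporal worker, retry policy, or database write (the helper names below are illustrative):

import asyncio
from dataclasses import dataclass


@dataclass
class UpdateRunInputs:
    # Mirrors UpdateBatchExportRunStatusInputs: records_completed defaults to 0 so a
    # failed run still produces a well-formed update.
    run_id: str
    status: str
    latest_error: str | None = None
    records_completed: int = 0


async def insert_activity(rows: list[dict], fail: bool = False) -> int:
    # Stand-in for an insert_into_*_activity: export the rows, return how many were written.
    if fail:
        raise RuntimeError("destination rejected the batch")
    return len(rows)


async def execute_insert(rows: list[dict], fail: bool = False) -> UpdateRunInputs:
    update_inputs = UpdateRunInputs(run_id="run-1", status="Completed")
    try:
        # Capture the count only on success, as execute_batch_export_insert_activity does.
        update_inputs.records_completed = await insert_activity(rows, fail=fail)
    except Exception as err:
        update_inputs.status = "FailedRetryable"
        update_inputs.latest_error = f"{type(err).__name__}: {err}"
    finally:
        # In the real workflow this is the update_export_run_status activity call.
        print(update_inputs)
    return update_inputs


asyncio.run(execute_insert([{"event": "a"}, {"event": "b"}]))  # records_completed == 2
asyncio.run(execute_insert([{"event": "a"}], fail=True))       # records_completed == 0
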
6 changes: 4 additions & 2 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -193,7 +193,7 @@ def bigquery_default_fields() -> list[BatchExportField]:


@activity.defn
async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> int:
"""Activity streams data from ClickHouse to BigQuery."""
logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="BigQuery")
logger.info(
@@ -230,7 +230,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
inputs.data_interval_start,
inputs.data_interval_end,
)
return
return 0

logger.info("BatchExporting %s rows", count)

@@ -354,6 +354,8 @@ async def flush_to_bigquery(bigquery_table, table_schema):

jsonl_file.reset()

return jsonl_file.records_total


@workflow.defn(name="bigquery-export")
class BigQueryBatchExportWorkflow(PostHogWorkflow):
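
Each destination that buffers rows in a temporary file reports jsonl_file.records_total (or the equivalent) as its return value: the total keeps growing across flushes, while reset() only clears the per-flush counters used for metrics. A small sketch of that counter behaviour, using an illustrative RecordBuffer rather than the real temporary-file helper (BatchExportTemporaryFile):

class RecordBuffer:
    # Illustrative stand-in for the temporary-file counters used by the activities.
    def __init__(self) -> None:
        self.records_total = 0              # lifetime count, returned by the activity
        self.records_since_last_reset = 0   # per-flush count, used for metrics

    def write_records(self, records: list[dict]) -> None:
        self.records_since_last_reset += len(records)
        self.records_total += len(records)

    def reset(self) -> None:
        # Called after each flush to the destination; the lifetime total survives.
        self.records_since_last_reset = 0


buffer = RecordBuffer()
for batch in ([{"a": 1}, {"a": 2}], [{"a": 3}]):
    buffer.write_records(batch)
    buffer.reset()  # pretend we flushed to the destination here

print(buffer.records_since_last_reset, buffer.records_total)  # 0 3
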
6 changes: 4 additions & 2 deletions posthog/temporal/batch_exports/http_batch_export.py
@@ -152,7 +152,7 @@ async def post_json_file_to_url(url, batch_file, session: aiohttp.ClientSession)


@activity.defn
async def insert_into_http_activity(inputs: HttpInsertInputs):
async def insert_into_http_activity(inputs: HttpInsertInputs) -> int:
"""Activity streams data from ClickHouse to an HTTP Endpoint."""
logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="HTTP")
logger.info(
@@ -180,7 +180,7 @@ async def insert_into_http_activity(inputs: HttpInsertInputs):
inputs.data_interval_start,
inputs.data_interval_end,
)
return
return 0

logger.info("BatchExporting %s rows", count)

@@ -303,6 +303,8 @@ async def flush_batch_to_http_endpoint(last_uploaded_timestamp: str, session: ai
last_uploaded_timestamp = str(inserted_at)
await flush_batch_to_http_endpoint(last_uploaded_timestamp, session)

return batch_file.records_total


@workflow.defn(name="http-export")
class HttpBatchExportWorkflow(PostHogWorkflow):
6 changes: 4 additions & 2 deletions posthog/temporal/batch_exports/postgres_batch_export.py
@@ -234,7 +234,7 @@ class PostgresInsertInputs:


@activity.defn
async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> int:
"""Activity streams data from ClickHouse to Postgres."""
logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="PostgreSQL")
logger.info(
@@ -262,7 +262,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
inputs.data_interval_start,
inputs.data_interval_end,
)
return
return 0

logger.info("BatchExporting %s rows", count)

@@ -359,6 +359,8 @@ async def flush_to_postgres():
if pg_file.tell() > 0:
await flush_to_postgres()

return pg_file.records_total


@workflow.defn(name="postgres-export")
class PostgresBatchExportWorkflow(PostHogWorkflow):
17 changes: 13 additions & 4 deletions posthog/temporal/batch_exports/redshift_batch_export.py
@@ -171,7 +171,7 @@ async def insert_records_to_redshift(
schema: str | None,
table: str,
batch_size: int = 100,
):
) -> int:
"""Execute an INSERT query with given Redshift connection.
The recommended way to insert multiple values into Redshift is using a COPY statement (see:
@@ -206,15 +206,20 @@ async def insert_records_to_redshift(
template = sql.SQL("({})").format(sql.SQL(", ").join(map(sql.Placeholder, columns)))
rows_exported = get_rows_exported_metric()

total_rows_exported = 0

async with async_client_cursor_from_connection(redshift_connection) as cursor:
batch = []
pre_query_str = pre_query.as_string(cursor).encode("utf-8")

async def flush_to_redshift(batch):
nonlocal total_rows_exported

values = b",".join(batch).replace(b" E'", b" '")

await cursor.execute(pre_query_str + values)
rows_exported.add(len(batch))
total_rows_exported += len(batch)
# It would be nice to record BYTES_EXPORTED for Redshift, but it's not worth estimating
# the byte size of each batch the way things are currently written. We can revisit this
# in the future if we decide it's useful enough.
@@ -230,6 +235,8 @@ async def flush_to_redshift(batch):
if len(batch) > 0:
await flush_to_redshift(batch)

return total_rows_exported


@contextlib.asynccontextmanager
async def async_client_cursor_from_connection(
@@ -264,7 +271,7 @@ class RedshiftInsertInputs(PostgresInsertInputs):


@activity.defn
async def insert_into_redshift_activity(inputs: RedshiftInsertInputs):
async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> int:
"""Activity to insert data from ClickHouse to Redshift.
This activity executes the following steps:
@@ -306,7 +313,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs):
inputs.data_interval_start,
inputs.data_interval_end,
)
return
return 0

logger.info("BatchExporting %s rows", count)

@@ -383,13 +390,15 @@ def map_to_record(row: dict) -> dict:
return record

async with postgres_connection(inputs) as connection:
await insert_records_to_redshift(
records_completed = await insert_records_to_redshift(
(map_to_record(record) for record_batch in record_iterator for record in record_batch.to_pylist()),
connection,
inputs.schema,
inputs.table_name,
)

return records_completed


@workflow.defn(name="redshift-export")
class RedshiftBatchExportWorkflow(PostHogWorkflow):
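
Redshift does not report a count from a buffered temporary file; instead, insert_records_to_redshift keeps its own running total: the nested flush coroutine increments a nonlocal counter as each batch of INSERT values is executed, and that counter becomes the activity's return value. A reduced sketch of the counting pattern, with the query execution replaced by a no-op:

import asyncio


async def insert_records(records, batch_size: int = 100) -> int:
    total_rows_exported = 0
    batch: list[dict] = []

    async def flush(batch: list[dict]) -> None:
        # The real code executes the INSERT here; this sketch only accounts for the rows.
        nonlocal total_rows_exported
        total_rows_exported += len(batch)

    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            await flush(batch)
            batch = []

    if len(batch) > 0:
        await flush(batch)

    return total_rows_exported


print(asyncio.run(insert_records(({"n": i} for i in range(250)), batch_size=100)))  # 250
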
6 changes: 4 additions & 2 deletions posthog/temporal/batch_exports/s3_batch_export.py
@@ -388,7 +388,7 @@ def s3_default_fields() -> list[BatchExportField]:


@activity.defn
async def insert_into_s3_activity(inputs: S3InsertInputs):
async def insert_into_s3_activity(inputs: S3InsertInputs) -> int:
"""Activity to batch export data from PostHog's ClickHouse to S3.
It currently only creates a single file per run, and uploads as a multipart upload.
@@ -424,7 +424,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs):
inputs.data_interval_start,
inputs.data_interval_end,
)
return
return 0

logger.info("BatchExporting %s rows to S3", count)

@@ -509,6 +509,8 @@ async def flush_to_s3(last_uploaded_part_timestamp: str, last=False):

await s3_upload.complete()

return local_results_file.records_total


@workflow.defn(name="s3-export")
class S3BatchExportWorkflow(PostHogWorkflow):
6 changes: 4 additions & 2 deletions posthog/temporal/batch_exports/snowflake_batch_export.py
@@ -388,7 +388,7 @@ async def copy_loaded_files_to_snowflake_table(


@activity.defn
async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs):
async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs) -> int:
"""Activity streams data from ClickHouse to Snowflake.
TODO: We're using JSON here, it's not the most efficient way to do this.
@@ -430,7 +430,7 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs):
inputs.data_interval_start,
inputs.data_interval_end,
)
return
return 0

logger.info("BatchExporting %s rows", count)

@@ -553,6 +553,8 @@ async def worker_shutdown_handler():

await copy_loaded_files_to_snowflake_table(connection, inputs.table_name)

return local_results_file.records_total


@workflow.defn(name="snowflake-export")
class SnowflakeBatchExportWorkflow(PostHogWorkflow):
@@ -453,6 +453,7 @@ async def test_bigquery_export_workflow(

run = runs[0]
assert run.status == "Completed"
assert run.records_completed == 100

ingested_timestamp = frozen_time().replace(tzinfo=dt.timezone.utc)
assert_clickhouse_records_in_bigquery(
@@ -566,6 +567,7 @@ class RefreshError(Exception):
run = runs[0]
assert run.status == "Failed"
assert run.latest_error == "RefreshError: A useful error message"
assert run.records_completed == 0


async def test_bigquery_export_workflow_handles_cancellation(ateam, bigquery_batch_export, interval):
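
Every destination test file gains the same pair of assertions: a successful workflow run should carry the exported record count on records_completed, and a failed run should report 0. A self-contained, pytest-style condensation of that pattern (FakeRun and run_fake_export are illustrative stand-ins for the real Temporal test harness):

import asyncio
from dataclasses import dataclass


@dataclass
class FakeRun:
    status: str = "Starting"
    latest_error: str | None = None
    records_completed: int = 0


async def run_fake_export(rows: list[dict], fail: bool = False) -> FakeRun:
    # Condensed stand-in for executing a batch export workflow end to end.
    run = FakeRun()
    try:
        if fail:
            raise ValueError("A useful error message")
        run.records_completed = len(rows)
        run.status = "Completed"
    except ValueError as err:
        run.status = "Failed"
        run.latest_error = f"ValueError: {err}"
    return run


def test_completed_run_tracks_records_completed():
    run = asyncio.run(run_fake_export([{"id": i} for i in range(100)]))
    assert run.status == "Completed"
    assert run.records_completed == 100


def test_failed_run_reports_zero_records():
    run = asyncio.run(run_fake_export([], fail=True))
    assert run.status == "Failed"
    assert run.latest_error == "ValueError: A useful error message"
    assert run.records_completed == 0
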
@@ -370,6 +370,7 @@ async def test_http_export_workflow(

run = runs[0]
assert run.status == "Completed"
assert run.records_completed == 100

await assert_clickhouse_records_in_mock_server(
mock_server=mock_server,
@@ -425,6 +426,7 @@ async def insert_into_http_activity_mocked(_: HttpInsertInputs) -> str:
run = runs[0]
assert run.status == "FailedRetryable"
assert run.latest_error == "ValueError: A useful error message"
assert run.records_completed == 0


async def test_http_export_workflow_handles_insert_activity_non_retryable_errors(ateam, http_batch_export, interval):
@@ -385,6 +385,7 @@ async def test_postgres_export_workflow(

run = runs[0]
assert run.status == "Completed"
assert run.records_completed == 100

await assert_clickhouse_records_in_postgres(
postgres_connection=postgres_connection,
@@ -494,6 +495,7 @@ class InsufficientPrivilege(Exception):
run = runs[0]
assert run.status == "Failed"
assert run.latest_error == "InsufficientPrivilege: A useful error message"
assert run.records_completed == 0


async def test_postgres_export_workflow_handles_cancellation(ateam, postgres_batch_export, interval):
@@ -433,6 +433,7 @@ async def test_redshift_export_workflow(

run = runs[0]
assert run.status == "Completed"
assert run.records_completed == 100

await assert_clickhouse_records_in_redshfit(
redshift_connection=psycopg_connection,
@@ -559,3 +560,4 @@ class InsufficientPrivilege(Exception):
run = runs[0]
assert run.status == "Failed"
assert run.latest_error == "InsufficientPrivilege: A useful error message"
assert run.records_completed == 0
@@ -791,6 +791,7 @@ async def test_s3_export_workflow_defaults_to_timestamp_on_null_inserted_at(

run = runs[0]
assert run.status == "Completed"
assert run.records_completed == 100

await assert_clickhouse_records_in_s3(
s3_compatible_client=minio_client,
@@ -875,6 +876,7 @@ async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix(

run = runs[0]
assert run.status == "Completed"
assert run.records_completed == 100

expected_key_prefix = s3_key_prefix.format(
table="events",
@@ -950,6 +952,7 @@ async def insert_into_s3_activity_mocked(_: S3InsertInputs) -> str:
run = runs[0]
assert run.status == "FailedRetryable"
assert run.latest_error == "ValueError: A useful error message"
assert run.records_completed == 0


async def test_s3_export_workflow_handles_insert_activity_non_retryable_errors(ateam, s3_batch_export, interval):
@@ -455,6 +455,7 @@ async def test_snowflake_export_workflow_exports_events(

run = runs[0]
assert run.status == "Completed"
assert run.records_completed == 10


@pytest.mark.parametrize("interval", ["hour", "day"], indirect=True)
@@ -695,6 +696,7 @@ async def insert_into_snowflake_activity_mocked(_: SnowflakeInsertInputs) -> str
run = runs[0]
assert run.status == "FailedRetryable"
assert run.latest_error == "ValueError: A useful error message"
assert run.records_completed == 0


async def test_snowflake_export_workflow_handles_insert_activity_non_retryable_errors(ateam, snowflake_batch_export):
