
feat: Start tracking records exported #21008

Merged
11 commits merged on Mar 20, 2024
6 changes: 4 additions & 2 deletions posthog/batch_exports/service.py
@@ -438,14 +438,16 @@ def create_batch_export_run(
return run


def update_batch_export_run_status(run_id: UUID, status: str, latest_error: str | None) -> BatchExportRun:
def update_batch_export_run_status(
run_id: UUID, status: str, latest_error: str | None, records_completed: int = 0
) -> BatchExportRun:
Comment on lines +441 to +443
Contributor Author:
Could just be named update_batch_export_run as it's doing more than setting the status.

@tomasfarias (Contributor Author), Mar 19, 2024:

On a broader note, I think we should get rid of the ORM (in batch exports) and move to something like aiosql.
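
For illustration, a minimal sketch of what an aiosql-based update could look like, assuming a raw asyncpg connection; the query name, table and column names, and helper below are assumptions for the example, not part of this PR:

```python
import aiosql

# Hypothetical query definition; table and column names are illustrative only.
QUERIES = aiosql.from_str(
    """
    -- name: update_batch_export_run!
    UPDATE posthog_batchexportrun
    SET status = :status,
        latest_error = :latest_error,
        records_completed = :records_completed
    WHERE id = :run_id;
    """,
    "asyncpg",
)


async def update_batch_export_run(conn, run_id, status, latest_error=None, records_completed=0):
    # The '!' suffix marks the query as an insert/update/delete, so aiosql
    # returns the number of affected rows rather than a result set.
    return await QUERIES.update_batch_export_run(
        conn,
        run_id=run_id,
        status=status,
        latest_error=latest_error,
        records_completed=records_completed,
    )
```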

"""Update the status of an BatchExportRun with given id.

Arguments:
id: The id of the BatchExportRun to update.
"""
model = BatchExportRun.objects.filter(id=run_id)
updated = model.update(status=status, latest_error=latest_error)
updated = model.update(status=status, latest_error=latest_error, records_completed=records_completed)

if not updated:
raise ValueError(f"BatchExportRun with id {run_id} not found.")
@@ -778,7 +778,7 @@
FROM "posthog_persondistinctid"
INNER JOIN "posthog_person" ON ("posthog_persondistinctid"."person_id" = "posthog_person"."id")
WHERE ("posthog_persondistinctid"."distinct_id" IN ('user2',
'user_one_2')
'user_one_0')
AND "posthog_persondistinctid"."team_id" = 2) /*controller='project_session_recordings-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/session_recordings/%3F%24'*/
'''
# ---
6 changes: 5 additions & 1 deletion posthog/temporal/batch_exports/batch_exports.py
@@ -534,6 +534,7 @@ class UpdateBatchExportRunStatusInputs:
status: str
team_id: int
latest_error: str | None = None
records_completed: int = 0


@activity.defn
@@ -545,6 +546,7 @@ async def update_export_run_status(inputs: UpdateBatchExportRunStatusInputs) ->
run_id=uuid.UUID(inputs.id),
status=inputs.status,
latest_error=inputs.latest_error,
records_completed=inputs.records_completed,
)

if batch_export_run.status in (BatchExportRun.Status.FAILED, BatchExportRun.Status.FAILED_RETRYABLE):
@@ -664,13 +666,14 @@ async def execute_batch_export_insert_activity(
)

try:
await workflow.execute_activity(
records_completed = await workflow.execute_activity(
activity,
inputs,
start_to_close_timeout=dt.timedelta(seconds=start_to_close_timeout_seconds),
heartbeat_timeout=dt.timedelta(seconds=heartbeat_timeout_seconds) if heartbeat_timeout_seconds else None,
retry_policy=retry_policy,
)
update_inputs.records_completed = records_completed

except exceptions.ActivityError as e:
if isinstance(e.cause, exceptions.CancelledError):
@@ -690,6 +693,7 @@ async def execute_batch_export_insert_activity(

finally:
get_export_finished_metric(status=update_inputs.status.lower()).add(1)

await workflow.execute_activity(
update_export_run_status,
update_inputs,
6 changes: 4 additions & 2 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -193,7 +193,7 @@ def bigquery_default_fields() -> list[BatchExportField]:


@activity.defn
async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> int:
"""Activity streams data from ClickHouse to BigQuery."""
logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="BigQuery")
logger.info(
@@ -230,7 +230,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
inputs.data_interval_start,
inputs.data_interval_end,
)
return
return 0

logger.info("BatchExporting %s rows", count)

@@ -354,6 +354,8 @@ async def flush_to_bigquery(bigquery_table, table_schema):

jsonl_file.reset()

return jsonl_file.records_total
Contributor:

This is de-dented outside of the with BatchExportTemporaryFile() as jsonl_file: block. While it may work (which surprises me, but Python loves to leak things from scope), I think we should put it inside?

@tomasfarias (Contributor Author), Mar 19, 2024:

Yeah, with statements do not introduce a new scope. I am fine with de-denting this though, and I do plan to address it more in future PRs.
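
A quick illustration of that scoping behavior, with made-up names (the binding created inside the with block is still visible after it):

```python
class Resource:
    """Stand-in context manager for the example."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        return False


with Resource() as resource:
    records_total = 42  # bound inside the with block

# A with statement does not create a new scope, so both names remain visible
# here; that's why returning jsonl_file.records_total after the block works.
print(resource, records_total)
```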

tomasfarias marked this conversation as resolved.


@workflow.defn(name="bigquery-export")
class BigQueryBatchExportWorkflow(PostHogWorkflow):
6 changes: 4 additions & 2 deletions posthog/temporal/batch_exports/http_batch_export.py
@@ -152,7 +152,7 @@ async def post_json_file_to_url(url, batch_file, session: aiohttp.ClientSession)


@activity.defn
async def insert_into_http_activity(inputs: HttpInsertInputs):
async def insert_into_http_activity(inputs: HttpInsertInputs) -> int:
"""Activity streams data from ClickHouse to an HTTP Endpoint."""
logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="HTTP")
logger.info(
@@ -180,7 +180,7 @@ async def insert_into_http_activity(inputs: HttpInsertInputs):
inputs.data_interval_start,
inputs.data_interval_end,
)
return
return 0

logger.info("BatchExporting %s rows", count)

@@ -303,6 +303,8 @@ async def flush_batch_to_http_endpoint(last_uploaded_timestamp: str, session: ai
last_uploaded_timestamp = str(inserted_at)
await flush_batch_to_http_endpoint(last_uploaded_timestamp, session)

return batch_file.records_total


@workflow.defn(name="http-export")
class HttpBatchExportWorkflow(PostHogWorkflow):
6 changes: 4 additions & 2 deletions posthog/temporal/batch_exports/postgres_batch_export.py
@@ -234,7 +234,7 @@ class PostgresInsertInputs:


@activity.defn
async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> int:
"""Activity streams data from ClickHouse to Postgres."""
logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="PostgreSQL")
logger.info(
@@ -262,7 +262,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
inputs.data_interval_start,
inputs.data_interval_end,
)
return
return 0

logger.info("BatchExporting %s rows", count)

@@ -359,6 +359,8 @@ async def flush_to_postgres():
if pg_file.tell() > 0:
await flush_to_postgres()

return pg_file.records_total
tomasfarias marked this conversation as resolved.


@workflow.defn(name="postgres-export")
class PostgresBatchExportWorkflow(PostHogWorkflow):
17 changes: 13 additions & 4 deletions posthog/temporal/batch_exports/redshift_batch_export.py
@@ -171,7 +171,7 @@ async def insert_records_to_redshift(
schema: str | None,
table: str,
batch_size: int = 100,
):
) -> int:
"""Execute an INSERT query with given Redshift connection.

The recommended way to insert multiple values into Redshift is using a COPY statement (see:
@@ -206,15 +206,20 @@ async def insert_records_to_redshift(
template = sql.SQL("({})").format(sql.SQL(", ").join(map(sql.Placeholder, columns)))
rows_exported = get_rows_exported_metric()

total_rows_exported = 0

async with async_client_cursor_from_connection(redshift_connection) as cursor:
batch = []
pre_query_str = pre_query.as_string(cursor).encode("utf-8")

async def flush_to_redshift(batch):
nonlocal total_rows_exported

values = b",".join(batch).replace(b" E'", b" '")

await cursor.execute(pre_query_str + values)
rows_exported.add(len(batch))
total_rows_exported += len(batch)
Comment on lines 221 to +222
Contributor Author:

Not sure if we could just read the counter instead of having this duplicated.

# It would be nice to record BYTES_EXPORTED for Redshift, but it's not worth estimating
# the byte size of each batch the way things are currently written. We can revisit this
# in the future if we decide it's useful enough.
@@ -230,6 +235,8 @@ async def flush_to_redshift(batch):
if len(batch) > 0:
await flush_to_redshift(batch)

return total_rows_exported


@contextlib.asynccontextmanager
async def async_client_cursor_from_connection(
@@ -264,7 +271,7 @@ class RedshiftInsertInputs(PostgresInsertInputs):


@activity.defn
async def insert_into_redshift_activity(inputs: RedshiftInsertInputs):
async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> int:
"""Activity to insert data from ClickHouse to Redshift.

This activity executes the following steps:
@@ -306,7 +313,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs):
inputs.data_interval_start,
inputs.data_interval_end,
)
return
return 0

logger.info("BatchExporting %s rows", count)

@@ -383,13 +390,15 @@ def map_to_record(row: dict) -> dict:
return record

async with postgres_connection(inputs) as connection:
await insert_records_to_redshift(
records_completed = await insert_records_to_redshift(
(map_to_record(record) for record_batch in record_iterator for record in record_batch.to_pylist()),
connection,
inputs.schema,
inputs.table_name,
)

return records_completed


@workflow.defn(name="redshift-export")
class RedshiftBatchExportWorkflow(PostHogWorkflow):
6 changes: 4 additions & 2 deletions posthog/temporal/batch_exports/s3_batch_export.py
@@ -382,7 +382,7 @@ def s3_default_fields() -> list[BatchExportField]:


@activity.defn
async def insert_into_s3_activity(inputs: S3InsertInputs):
async def insert_into_s3_activity(inputs: S3InsertInputs) -> int:
"""Activity to batch export data from PostHog's ClickHouse to S3.

It currently only creates a single file per run, and uploads as a multipart upload.
@@ -418,7 +418,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs):
inputs.data_interval_start,
inputs.data_interval_end,
)
return
return 0

logger.info("BatchExporting %s rows to S3", count)

@@ -503,6 +503,8 @@ async def flush_to_s3(last_uploaded_part_timestamp: str, last=False):

await s3_upload.complete()

return local_results_file.records_total
Contributor:

Also de-dented, but it competes with the s3_upload which happens at an outer scope...

Maybe I'm being un-Pythonic? The use of important information outside of the with gives me the creeps, but maybe this is fine and good. I guess it's fine as long as __exit__ leaves the state we need?

Contributor Author:

await s3_upload.complete() should be in the __exit__ of S3MultiPartUpload. I took it out as I was going crazy trying to debug a batch export completing with an extra number of parts, so I tried being explicit to throw some clarity at the problem.

In the end, I think I randomly fixed the bug in another PR when I wasn't looking for it. Anyway, this remained outside of the __exit__ and should be re-added, as it wasn't the cause of the bug.

@tomasfarias (Contributor Author), Mar 19, 2024:

When we move it we do have to account for exceptions: we don't want to complete the upload in case of an exception, and we have to be very careful if we are aborting, as a retry could continue the upload. Maybe the solution is to move this out of the activity and into the workflow, as starting, completing, or aborting an upload should never fail except for wrong credentials, which we can very precisely wrap in a try/except.

I can open a follow-up PR to deal with this.
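
A rough sketch of that idea, assuming the context manager exposes start/complete/abort hooks; the class and method names below are illustrative, not PostHog's actual S3MultiPartUpload:

```python
import asyncio


class MultiPartUploadSketch:
    """Hypothetical async context manager, not the real S3MultiPartUpload."""

    async def start(self) -> None: ...     # would create the multipart upload
    async def complete(self) -> None: ...  # would finalize the uploaded parts
    async def abort(self) -> None: ...     # would discard the uploaded parts

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            # Clean exit: finalize the upload.
            await self.complete()
        elif issubclass(exc_type, asyncio.CancelledError):
            # A cancelled activity may be retried, and a retry can resume an
            # open multipart upload, so don't abort here.
            pass
        else:
            await self.abort()
        return False  # always propagate the original exception
```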



@workflow.defn(name="s3-export")
class S3BatchExportWorkflow(PostHogWorkflow):
6 changes: 4 additions & 2 deletions posthog/temporal/batch_exports/snowflake_batch_export.py
@@ -388,7 +388,7 @@ async def copy_loaded_files_to_snowflake_table(


@activity.defn
async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs):
async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs) -> int:
"""Activity streams data from ClickHouse to Snowflake.

TODO: We're using JSON here, it's not the most efficient way to do this.
@@ -430,7 +430,7 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs):
inputs.data_interval_start,
inputs.data_interval_end,
)
return
return 0

logger.info("BatchExporting %s rows", count)

@@ -553,6 +553,8 @@ async def worker_shutdown_handler():

await copy_loaded_files_to_snowflake_table(connection, inputs.table_name)

return local_results_file.records_total


@workflow.defn(name="snowflake-export")
class SnowflakeBatchExportWorkflow(PostHogWorkflow):
@@ -453,6 +453,7 @@ async def test_bigquery_export_workflow(

run = runs[0]
assert run.status == "Completed"
assert run.records_completed == 100

ingested_timestamp = frozen_time().replace(tzinfo=dt.timezone.utc)
assert_clickhouse_records_in_bigquery(
@@ -566,6 +567,7 @@ class RefreshError(Exception):
run = runs[0]
assert run.status == "Failed"
assert run.latest_error == "RefreshError: A useful error message"
assert run.records_completed == 0


async def test_bigquery_export_workflow_handles_cancellation(ateam, bigquery_batch_export, interval):
@@ -370,6 +370,7 @@ async def test_http_export_workflow(

run = runs[0]
assert run.status == "Completed"
assert run.records_completed == 100

await assert_clickhouse_records_in_mock_server(
mock_server=mock_server,
@@ -425,6 +426,7 @@ async def insert_into_http_activity_mocked(_: HttpInsertInputs) -> str:
run = runs[0]
assert run.status == "FailedRetryable"
assert run.latest_error == "ValueError: A useful error message"
assert run.records_completed == 0


async def test_http_export_workflow_handles_insert_activity_non_retryable_errors(ateam, http_batch_export, interval):
@@ -385,6 +385,7 @@ async def test_postgres_export_workflow(

run = runs[0]
assert run.status == "Completed"
assert run.records_completed == 100

await assert_clickhouse_records_in_postgres(
postgres_connection=postgres_connection,
@@ -494,6 +495,7 @@ class InsufficientPrivilege(Exception):
run = runs[0]
assert run.status == "Failed"
assert run.latest_error == "InsufficientPrivilege: A useful error message"
assert run.records_completed == 0


async def test_postgres_export_workflow_handles_cancellation(ateam, postgres_batch_export, interval):
@@ -433,6 +433,7 @@ async def test_redshift_export_workflow(

run = runs[0]
assert run.status == "Completed"
assert run.records_completed == 100

await assert_clickhouse_records_in_redshfit(
redshift_connection=psycopg_connection,
@@ -559,3 +560,4 @@ class InsufficientPrivilege(Exception):
run = runs[0]
assert run.status == "Failed"
assert run.latest_error == "InsufficientPrivilege: A useful error message"
assert run.records_completed == 0
@@ -805,6 +805,7 @@ async def test_s3_export_workflow_defaults_to_timestamp_on_null_inserted_at(

run = runs[0]
assert run.status == "Completed"
assert run.records_completed == 100

await assert_clickhouse_records_in_s3(
s3_compatible_client=minio_client,
@@ -893,6 +894,7 @@ async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix(

run = runs[0]
assert run.status == "Completed"
assert run.records_completed == 100

expected_key_prefix = s3_key_prefix.format(
table="events",
@@ -968,6 +970,7 @@ async def insert_into_s3_activity_mocked(_: S3InsertInputs) -> str:
run = runs[0]
assert run.status == "FailedRetryable"
assert run.latest_error == "ValueError: A useful error message"
assert run.records_completed == 0


async def test_s3_export_workflow_handles_insert_activity_non_retryable_errors(ateam, s3_batch_export, interval):
@@ -455,6 +455,7 @@ async def test_snowflake_export_workflow_exports_events(

run = runs[0]
assert run.status == "Completed"
assert run.records_completed == 10


@pytest.mark.parametrize("interval", ["hour", "day"], indirect=True)
@@ -695,6 +696,7 @@ async def insert_into_snowflake_activity_mocked(_: SnowflakeInsertInputs) -> str
run = runs[0]
assert run.status == "FailedRetryable"
assert run.latest_error == "ValueError: A useful error message"
assert run.records_completed == 0


async def test_snowflake_export_workflow_handles_insert_activity_non_retryable_errors(ateam, snowflake_batch_export):