Skip to content

Commit

Permalink
fix: Don't use async logging as it's unsupported by temporal runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Nov 2, 2023
1 parent 6c60f36 commit ed57cc9
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@

TEST_TIME = dt.datetime.utcnow()

pytestmark = [pytest.mark.asyncio_event_loop, pytest.mark.asyncio]


def assert_events_in_bigquery(client, table_id, dataset_id, events, bq_ingested_timestamp):
"""Assert provided events written to a given BigQuery table."""
Expand Down Expand Up @@ -121,7 +123,6 @@ def bigquery_client() -> bigquery.Client:
reason="Google credentials not set in environment",
)
@pytest.mark.django_db
@pytest.mark.asyncio
async def test_insert_into_bigquery_activity_inserts_data_into_bigquery_table(
activity_environment, bigquery_client, bigquery_config
):
Expand Down Expand Up @@ -271,7 +272,6 @@ async def test_insert_into_bigquery_activity_inserts_data_into_bigquery_table(
reason="Google credentials not set in environment",
)
@pytest.mark.django_db
@pytest.mark.asyncio
@pytest.mark.parametrize("interval", ["hour", "day"])
async def test_bigquery_export_workflow(
bigquery_config,
Expand Down Expand Up @@ -480,7 +480,6 @@ async def batch_export(team):


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_bigquery_export_workflow_handles_insert_activity_errors(team, batch_export):
"""Test that BigQuery Export Workflow can gracefully handle errors when inserting BigQuery data."""
workflow_id = str(uuid4())
Expand Down Expand Up @@ -525,7 +524,6 @@ async def insert_into_bigquery_activity_mocked(_: BigQueryInsertInputs) -> str:


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_bigquery_export_workflow_handles_cancellation(team, batch_export):
"""Test that BigQuery Export Workflow can gracefully handle cancellations when inserting BigQuery data."""
workflow_id = str(uuid4())
Expand Down Expand Up @@ -561,6 +559,7 @@ async def never_finish_activity(_: BigQueryInsertInputs) -> str:
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
)

await asyncio.sleep(5)
await handle.cancel()

Expand Down
8 changes: 4 additions & 4 deletions posthog/temporal/workflows/backfill_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def parse_inputs(inputs: list[str]) -> BackfillBatchExportInputs:
async def run(self, inputs: BackfillBatchExportInputs) -> None:
"""Workflow implementation to backfill a BatchExport."""
logger = await bind_batch_exports_logger(team_id=inputs.team_id)
await logger.ainfo(
logger.info(
"Starting Backfill for BatchExport: %s - %s",
inputs.start_at,
inputs.end_at,
Expand Down Expand Up @@ -347,16 +347,16 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None:

except temporalio.exceptions.ActivityError as e:
if isinstance(e.cause, temporalio.exceptions.CancelledError):
await logger.aerror("Backfill was cancelled.")
logger.error("Backfill was cancelled.")
update_inputs.status = "Cancelled"
else:
await logger.aexception("Backfill failed.", exc_info=e.cause)
logger.exception("Backfill failed.", exc_info=e.cause)
update_inputs.status = "Failed"

raise

except Exception as e:
await logger.aexception("Backfill failed with an unexpected error.", exc_info=e)
logger.exception("Backfill failed with an unexpected error.", exc_info=e)
update_inputs.status = "Failed"
raise

Expand Down
8 changes: 4 additions & 4 deletions posthog/temporal/workflows/batch_exports.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,23 +607,23 @@ async def execute_batch_export_insert_activity(
)
except exceptions.ActivityError as e:
if isinstance(e.cause, exceptions.CancelledError):
await logger.error("BatchExport was cancelled.")
logger.error("BatchExport was cancelled.")
update_inputs.status = "Cancelled"
else:
await logger.exception("BatchExport failed.", exc_info=e.cause)
logger.exception("BatchExport failed.", exc_info=e.cause)
update_inputs.status = "Failed"

update_inputs.latest_error = str(e.cause)
raise

except Exception as e:
await logger.exception("BatchExport failed with an unexpected error.", exc_info=e)
logger.exception("BatchExport failed with an unexpected error.", exc_info=e)
update_inputs.status = "Failed"
update_inputs.latest_error = "An unexpected error has ocurred"
raise

else:
await logger.info(
logger.info(
"Successfully finished exporting batch %s - %s", inputs.data_interval_start, inputs.data_interval_end
)

Expand Down
12 changes: 6 additions & 6 deletions posthog/temporal/workflows/bigquery_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def bigquery_client(inputs: BigQueryInsertInputs):
async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
"""Activity streams data from ClickHouse to BigQuery."""
logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="BigQuery")
await logger.ainfo(
logger.info(
"Exporting batch %s - %s",
inputs.data_interval_start,
inputs.data_interval_end,
Expand All @@ -119,14 +119,14 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
)

if count == 0:
await logger.ainfo(
logger.info(
"Nothing to export in batch %s - %s",
inputs.data_interval_start,
inputs.data_interval_end,
)
return

await logger.ainfo("BatchExporting %s rows", count)
logger.info("BatchExporting %s rows", count)

results_iterator = get_results_iterator(
client=client,
Expand Down Expand Up @@ -173,7 +173,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
jsonl_file.write_records_to_jsonl([row])

if jsonl_file.tell() > settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES:
await logger.ainfo(
logger.info(
"Copying %s records of size %s bytes",
jsonl_file.records_since_last_reset,
jsonl_file.bytes_since_last_reset,
Expand All @@ -187,7 +187,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
jsonl_file.reset()

if jsonl_file.tell() > 0:
await logger.ainfo(
logger.info(
"Copying %s records of size %s bytes",
jsonl_file.records_since_last_reset,
jsonl_file.bytes_since_last_reset,
Expand Down Expand Up @@ -216,7 +216,7 @@ async def run(self, inputs: BigQueryBatchExportInputs):
"""Workflow implementation to export data to BigQuery."""
logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="BigQuery")
data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end)
await logger.ainfo(
logger.info(
"Starting batch export %s - %s",
data_interval_start,
data_interval_end,
Expand Down
1 change: 0 additions & 1 deletion posthog/temporal/workflows/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ async def worker_shutdown_handler():
"""
await temporalio.activity.wait_for_worker_shutdown()

await log_queue.join()
listen_task.cancel()

await asyncio.wait([listen_task])
Expand Down
12 changes: 6 additions & 6 deletions posthog/temporal/workflows/postgres_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class PostgresInsertInputs:
async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
"""Activity streams data from ClickHouse to Postgres."""
logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="PostgreSQL")
await logger.ainfo(
logger.info(
"Running Postgres export batch %s - %s",
inputs.data_interval_start,
inputs.data_interval_end,
Expand All @@ -164,14 +164,14 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
)

if count == 0:
await logger.ainfo(
logger.info(
"Nothing to export in batch %s - %s",
inputs.data_interval_start,
inputs.data_interval_end,
)
return

await logger.ainfo("BatchExporting %s rows to Postgres", count)
logger.info("BatchExporting %s rows to Postgres", count)

results_iterator = get_results_iterator(
client=client,
Expand Down Expand Up @@ -226,7 +226,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
pg_file.write_records_to_tsv([row], fieldnames=schema_columns)

if pg_file.tell() > settings.BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES:
await logger.ainfo(
logger.info(
"Copying %s records of size %s bytes to Postgres",
pg_file.records_since_last_reset,
pg_file.bytes_since_last_reset,
Expand All @@ -241,7 +241,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
pg_file.reset()

if pg_file.tell() > 0:
await logger.ainfo(
logger.info(
"Copying %s records of size %s bytes to Postgres",
pg_file.records_since_last_reset,
pg_file.bytes_since_last_reset,
Expand Down Expand Up @@ -276,7 +276,7 @@ async def run(self, inputs: PostgresBatchExportInputs):
"""Workflow implementation to export data to Postgres."""
logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="PostgreSQL")
data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end)
await logger.ainfo(
logger.info(
"Starting Postgres export batch %s - %s",
data_interval_start,
data_interval_end,
Expand Down
8 changes: 4 additions & 4 deletions posthog/temporal/workflows/redshift_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs):
fields.
"""
logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="Redshift")
await logger.ainfo(
logger.info(
"Exporting batch %s - %s",
inputs.data_interval_start,
inputs.data_interval_end,
Expand All @@ -131,14 +131,14 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs):
)

if count == 0:
await logger.ainfo(
logger.info(
"Nothing to export in batch %s - %s",
inputs.data_interval_start,
inputs.data_interval_end,
)
return

await logger.ainfo("BatchExporting %s rows", count)
logger.info("BatchExporting %s rows", count)

results_iterator = get_results_iterator(
client=client,
Expand Down Expand Up @@ -219,7 +219,7 @@ async def run(self, inputs: RedshiftBatchExportInputs):
"""Workflow implementation to export data to Redshift."""
logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="Redshift")
data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end)
await logger.ainfo("Starting Redshift export batch %s - %s", data_interval_start, data_interval_end)
logger.info("Starting Redshift export batch %s - %s", data_interval_start, data_interval_end)

create_export_run_inputs = CreateBatchExportRunInputs(
team_id=inputs.team_id,
Expand Down
22 changes: 11 additions & 11 deletions posthog/temporal/workflows/s3_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,20 +323,20 @@ async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tupl
except IndexError:
# This is the error we expect when no details as the sequence will be empty.
interval_start = inputs.data_interval_start
await logger.adebug(
logger.debug(
"Did not receive details from previous activity Excecution. Export will start from the beginning %s",
interval_start,
)
except Exception:
# We still start from the beginning, but we make a point to log unexpected errors.
# Ideally, any new exceptions should be added to the previous block after the first time and we will never land here.
interval_start = inputs.data_interval_start
await logger.awarning(
logger.warning(
"Did not receive details from previous activity Excecution due to an unexpected error. Export will start from the beginning %s",
interval_start,
)
else:
await logger.ainfo(
logger.info(
"Received details from previous activity. Export will attempt to resume from %s",
interval_start,
)
Expand All @@ -346,7 +346,7 @@ async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tupl
# Even if we receive details we cannot resume a brotli compressed upload as we have lost the compressor state.
interval_start = inputs.data_interval_start

await logger.ainfo(
logger.info(
f"Export will start from the beginning as we are using brotli compression: %s",
interval_start,
)
Expand All @@ -367,7 +367,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs):
files.
"""
logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="S3")
await logger.ainfo(
logger.info(
"Exporting batch %s - %s",
inputs.data_interval_start,
inputs.data_interval_end,
Expand All @@ -387,14 +387,14 @@ async def insert_into_s3_activity(inputs: S3InsertInputs):
)

if count == 0:
await logger.ainfo(
logger.info(
"Nothing to export in batch %s - %s",
inputs.data_interval_start,
inputs.data_interval_end,
)
return

await logger.ainfo("BatchExporting %s rows to S3", count)
logger.info("BatchExporting %s rows to S3", count)

s3_upload, interval_start = await initialize_and_resume_multipart_upload(inputs)

Expand All @@ -420,7 +420,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs):
async def worker_shutdown_handler():
"""Handle the Worker shutting down by heart-beating our latest status."""
await activity.wait_for_worker_shutdown()
await logger.awarn(
logger.warn(
f"Worker shutting down! Reporting back latest exported part {last_uploaded_part_timestamp}",
)
activity.heartbeat(last_uploaded_part_timestamp, s3_upload.to_state())
Expand All @@ -446,7 +446,7 @@ async def worker_shutdown_handler():
local_results_file.write_records_to_jsonl([record])

if local_results_file.tell() > settings.BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES:
await logger.info(
logger.info(
"Uploading part %s containing %s records with size %s bytes",
s3_upload.part_number + 1,
local_results_file.records_since_last_reset,
Expand All @@ -461,7 +461,7 @@ async def worker_shutdown_handler():
local_results_file.reset()

if local_results_file.tell() > 0 and result is not None:
await logger.ainfo(
logger.info(
"Uploading last part %s containing %s records with size %s bytes",
s3_upload.part_number + 1,
local_results_file.records_since_last_reset,
Expand Down Expand Up @@ -496,7 +496,7 @@ async def run(self, inputs: S3BatchExportInputs):
"""Workflow implementation to export data to S3 bucket."""
logger = await bind_batch_exports_logger(team_id=inputs.team_id, destination="S3")
data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end)
await logger.ainfo("Starting batch export %s - %s", data_interval_start, data_interval_end)
logger.info("Starting batch export %s - %s", data_interval_start, data_interval_end)

create_export_run_inputs = CreateBatchExportRunInputs(
team_id=inputs.team_id,
Expand Down
Loading

0 comments on commit ed57cc9

Please sign in to comment.