diff --git a/posthog/temporal/tests/batch_exports/test_batch_exports.py b/posthog/temporal/tests/batch_exports/test_batch_exports.py
index 15f726255cce45..097f13869d2ebd 100644
--- a/posthog/temporal/tests/batch_exports/test_batch_exports.py
+++ b/posthog/temporal/tests/batch_exports/test_batch_exports.py
@@ -19,7 +19,7 @@
     json_dumps_bytes,
 )
 
-pytestmark = [pytest.mark.django_db, pytest.mark.asyncio]
+pytestmark = [pytest.mark.asyncio, pytest.mark.django_db]
 
 
 async def test_get_rows_count(clickhouse_client):
diff --git a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
index 8ea50196d8b2cd..d6696407c60e26 100644
--- a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
@@ -30,7 +30,13 @@
     insert_into_bigquery_activity,
 )
 
-pytestmark = [pytest.mark.asyncio, pytest.mark.asyncio_event_loop, pytest.mark.django_db]
+SKIP_IF_MISSING_GOOGLE_APPLICATION_CREDENTIALS = pytest.mark.skipif(
+    "GOOGLE_APPLICATION_CREDENTIALS" not in os.environ,
+    reason="Google credentials not set in environment",
+)
+
+pytestmark = [SKIP_IF_MISSING_GOOGLE_APPLICATION_CREDENTIALS, pytest.mark.asyncio, pytest.mark.django_db]
+
 
 
 TEST_TIME = dt.datetime.utcnow()
@@ -137,10 +143,6 @@ def bigquery_dataset(bigquery_config, bigquery_client) -> typing.Generator[bigqu
     bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)
 
 
-@pytest.mark.skipif(
-    "GOOGLE_APPLICATION_CREDENTIALS" not in os.environ,
-    reason="Google credentials not set in environment",
-)
 @pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True)
 async def test_insert_into_bigquery_activity_inserts_data_into_bigquery_table(
     clickhouse_client, activity_environment, bigquery_client, bigquery_config, exclude_events, bigquery_dataset
@@ -265,10 +267,6 @@ async def bigquery_batch_export(
     await adelete_batch_export(batch_export, temporal_client)
 
 
-@pytest.mark.skipif(
-    "GOOGLE_APPLICATION_CREDENTIALS" not in os.environ,
-    reason="Google credentials not set in environment",
-)
 @pytest.mark.parametrize("interval", ["hour", "day"])
 @pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True)
 async def test_bigquery_export_workflow(
@@ -362,10 +360,6 @@ async def test_bigquery_export_workflow(
     )
 
 
-@pytest.mark.skipif(
-    "GOOGLE_APPLICATION_CREDENTIALS" not in os.environ,
-    reason="Google credentials not set in environment",
-)
 async def test_bigquery_export_workflow_handles_insert_activity_errors(ateam, bigquery_batch_export, interval):
     """Test that BigQuery Export Workflow can gracefully handle errors when inserting BigQuery data."""
     data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")
@@ -412,10 +406,6 @@ async def insert_into_bigquery_activity_mocked(_: BigQueryInsertInputs) -> str:
         assert run.latest_error == "ValueError: A useful error message"
 
 
-@pytest.mark.skipif(
-    "GOOGLE_APPLICATION_CREDENTIALS" not in os.environ,
-    reason="Google credentials not set in environment",
-)
 async def test_bigquery_export_workflow_handles_cancellation(ateam, bigquery_batch_export, interval):
     """Test that BigQuery Export Workflow can gracefully handle cancellations when inserting BigQuery data."""
     data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")
diff --git a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py
index b87f5ac369d965..1cdfb595b1e477 100644
--- a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py
@@ -29,7 +29,10 @@
     insert_into_postgres_activity,
 )
 
-pytestmark = [pytest.mark.asyncio, pytest.mark.asyncio_event_loop, pytest.mark.django_db]
+pytestmark = [
+    pytest.mark.asyncio,
+    pytest.mark.django_db,
+]
 
 
 def assert_events_in_postgres(connection, schema, table_name, events, exclude_events: list[str] | None = None):
diff --git a/posthog/temporal/tests/conftest.py b/posthog/temporal/tests/conftest.py
index 7c756ed44f717a..4c480989db92b6 100644
--- a/posthog/temporal/tests/conftest.py
+++ b/posthog/temporal/tests/conftest.py
@@ -63,7 +63,7 @@ def activity_environment():
     return ActivityEnvironment()
 
 
-@pytest.fixture(scope="module")
+@pytest.fixture
 def clickhouse_client():
     """Provide a ClickHouseClient to use in tests."""
     client = ClickHouseClient(
@@ -76,14 +76,7 @@ def clickhouse_client():
     yield client
 
 
-@pytest.fixture(scope="module")
-def event_loop():
-    loop = asyncio.get_event_loop()
-    yield loop
-    loop.close()
-
-
-@pytest_asyncio.fixture(scope="module")
+@pytest_asyncio.fixture
 async def temporal_client():
     """Provide a temporalio.client.Client to use in tests."""
     client = await connect(
diff --git a/posthog/temporal/workflows/postgres_batch_export.py b/posthog/temporal/workflows/postgres_batch_export.py
index 70bae5e92e40fb..2f489889d08d78 100644
--- a/posthog/temporal/workflows/postgres_batch_export.py
+++ b/posthog/temporal/workflows/postgres_batch_export.py
@@ -1,3 +1,4 @@
+import asyncio
 import collections.abc
 import contextlib
 import datetime as dt
@@ -54,7 +55,7 @@ def postgres_connection(inputs) -> collections.abc.Iterator[psycopg2.extensions.
     connection.close()
 
 
-def copy_tsv_to_postgres(
+async def copy_tsv_to_postgres(
     tsv_file,
     postgres_connection: psycopg2.extensions.connection,
     schema: str,
@@ -75,7 +76,8 @@ def copy_tsv_to_postgres(
     with postgres_connection.cursor() as cursor:
         if schema:
             cursor.execute(sql.SQL("SET search_path TO {schema}").format(schema=sql.Identifier(schema)))
-        cursor.copy_from(
+        await asyncio.to_thread(
+            cursor.copy_from,
             tsv_file,
             table_name,
             null="",
@@ -231,7 +233,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
                     pg_file.records_since_last_reset,
                     pg_file.bytes_since_last_reset,
                 )
-                copy_tsv_to_postgres(
+                await copy_tsv_to_postgres(
                     pg_file,
                     connection,
                     inputs.schema,
@@ -246,7 +248,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
                     pg_file.records_since_last_reset,
                     pg_file.bytes_since_last_reset,
                 )
-                copy_tsv_to_postgres(
+                await copy_tsv_to_postgres(
                     pg_file,
                     connection,
                     inputs.schema,