diff --git a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
index d21249d43d8ef3..8ea50196d8b2cd 100644
--- a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
@@ -108,9 +108,6 @@ def bigquery_config() -> dict[str, str]:
         "private_key_id": credentials["private_key_id"],
         "token_uri": credentials["token_uri"],
         "client_email": credentials["client_email"],
-        # Not part of the credentials.
-        # Hardcoded to test dataset.
-        "dataset_id": "BatchExports",
     }
 
 
@@ -119,10 +116,25 @@ def bigquery_client() -> typing.Generator[bigquery.Client, None, None]:
     """Manage a bigquery.Client for testing."""
     client = bigquery.Client()
 
-    try:
-        yield client
-    finally:
-        client.close()
+    yield client
+
+    client.close()
+
+
+@pytest.fixture
+def bigquery_dataset(bigquery_config, bigquery_client) -> typing.Generator[bigquery.Dataset, None, None]:
+    """Manage a bigquery dataset for testing.
+
+    We clean up the dataset after every test. Could be quite time expensive, but guarantees a clean slate.
+    """
+    dataset_id = f"{bigquery_config['project_id']}.BatchExportsTest_{str(uuid4()).replace('-', '')}"
+
+    dataset = bigquery.Dataset(dataset_id)
+    dataset = bigquery_client.create_dataset(dataset)
+
+    yield dataset
+
+    bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)
 
 
 @pytest.mark.skipif(
@@ -131,7 +143,7 @@ def bigquery_client() -> typing.Generator[bigquery.Client, None, None]:
 )
 @pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True)
 async def test_insert_into_bigquery_activity_inserts_data_into_bigquery_table(
-    clickhouse_client, activity_environment, bigquery_client, bigquery_config, exclude_events
+    clickhouse_client, activity_environment, bigquery_client, bigquery_config, exclude_events, bigquery_dataset
 ):
     """Test that the insert_into_bigquery_activity function inserts data into a BigQuery table.
 
@@ -194,6 +206,7 @@ async def test_insert_into_bigquery_activity_inserts_data_into_bigquery_table(
     insert_inputs = BigQueryInsertInputs(
         team_id=team_id,
         table_id=f"test_insert_activity_table_{team_id}",
+        dataset_id=bigquery_dataset.dataset_id,
         data_interval_start=data_interval_start.isoformat(),
         data_interval_end=data_interval_end.isoformat(),
         exclude_events=exclude_events,
@@ -208,7 +221,7 @@ async def test_insert_into_bigquery_activity_inserts_data_into_bigquery_table(
     assert_events_in_bigquery(
         client=bigquery_client,
         table_id=f"test_insert_activity_table_{team_id}",
-        dataset_id=bigquery_config["dataset_id"],
+        dataset_id=bigquery_dataset.dataset_id,
         events=events + events_with_no_properties,
         bq_ingested_timestamp=ingested_timestamp,
         exclude_events=exclude_events,
@@ -221,12 +234,15 @@ def table_id(ateam, interval):
 
 
 @pytest_asyncio.fixture
-async def bigquery_batch_export(ateam, table_id, bigquery_config, interval, exclude_events, temporal_client):
+async def bigquery_batch_export(
+    ateam, table_id, bigquery_config, interval, exclude_events, temporal_client, bigquery_dataset
+):
     destination_data = {
         "type": "BigQuery",
         "config": {
             **bigquery_config,
             "table_id": table_id,
+            "dataset_id": bigquery_dataset.dataset_id,
             "exclude_events": exclude_events,
         },
     }
@@ -257,7 +273,6 @@ async def bigquery_batch_export(ateam, table_id, bigquery_config, interval, excl
 @pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True)
 async def test_bigquery_export_workflow(
     clickhouse_client,
-    bigquery_config,
     bigquery_client,
     bigquery_batch_export,
     interval,
@@ -303,7 +318,7 @@ async def test_bigquery_export_workflow(
     inputs = BigQueryBatchExportInputs(
         team_id=ateam.pk,
         batch_export_id=str(bigquery_batch_export.id),
-        data_interval_end="2023-04-25 14:30:00.000000",
+        data_interval_end=data_interval_end.isoformat(),
         interval=interval,
         **bigquery_batch_export.destination.config,
     )
@@ -340,7 +355,7 @@ async def test_bigquery_export_workflow(
     assert_events_in_bigquery(
         client=bigquery_client,
         table_id=table_id,
-        dataset_id=bigquery_config["dataset_id"],
+        dataset_id=bigquery_batch_export.destination.config["dataset_id"],
         events=events,
         bq_ingested_timestamp=ingested_timestamp,
         exclude_events=exclude_events,
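The core of this change is the ephemeral-dataset pattern in the new `bigquery_dataset` fixture: create a uniquely named dataset per test, yield it, then drop it unconditionally. A minimal standalone sketch of that pattern follows; the `ephemeral_dataset` name is hypothetical, application-default credentials are assumed, and the project id is taken from the client rather than from `bigquery_config` as the fixture above does:

import uuid

import pytest
from google.cloud import bigquery


@pytest.fixture
def ephemeral_dataset():
    client = bigquery.Client()
    # Unique per test invocation, so concurrent or previously failed runs never collide.
    dataset_id = f"{client.project}.BatchExportsTest_{uuid.uuid4().hex}"

    dataset = client.create_dataset(bigquery.Dataset(dataset_id))

    yield dataset

    # delete_contents drops any tables the test created inside the dataset;
    # not_found_ok keeps teardown from failing if the dataset is already gone.
    client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)
    client.close()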