diff --git a/frontend/src/scenes/batch_exports/BatchExportEditForm.tsx b/frontend/src/scenes/batch_exports/BatchExportEditForm.tsx index 106da039cd22d..7fbbc8cc29d69 100644 --- a/frontend/src/scenes/batch_exports/BatchExportEditForm.tsx +++ b/frontend/src/scenes/batch_exports/BatchExportEditForm.tsx @@ -267,6 +267,15 @@ export function BatchExportsEditFields({ )} + Only required if exporting to an S3-compatible blob storage (like MinIO)} + > + + + bool: @contextlib.asynccontextmanager async def s3_client(self): """Asynchronously yield an S3 client.""" + async with self._session.client( "s3", region_name=self.region_name, aws_access_key_id=self.aws_access_key_id, aws_secret_access_key=self.aws_secret_access_key, + endpoint_url=self.endpoint_url, ) as client: yield client @@ -306,6 +310,7 @@ class S3InsertInputs: encryption: str | None = None kms_key_id: str | None = None batch_export_schema: BatchExportSchema | None = None + endpoint_url: str | None = None async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tuple[S3MultiPartUpload, str]: @@ -321,6 +326,7 @@ async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tupl region_name=inputs.region, aws_access_key_id=inputs.aws_access_key_id, aws_secret_access_key=inputs.aws_secret_access_key, + endpoint_url=inputs.endpoint_url, ) details = activity.info().heartbeat_details @@ -555,6 +561,7 @@ async def run(self, inputs: S3BatchExportInputs): team_id=inputs.team_id, aws_access_key_id=inputs.aws_access_key_id, aws_secret_access_key=inputs.aws_secret_access_key, + endpoint_url=inputs.endpoint_url, data_interval_start=data_interval_start.isoformat(), data_interval_end=data_interval_end.isoformat(), compression=inputs.compression, diff --git a/posthog/temporal/tests/batch_exports/README.md b/posthog/temporal/tests/batch_exports/README.md index 12bf2f6177cd3..c3f63f176ebde 100644 --- a/posthog/temporal/tests/batch_exports/README.md +++ b/posthog/temporal/tests/batch_exports/README.md @@ -6,7 +6,8 @@ This module contains unit tests covering activities, workflows, and helper funct BigQuery batch exports can be tested against a real BigQuery instance, but doing so requires additional setup. For this reason, these tests are skipped unless an environment variable pointing to a BigQuery credentials file (`GOOGLE_APPLICATION_CREDENTIALS=/path/to/my/project-credentials.json`) is set. -> :warning: Since BigQuery batch export tests require additional setup, we skip them by default and will not be ran by automated CI pipelines. Please ensure these tests pass when making changes that affect BigQuery batch exports. +> [!WARNING] +> Since BigQuery batch export tests require additional setup, we skip them by default and will not be ran by automated CI pipelines. Please ensure these tests pass when making changes that affect BigQuery batch exports. To enable testing for BigQuery batch exports, we require: 1. A BigQuery project and dataset @@ -24,7 +25,8 @@ DEBUG=1 GOOGLE_APPLICATION_CREDENTIALS=/path/to/my/project-credentials.json pyte Redshift batch exports can be tested against a real Redshift (or Redshift Serverless) instance, with additional setup steps required. Due to this requirement, these tests are skipped unless Redshift credentials are specified in the environment. -> :warning: Since Redshift batch export tests require additional setup, we skip them by default and will not be ran by automated CI pipelines. Please ensure these tests pass when making changes that affect Redshift batch exports. +> [!WARNING] +> Since Redshift batch export tests require additional setup, we skip them by default and will not be ran by automated CI pipelines. Please ensure these tests pass when making changes that affect Redshift batch exports. To enable testing for Redshift batch exports, we require: 1. A Redshift (or Redshift Serverless) instance. @@ -41,14 +43,15 @@ Replace the `REDSHIFT_*` environment variables with the values obtained from the ## Testing S3 batch exports -S3 batch exports are tested against a MinIO bucket available in the local development stack. However there are also unit tests that specifically target an S3 bucket. Additional setup is required to run those specific tests: +S3 batch exports are tested against a MinIO bucket available in the local development stack. However there are also unit tests that specifically target an S3 bucket (like `test_s3_export_workflow_with_s3_bucket`). Additional setup is required to run those specific tests: 1. Ensure you are logged in to an AWS account. 2. Create or choose an existing S3 bucket from that AWS account to use as the test bucket. 3. Create or choose an existing KMS key id from that AWS account to use in tests. 4. Make sure the role/user you are logged in as has permissions to use the bucket and KMS key. -For PostHog employees, check your password manager for these details. +> [!NOTE] +> For PostHog employees, your password manager contains a set of credentials for S3 batch exports development testing. You may populate your development environment with these credentials and use the provided test bucket and KMS key. With these setup steps done, we can run all tests (MinIO and S3 bucket) from the root of the `posthog` repo with: diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py index 17b1aaaa94d4a..5e741f9223321 100644 --- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py @@ -1,12 +1,10 @@ import asyncio -import contextlib import datetime as dt import functools import gzip import json import os from random import randint -from unittest import mock from uuid import uuid4 import aioboto3 @@ -337,6 +335,7 @@ async def test_insert_into_s3_activity_puts_data_into_s3( data_interval_end=data_interval_end.isoformat(), aws_access_key_id="object_storage_root_user", aws_secret_access_key="object_storage_root_password", + endpoint_url=settings.OBJECT_STORAGE_ENDPOINT, compression=compression, exclude_events=exclude_events, batch_export_schema=batch_export_schema, @@ -345,11 +344,7 @@ async def test_insert_into_s3_activity_puts_data_into_s3( with override_settings( BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2 ): # 5MB, the minimum for Multipart uploads - with mock.patch( - "posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client", - side_effect=create_test_client, - ): - await activity_environment.run(insert_into_s3_activity, insert_inputs) + await activity_environment.run(insert_into_s3_activity, insert_inputs) await assert_clickhouse_records_in_s3( s3_compatible_client=minio_client, @@ -368,7 +363,14 @@ async def test_insert_into_s3_activity_puts_data_into_s3( @pytest_asyncio.fixture async def s3_batch_export( - ateam, s3_key_prefix, bucket_name, compression, interval, exclude_events, temporal_client, encryption + ateam, + s3_key_prefix, + bucket_name, + compression, + interval, + exclude_events, + temporal_client, + encryption, ): destination_data = { "type": "S3", @@ -378,6 +380,7 @@ async def s3_batch_export( "prefix": s3_key_prefix, "aws_access_key_id": "object_storage_root_user", "aws_secret_access_key": "object_storage_root_password", + "endpoint_url": settings.OBJECT_STORAGE_ENDPOINT, "compression": compression, "exclude_events": exclude_events, "encryption": encryption, @@ -479,19 +482,14 @@ async def test_s3_export_workflow_with_minio_bucket( ], workflow_runner=UnsandboxedWorkflowRunner(), ): - # We patch the S3 client to return our client that targets MinIO. - with mock.patch( - "posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client", - side_effect=create_test_client, - ): - await activity_environment.client.execute_workflow( - S3BatchExportWorkflow.run, - inputs, - id=workflow_id, - task_queue=settings.TEMPORAL_TASK_QUEUE, - retry_policy=RetryPolicy(maximum_attempts=1), - execution_timeout=dt.timedelta(minutes=10), - ) + await activity_environment.client.execute_workflow( + S3BatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + execution_timeout=dt.timedelta(minutes=10), + ) runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id) assert len(runs) == 1 @@ -523,10 +521,10 @@ async def s3_client(bucket_name, s3_key_prefix): using a disposable bucket to run these tests or sticking to other tests that use the local development MinIO. """ - async with aioboto3.Session().client("s3") as minio_client: - yield minio_client + async with aioboto3.Session().client("s3") as s3_client: + yield s3_client - await delete_all_from_s3(minio_client, bucket_name, key_prefix=s3_key_prefix) + await delete_all_from_s3(s3_client, bucket_name, key_prefix=s3_key_prefix) @pytest.mark.skipif( @@ -595,20 +593,20 @@ async def test_s3_export_workflow_with_s3_bucket( ) workflow_id = str(uuid4()) + destination_config = s3_batch_export.destination.config | { + "endpoint_url": None, + "aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID"), + "aws_secret_access_key": os.getenv("AWS_SECRET_ACCESS_KEY"), + } inputs = S3BatchExportInputs( team_id=ateam.pk, batch_export_id=str(s3_batch_export.id), data_interval_end=data_interval_end.isoformat(), interval=interval, batch_export_schema=batch_export_schema, - **s3_batch_export.destination.config, + **destination_config, ) - @contextlib.asynccontextmanager - async def create_minio_client(*args, **kwargs): - """Mock function to return an already initialized S3 client.""" - yield s3_client - async with await WorkflowEnvironment.start_time_skipping() as activity_environment: async with Worker( activity_environment.client, @@ -621,18 +619,14 @@ async def create_minio_client(*args, **kwargs): ], workflow_runner=UnsandboxedWorkflowRunner(), ): - with mock.patch( - "posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client", - side_effect=create_minio_client, - ): - await activity_environment.client.execute_workflow( - S3BatchExportWorkflow.run, - inputs, - id=workflow_id, - task_queue=settings.TEMPORAL_TASK_QUEUE, - retry_policy=RetryPolicy(maximum_attempts=1), - execution_timeout=dt.timedelta(seconds=10), - ) + await activity_environment.client.execute_workflow( + S3BatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + execution_timeout=dt.timedelta(seconds=10), + ) runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id) assert len(runs) == 1 @@ -641,7 +635,7 @@ async def create_minio_client(*args, **kwargs): assert run.status == "Completed" await assert_clickhouse_records_in_s3( - s3_compatible_client=minio_client, + s3_compatible_client=s3_client, clickhouse_client=clickhouse_client, bucket_name=bucket_name, key_prefix=s3_key_prefix, @@ -708,18 +702,14 @@ async def test_s3_export_workflow_with_minio_bucket_and_a_lot_of_data( ], workflow_runner=UnsandboxedWorkflowRunner(), ): - with mock.patch( - "posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client", - side_effect=create_test_client, - ): - await activity_environment.client.execute_workflow( - S3BatchExportWorkflow.run, - inputs, - id=workflow_id, - task_queue=settings.TEMPORAL_TASK_QUEUE, - retry_policy=RetryPolicy(maximum_attempts=1), - execution_timeout=dt.timedelta(seconds=360), - ) + await activity_environment.client.execute_workflow( + S3BatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + execution_timeout=dt.timedelta(seconds=360), + ) runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id) assert len(runs) == 1 @@ -787,18 +777,14 @@ async def test_s3_export_workflow_defaults_to_timestamp_on_null_inserted_at( ], workflow_runner=UnsandboxedWorkflowRunner(), ): - with mock.patch( - "posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client", - side_effect=create_test_client, - ): - await activity_environment.client.execute_workflow( - S3BatchExportWorkflow.run, - inputs, - id=workflow_id, - task_queue=settings.TEMPORAL_TASK_QUEUE, - retry_policy=RetryPolicy(maximum_attempts=1), - execution_timeout=dt.timedelta(seconds=10), - ) + await activity_environment.client.execute_workflow( + S3BatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + execution_timeout=dt.timedelta(seconds=10), + ) runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id) assert len(runs) == 1 @@ -875,18 +861,14 @@ async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix( ], workflow_runner=UnsandboxedWorkflowRunner(), ): - with mock.patch( - "posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client", - side_effect=create_test_client, - ): - await activity_environment.client.execute_workflow( - S3BatchExportWorkflow.run, - inputs, - id=workflow_id, - task_queue=settings.TEMPORAL_TASK_QUEUE, - retry_policy=RetryPolicy(maximum_attempts=1), - execution_timeout=dt.timedelta(seconds=10), - ) + await activity_environment.client.execute_workflow( + S3BatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + execution_timeout=dt.timedelta(seconds=10), + ) runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id) assert len(runs) == 1 @@ -1283,14 +1265,11 @@ def assert_heartbeat_details(*details): data_interval_end=data_interval_end.isoformat(), aws_access_key_id="object_storage_root_user", aws_secret_access_key="object_storage_root_password", + endpoint_url=settings.OBJECT_STORAGE_ENDPOINT, ) with override_settings(BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2): - with mock.patch( - "posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client", - side_effect=create_test_client, - ): - await activity_environment.run(insert_into_s3_activity, insert_inputs) + await activity_environment.run(insert_into_s3_activity, insert_inputs) # This checks that the assert_heartbeat_details function was actually called. # The '+ 1' is because we increment current_part_number one last time after we are done.