Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support for s3 compatible destinations in batch exports #20894

Merged
merged 9 commits into from
Mar 20, 2024
Merged
9 changes: 9 additions & 0 deletions frontend/src/scenes/batch_exports/BatchExportEditForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,15 @@ export function BatchExportsEditFields({
)}
</div>

<LemonField
name="endpoint_url"
label="Endpoint URL"
showOptional
info={<>Only required if exporting to an S3-compatible blob storage (like MinIO)</>}
>
<LemonInput placeholder={isNew ? 'e.g. https://your-minio-host:9000' : 'Leave unchanged'} />
</LemonField>

<LemonField name="exclude_events" label="Events to exclude" className="flex-1">
<LemonInputSelect
mode="multiple"
Expand Down
1 change: 1 addition & 0 deletions frontend/src/scenes/batch_exports/BatchExports.stories.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export default {
prefix: 'my-prefix',
aws_access_key_id: 'my-access-key-id',
aws_secret_access_key: '',
endpoint_url: null,
compression: null,
exclude_events: [],
include_events: [],
Expand Down
1 change: 1 addition & 0 deletions frontend/src/scenes/batch_exports/batchExportEditLogic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ export const batchExportFormFields = (
kms_key_id: !config.kms_key_id && config.encryption == 'aws:kms' ? 'This field is required' : '',
exclude_events: '',
include_events: '',
endpoint_url: null,
}
: destination === 'BigQuery'
? {
Expand Down
1 change: 1 addition & 0 deletions frontend/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3595,6 +3595,7 @@ export type BatchExportDestinationS3 = {
compression: string | null
encryption: string | null
kms_key_id: string | null
endpoint_url: string | null
}
}

Expand Down
1 change: 1 addition & 0 deletions posthog/batch_exports/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class S3BatchExportInputs:
encryption: str | None = None
kms_key_id: str | None = None
batch_export_schema: BatchExportSchema | None = None
endpoint_url: str | None = None


@dataclass
Expand Down
7 changes: 7 additions & 0 deletions posthog/temporal/batch_exports/s3_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,13 @@ def __init__(
kms_key_id: str | None,
aws_access_key_id: str | None = None,
aws_secret_access_key: str | None = None,
endpoint_url: str | None = None,
):
self._session = aioboto3.Session()
self.region_name = region_name
self.aws_access_key_id = aws_access_key_id
self.aws_secret_access_key = aws_secret_access_key
self.endpoint_url = endpoint_url
self.bucket_name = bucket_name
self.key = key
self.encryption = encryption
Expand Down Expand Up @@ -154,11 +156,13 @@ def is_upload_in_progress(self) -> bool:
@contextlib.asynccontextmanager
async def s3_client(self):
"""Asynchronously yield an S3 client.

The client is opened from this instance's aioboto3 session using the
region name, access key pair and endpoint URL stored on the instance.
It is only valid inside the ``async with`` block of the caller, as the
underlying client context is exited once this context manager exits.
"""

async with self._session.client(
"s3",
region_name=self.region_name,
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
# Defaults to None in __init__. Set it to export to an S3-compatible
# blob storage (like MinIO); presumably the library's default AWS
# endpoint is used when it is None -- confirm against aioboto3 docs.
endpoint_url=self.endpoint_url,
) as client:
yield client

Expand Down Expand Up @@ -306,6 +310,7 @@ class S3InsertInputs:
encryption: str | None = None
kms_key_id: str | None = None
batch_export_schema: BatchExportSchema | None = None
endpoint_url: str | None = None


async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tuple[S3MultiPartUpload, str]:
Expand All @@ -321,6 +326,7 @@ async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tupl
region_name=inputs.region,
aws_access_key_id=inputs.aws_access_key_id,
aws_secret_access_key=inputs.aws_secret_access_key,
endpoint_url=inputs.endpoint_url,
)

details = activity.info().heartbeat_details
Expand Down Expand Up @@ -555,6 +561,7 @@ async def run(self, inputs: S3BatchExportInputs):
team_id=inputs.team_id,
aws_access_key_id=inputs.aws_access_key_id,
aws_secret_access_key=inputs.aws_secret_access_key,
endpoint_url=inputs.endpoint_url,
data_interval_start=data_interval_start.isoformat(),
data_interval_end=data_interval_end.isoformat(),
compression=inputs.compression,
Expand Down
11 changes: 7 additions & 4 deletions posthog/temporal/tests/batch_exports/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ This module contains unit tests covering activities, workflows, and helper funct

BigQuery batch exports can be tested against a real BigQuery instance, but doing so requires additional setup. For this reason, these tests are skipped unless an environment variable pointing to a BigQuery credentials file (`GOOGLE_APPLICATION_CREDENTIALS=/path/to/my/project-credentials.json`) is set.

> :warning: Since BigQuery batch export tests require additional setup, we skip them by default and they will not be run by automated CI pipelines. Please ensure these tests pass when making changes that affect BigQuery batch exports.
> [!WARNING]
> Since BigQuery batch export tests require additional setup, we skip them by default and they will not be run by automated CI pipelines. Please ensure these tests pass when making changes that affect BigQuery batch exports.

To enable testing for BigQuery batch exports, we require:
1. A BigQuery project and dataset
Expand All @@ -24,7 +25,8 @@ DEBUG=1 GOOGLE_APPLICATION_CREDENTIALS=/path/to/my/project-credentials.json pyte

Redshift batch exports can be tested against a real Redshift (or Redshift Serverless) instance, with additional setup steps required. Due to this requirement, these tests are skipped unless Redshift credentials are specified in the environment.

> :warning: Since Redshift batch export tests require additional setup, we skip them by default and they will not be run by automated CI pipelines. Please ensure these tests pass when making changes that affect Redshift batch exports.
> [!WARNING]
> Since Redshift batch export tests require additional setup, we skip them by default and they will not be run by automated CI pipelines. Please ensure these tests pass when making changes that affect Redshift batch exports.

To enable testing for Redshift batch exports, we require:
1. A Redshift (or Redshift Serverless) instance.
Expand All @@ -41,14 +43,15 @@ Replace the `REDSHIFT_*` environment variables with the values obtained from the

## Testing S3 batch exports

S3 batch exports are tested against a MinIO bucket available in the local development stack. However there are also unit tests that specifically target an S3 bucket. Additional setup is required to run those specific tests:
S3 batch exports are tested against a MinIO bucket available in the local development stack. However, there are also unit tests that specifically target an S3 bucket (like `test_s3_export_workflow_with_s3_bucket`). Additional setup is required to run those specific tests:

1. Ensure you are logged in to an AWS account.
2. Create or choose an existing S3 bucket from that AWS account to use as the test bucket.
3. Create or choose an existing KMS key id from that AWS account to use in tests.
4. Make sure the role/user you are logged in as has permissions to use the bucket and KMS key.

For PostHog employees, check your password manager for these details.
> [!NOTE]
> For PostHog employees, your password manager contains a set of credentials for S3 batch exports development testing. You may populate your development environment with these credentials and use the provided test bucket and KMS key.

With these setup steps done, we can run all tests (MinIO and S3 bucket) from the root of the `posthog` repo with:

Expand Down
147 changes: 63 additions & 84 deletions posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import asyncio
import contextlib
import datetime as dt
import functools
import gzip
import json
import os
from random import randint
from unittest import mock
from uuid import uuid4

import aioboto3
Expand Down Expand Up @@ -337,6 +335,7 @@ async def test_insert_into_s3_activity_puts_data_into_s3(
data_interval_end=data_interval_end.isoformat(),
aws_access_key_id="object_storage_root_user",
aws_secret_access_key="object_storage_root_password",
endpoint_url=settings.OBJECT_STORAGE_ENDPOINT,
compression=compression,
exclude_events=exclude_events,
batch_export_schema=batch_export_schema,
Expand All @@ -345,11 +344,7 @@ async def test_insert_into_s3_activity_puts_data_into_s3(
with override_settings(
BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2
): # 5MB, the minimum for Multipart uploads
with mock.patch(
"posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client",
side_effect=create_test_client,
):
await activity_environment.run(insert_into_s3_activity, insert_inputs)
await activity_environment.run(insert_into_s3_activity, insert_inputs)

await assert_clickhouse_records_in_s3(
s3_compatible_client=minio_client,
Expand All @@ -368,7 +363,14 @@ async def test_insert_into_s3_activity_puts_data_into_s3(

@pytest_asyncio.fixture
async def s3_batch_export(
ateam, s3_key_prefix, bucket_name, compression, interval, exclude_events, temporal_client, encryption
ateam,
s3_key_prefix,
bucket_name,
compression,
interval,
exclude_events,
temporal_client,
encryption,
):
destination_data = {
"type": "S3",
Expand All @@ -378,6 +380,7 @@ async def s3_batch_export(
"prefix": s3_key_prefix,
"aws_access_key_id": "object_storage_root_user",
"aws_secret_access_key": "object_storage_root_password",
"endpoint_url": settings.OBJECT_STORAGE_ENDPOINT,
"compression": compression,
"exclude_events": exclude_events,
"encryption": encryption,
Expand Down Expand Up @@ -479,19 +482,14 @@ async def test_s3_export_workflow_with_minio_bucket(
],
workflow_runner=UnsandboxedWorkflowRunner(),
):
# We patch the S3 client to return our client that targets MinIO.
with mock.patch(
"posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client",
side_effect=create_test_client,
):
await activity_environment.client.execute_workflow(
S3BatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
execution_timeout=dt.timedelta(minutes=10),
)
await activity_environment.client.execute_workflow(
S3BatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
execution_timeout=dt.timedelta(minutes=10),
)

runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id)
assert len(runs) == 1
Expand Down Expand Up @@ -523,10 +521,10 @@ async def s3_client(bucket_name, s3_key_prefix):
using a disposable bucket to run these tests or sticking to other tests that use the
local development MinIO.
"""
async with aioboto3.Session().client("s3") as minio_client:
yield minio_client
async with aioboto3.Session().client("s3") as s3_client:
yield s3_client

await delete_all_from_s3(minio_client, bucket_name, key_prefix=s3_key_prefix)
await delete_all_from_s3(s3_client, bucket_name, key_prefix=s3_key_prefix)


@pytest.mark.skipif(
Expand Down Expand Up @@ -595,20 +593,20 @@ async def test_s3_export_workflow_with_s3_bucket(
)

workflow_id = str(uuid4())
destination_config = s3_batch_export.destination.config | {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh cool, I wasn't aware of this | syntax — that's useful.

"endpoint_url": None,
"aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID"),
"aws_secret_access_key": os.getenv("AWS_SECRET_ACCESS_KEY"),
}
inputs = S3BatchExportInputs(
team_id=ateam.pk,
batch_export_id=str(s3_batch_export.id),
data_interval_end=data_interval_end.isoformat(),
interval=interval,
batch_export_schema=batch_export_schema,
**s3_batch_export.destination.config,
**destination_config,
)

@contextlib.asynccontextmanager
async def create_minio_client(*args, **kwargs):
"""Mock function to return an already initialized S3 client."""
yield s3_client

async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
async with Worker(
activity_environment.client,
Expand All @@ -621,18 +619,14 @@ async def create_minio_client(*args, **kwargs):
],
workflow_runner=UnsandboxedWorkflowRunner(),
):
with mock.patch(
"posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client",
side_effect=create_minio_client,
):
await activity_environment.client.execute_workflow(
S3BatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
execution_timeout=dt.timedelta(seconds=10),
)
await activity_environment.client.execute_workflow(
S3BatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
execution_timeout=dt.timedelta(seconds=10),
)

runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id)
assert len(runs) == 1
Expand All @@ -641,7 +635,7 @@ async def create_minio_client(*args, **kwargs):
assert run.status == "Completed"

await assert_clickhouse_records_in_s3(
s3_compatible_client=minio_client,
s3_compatible_client=s3_client,
clickhouse_client=clickhouse_client,
bucket_name=bucket_name,
key_prefix=s3_key_prefix,
Expand Down Expand Up @@ -708,18 +702,14 @@ async def test_s3_export_workflow_with_minio_bucket_and_a_lot_of_data(
],
workflow_runner=UnsandboxedWorkflowRunner(),
):
with mock.patch(
"posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client",
side_effect=create_test_client,
):
await activity_environment.client.execute_workflow(
S3BatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
execution_timeout=dt.timedelta(seconds=360),
)
await activity_environment.client.execute_workflow(
S3BatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
execution_timeout=dt.timedelta(seconds=360),
)

runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id)
assert len(runs) == 1
Expand Down Expand Up @@ -787,18 +777,14 @@ async def test_s3_export_workflow_defaults_to_timestamp_on_null_inserted_at(
],
workflow_runner=UnsandboxedWorkflowRunner(),
):
with mock.patch(
"posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client",
side_effect=create_test_client,
):
await activity_environment.client.execute_workflow(
S3BatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
execution_timeout=dt.timedelta(seconds=10),
)
await activity_environment.client.execute_workflow(
S3BatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
execution_timeout=dt.timedelta(seconds=10),
)

runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id)
assert len(runs) == 1
Expand Down Expand Up @@ -875,18 +861,14 @@ async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix(
],
workflow_runner=UnsandboxedWorkflowRunner(),
):
with mock.patch(
"posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client",
side_effect=create_test_client,
):
await activity_environment.client.execute_workflow(
S3BatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
execution_timeout=dt.timedelta(seconds=10),
)
await activity_environment.client.execute_workflow(
S3BatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
execution_timeout=dt.timedelta(seconds=10),
)

runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id)
assert len(runs) == 1
Expand Down Expand Up @@ -1283,14 +1265,11 @@ def assert_heartbeat_details(*details):
data_interval_end=data_interval_end.isoformat(),
aws_access_key_id="object_storage_root_user",
aws_secret_access_key="object_storage_root_password",
endpoint_url=settings.OBJECT_STORAGE_ENDPOINT,
)

with override_settings(BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2):
with mock.patch(
"posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client",
side_effect=create_test_client,
):
await activity_environment.run(insert_into_s3_activity, insert_inputs)
await activity_environment.run(insert_into_s3_activity, insert_inputs)

# This checks that the assert_heartbeat_details function was actually called.
# The '+ 1' is because we increment current_part_number one last time after we are done.
Expand Down
Loading