Skip to content

Commit

Permalink
feat(batch-exports): Add Redshift to BatchExport destinations (#18059)
Browse files Browse the repository at this point in the history
* feat(batch-exports): Add backfill model and service support

* feat(batch-export-backfills): Account for potential restarts while backfilling

* test(batch-exports-backfills): Add Workflow test

* chore(batch-exports-backfill): Bump migration

* feat(batch-exports): Abstract insert activity execution

* feat(batch-exports): Add RedshiftBatchExportWorkflow

* feat(batch-exports): Add Redshift to BatchExport destinations

* feat(batch-exports): Support properties_data_type Redshift plugin parameter

* refactor(batch-exports): Insert rows instead of using COPY

* test: Add unit test for insert_into_redshift_activity

* fix: Address typing issue

* test: Add workflow test

* feat: Frontend support for Redshift batch exports

* docs: Add tests README.md

* fix: Use correct fixture name in test

* fix: Set default properties data type

* fix(batch-exports): Update test

* fix: Add activity to list of supported activities
  • Loading branch information
tomasfarias authored Nov 1, 2023
1 parent 0b90a98 commit 2adc2f9
Show file tree
Hide file tree
Showing 15 changed files with 1,091 additions and 34 deletions.
60 changes: 59 additions & 1 deletion frontend/src/scenes/batch_exports/BatchExportEditForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ export function BatchExportsEditForm(props: BatchExportsEditLogicProps): JSX.Ele
<LemonSelect
options={[
{ value: 'BigQuery', label: 'BigQuery' },
{ value: 'Postgres', label: 'Postgres' },
{ value: 'Postgres', label: 'PostgreSQL' },
{ value: 'Redshift', label: 'Redshift' },
{ value: 'S3', label: 'S3' },
{ value: 'Snowflake', label: 'Snowflake' },
]}
Expand Down Expand Up @@ -367,6 +368,63 @@ export function BatchExportsEditForm(props: BatchExportsEditLogicProps): JSX.Ele
/>
</Field>

<Field name="exclude_events" label="Events to exclude" className="flex-1">
<LemonSelectMultiple
mode="multiple-custom"
options={[]}
placeholder={
'Input one or more events to exclude from the export (optional)'
}
/>
</Field>
<Field name="include_events" label="Events to include" className="flex-1">
<LemonSelectMultiple
mode="multiple-custom"
options={[]}
placeholder={'Input one or more events to include in the export (optional)'}
/>
</Field>
</>
) : batchExportConfigForm.destination === 'Redshift' ? (
<>
<Field name="user" label="User">
<LemonInput placeholder="my-user" />
</Field>

<Field name="password" label="Password">
<LemonInput placeholder="my-password" type="password" />
</Field>

<Field name="host" label="Host">
<LemonInput placeholder="my-host" />
</Field>

<Field name="port" label="Port">
<LemonInput placeholder="5439" type="number" min="0" max="65535" />
</Field>

<Field name="database" label="Database">
<LemonInput placeholder="my-database" />
</Field>

<Field name="schema" label="Schema">
<LemonInput placeholder="public" />
</Field>

<Field name="table_name" label="Table name">
<LemonInput placeholder="events" />
</Field>

<Field name="properties_data_type" label="Properties data type">
<LemonSelect
options={[
{ value: 'varchar', label: 'VARCHAR(65535)' },
{ value: 'super', label: 'SUPER' },
]}
value={'varchar'}
/>
</Field>

<Field name="exclude_events" label="Events to exclude" className="flex-1">
<LemonSelectMultiple
mode="multiple-custom"
Expand Down
22 changes: 21 additions & 1 deletion frontend/src/scenes/batch_exports/batchExportEditLogic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
BatchExportDestination,
BatchExportDestinationBigQuery,
BatchExportDestinationPostgres,
BatchExportDestinationRedshift,
BatchExportDestinationS3,
BatchExportDestinationSnowflake,
Breadcrumb,
Expand All @@ -28,10 +29,11 @@ export type BatchExportConfigurationForm = Omit<
'id' | 'destination' | 'start_at' | 'end_at'
> &
Partial<BatchExportDestinationPostgres['config']> &
Partial<BatchExportDestinationRedshift['config']> &
Partial<BatchExportDestinationBigQuery['config']> &
Partial<BatchExportDestinationS3['config']> &
Partial<BatchExportDestinationSnowflake['config']> & {
destination: 'S3' | 'Snowflake' | 'Postgres' | 'BigQuery'
destination: 'S3' | 'Snowflake' | 'Postgres' | 'BigQuery' | 'Redshift'
start_at: Dayjs | null
end_at: Dayjs | null
json_config_file?: File[] | null
Expand Down Expand Up @@ -64,6 +66,19 @@ const formFields = (
exclude_events: '',
include_events: '',
}
: destination === 'Redshift'
? {
user: isNew ? (!config.user ? 'This field is required' : '') : '',
password: isNew ? (!config.password ? 'This field is required' : '') : '',
host: !config.host ? 'This field is required' : '',
port: !config.port ? 'This field is required' : '',
database: !config.database ? 'This field is required' : '',
schema: !config.schema ? 'This field is required' : '',
table_name: !config.table_name ? 'This field is required' : '',
properties_data_type: '',
exclude_events: '',
include_events: '',
}
: destination === 'S3'
? {
bucket_name: !config.bucket_name ? 'This field is required' : '',
Expand Down Expand Up @@ -143,6 +158,11 @@ export const batchExportsEditLogic = kea<batchExportsEditLogicType>([
type: 'S3',
config: config,
} as unknown as BatchExportDestinationS3)
: destination === 'Redshift'
? ({
type: 'Redshift',
config: config,
} as unknown as BatchExportDestinationRedshift)
: destination === 'BigQuery'
? ({
type: 'BigQuery',
Expand Down
17 changes: 17 additions & 0 deletions frontend/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3204,11 +3204,28 @@ export type BatchExportDestinationBigQuery = {
}
}

/**
 * Redshift destination for a batch export.
 *
 * `config` carries the connection settings and export options entered in the
 * Redshift section of the batch export edit form.
 */
export type BatchExportDestinationRedshift = {
    type: 'Redshift'
    config: {
        user: string
        password: string
        host: string
        port: number
        database: string
        schema: string
        table_name: string
        // Column type used to store event properties. The edit form offers exactly
        // these two string values ('varchar' is labelled VARCHAR(65535), 'super' is
        // labelled SUPER), so this is a string-literal union, not a boolean.
        properties_data_type: 'varchar' | 'super'
        exclude_events: string[]
        include_events: string[]
    }
}

/** Union of the destination types a batch export can target. */
export type BatchExportDestination =
| BatchExportDestinationS3
| BatchExportDestinationSnowflake
| BatchExportDestinationPostgres
| BatchExportDestinationBigQuery
| BatchExportDestinationRedshift

export type BatchExportConfiguration = {
// User provided data for the export. This is the data that the user
Expand Down
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0015_add_verified_properties
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0356_add_replay_cost_control
posthog: 0357_add_redshift_batch_export_destination
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
2 changes: 2 additions & 0 deletions posthog/batch_exports/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ class Destination(models.TextChoices):
S3 = "S3"
SNOWFLAKE = "Snowflake"
POSTGRES = "Postgres"
REDSHIFT = "Redshift"
BIGQUERY = "BigQuery"
NOOP = "NoOp"

# Configuration keys, per destination type, that hold credentials.
# NOTE(review): presumably consumers use these sets to keep secrets out of
# serialized configs — confirm against callers.
secret_fields = {
    "S3": {"aws_access_key_id", "aws_secret_access_key"},
    # Use set literals here: set("password") iterates over the string and yields
    # the single characters {"p", "a", "s", "w", "o", "r", "d"}, so the
    # "password" key itself would never be matched.
    "Snowflake": {"password"},
    "Postgres": {"password"},
    "Redshift": {"password"},
    "BigQuery": {"private_key", "private_key_id", "client_email", "token_uri"},
    "NoOp": set(),
}
Expand Down
8 changes: 8 additions & 0 deletions posthog/batch_exports/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ class PostgresBatchExportInputs:
include_events: list[str] | None = None


@dataclass
class RedshiftBatchExportInputs(PostgresBatchExportInputs):
"""Inputs for Redshift export workflow.

Inherits every field from PostgresBatchExportInputs and adds
properties_data_type, which defaults to "varchar".
"""

# NOTE(review): the edit form offers "varchar" and "super" for this setting;
# other values are presumably unsupported — confirm against the workflow.
properties_data_type: str = "varchar"


@dataclass
class BigQueryBatchExportInputs:
"""Inputs for BigQuery export workflow."""
Expand Down Expand Up @@ -135,6 +142,7 @@ class NoOpInputs:
"S3": ("s3-export", S3BatchExportInputs),
"Snowflake": ("snowflake-export", SnowflakeBatchExportInputs),
"Postgres": ("postgres-export", PostgresBatchExportInputs),
"Redshift": ("redshift-export", RedshiftBatchExportInputs),
"BigQuery": ("bigquery-export", BigQueryBatchExportInputs),
"NoOp": ("no-op", NoOpInputs),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ def test_create_batch_export_from_app_with_backfill(interval, plugin_config):
batch_export_id = str(batch_export_data["id"])
workflows = wait_for_workflow_executions(temporal, query=f'TemporalScheduledById="{batch_export_id}"')

assert len(workflows) == 1
# In the event the test takes too long, we may spawn more than one run
assert len(workflows) >= 1
workflow_execution = workflows[0]
assert workflow_execution.workflow_type == f"{export_type.lower()}-export"
28 changes: 28 additions & 0 deletions posthog/migrations/0357_add_redshift_batch_export_destination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Generated by Django 3.2.19 on 2023-10-18 11:40

from django.db import migrations, models


class Migration(migrations.Migration):
# Adds "Redshift" to the allowed choices of the batchexportdestination "type" field.
# Must run after migration 0356_add_replay_cost_control.
dependencies = [
("posthog", "0356_add_replay_cost_control"),
]

operations = [
migrations.AlterField(
model_name="batchexportdestination",
name="type",
field=models.CharField(
# Each entry is a (stored value, display label) pair.
# NOTE(review): labels such as "Bigquery" and "Noop" look auto-generated
# from the enum member names — leave them as generated.
choices=[
("S3", "S3"),
("Snowflake", "Snowflake"),
("Postgres", "Postgres"),
("Redshift", "Redshift"),
("BigQuery", "Bigquery"),
("NoOp", "Noop"),
],
help_text="A choice of supported BatchExportDestination types.",
max_length=64,
),
),
]
38 changes: 38 additions & 0 deletions posthog/temporal/tests/batch_exports/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Testing batch exports

This module contains unit tests covering activities, workflows, and helper functions that power batch exports. Tests are divided by destination, and some destinations require setup steps to enable tests.

## Testing BigQuery batch exports

BigQuery batch exports can be tested against a real BigQuery instance, but doing so requires additional setup. For this reason, these tests are skipped unless an environment variable pointing to a BigQuery credentials file (`GOOGLE_APPLICATION_CREDENTIALS=/path/to/my/project-credentials.json`) is set.

> :warning: Since BigQuery batch export tests require additional setup, they are skipped by default and will not be run by automated CI pipelines. Please ensure these tests pass when making changes that affect BigQuery batch exports.

To enable testing for BigQuery batch exports, we require:
1. A BigQuery project and dataset
2. A BigQuery ServiceAccount with access to said project and dataset. See the [BigQuery batch export documentation](https://posthog.com/docs/cdp/batch-exports/bigquery#setting-up-bigquery-access) for detailed steps to set up a ServiceAccount.

Then, a [key](https://cloud.google.com/iam/docs/keys-create-delete#creating) can be created for the BigQuery ServiceAccount and saved to a local file. For PostHog employees, this file should already be available under the PostHog password manager.

Tests for BigQuery batch exports can then be run from the root of the `posthog` repo:

```bash
DEBUG=1 GOOGLE_APPLICATION_CREDENTIALS=/path/to/my/project-credentials.json pytest posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
```

## Testing Redshift batch exports

Redshift batch exports can be tested against a real Redshift (or Redshift Serverless) instance, with additional setup steps required. Due to this requirement, these tests are skipped unless Redshift credentials are specified in the environment.

> :warning: Since Redshift batch export tests require additional setup, they are skipped by default and will not be run by automated CI pipelines. Please ensure these tests pass when making changes that affect Redshift batch exports.

To enable testing for Redshift batch exports, we require:
1. A Redshift (or Redshift Serverless) instance.
2. Network access to this instance (via a VPN connection or jumphost; making a Redshift instance publicly available has serious security implications).
3. User credentials (user requires `CREATEDB` permissions for testing but **not** superuser access).

For PostHog employees, check the password manager as a set of development credentials should already be available. With these credentials, and after connecting to the appropriate VPN, we can run the tests from the root of the `posthog` repo with:

```bash
DEBUG=1 REDSHIFT_HOST=workgroup.111222333.region.redshift-serverless.amazonaws.com REDSHIFT_USER=test_user REDSHIFT_PASSWORD=test_password pytest posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py
```
Loading

0 comments on commit 2adc2f9

Please sign in to comment.