feat(batch-exports): Add include_events option to all destinations (#…
tomasfarias authored Oct 6, 2023
1 parent e3d7fa9 commit 88a8b7f
Showing 11 changed files with 232 additions and 2 deletions.
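The change threads an `include_events` option (mirroring the existing `exclude_events`) through the edit form, the frontend types, the service-layer workflow inputs, and the query helpers exercised by the tests. As a rough mental model of the intended semantics (a standalone sketch, not the repository's actual query code): when `include_events` is set, only the listed events are exported, and `exclude_events` then removes events from whatever remains.

```python
# Minimal sketch of the include/exclude semantics implied by this commit.
# The real filtering happens inside the ClickHouse queries behind
# get_rows_count/get_results_iterator; this predicate only illustrates intent.
from collections.abc import Collection


def event_is_exported(
    event: str,
    include_events: Collection[str] | None = None,
    exclude_events: Collection[str] | None = None,
) -> bool:
    """Return True if an event name passes the batch export filters."""
    if include_events and event not in include_events:
        return False
    if exclude_events and event in exclude_events:
        return False
    return True


# Only "pageview" and "signup" are exported; exclusions still apply on top.
assert event_is_exported("pageview", include_events=["pageview", "signup"])
assert not event_is_exported("$autocapture", include_events=["pageview", "signup"])
assert not event_is_exported(
    "signup", include_events=["pageview", "signup"], exclude_events=["signup"]
)
```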
48 changes: 48 additions & 0 deletions frontend/src/scenes/batch_exports/BatchExportEditForm.tsx
@@ -247,6 +247,13 @@ export function BatchExportsEditForm(props: BatchExportsEditLogicProps): JSX.Ele
}
/>
</Field>
<Field name="include_events" label="Events to include" className="flex-1">
<LemonSelectMultiple
mode="multiple-custom"
options={[]}
placeholder={'Input one or more events to include in the export (optional)'}
/>
</Field>
</>
) : batchExportConfigForm.destination === 'Snowflake' ? (
<>
@@ -281,6 +288,23 @@ export function BatchExportsEditForm(props: BatchExportsEditLogicProps): JSX.Ele
<Field name="role" label="Role" showOptional>
<LemonInput placeholder="my-role" />
</Field>

<Field name="exclude_events" label="Events to exclude" className="flex-1">
<LemonSelectMultiple
mode="multiple-custom"
options={[]}
placeholder={
'Input one or more events to exclude from the export (optional)'
}
/>
</Field>
<Field name="include_events" label="Events to include" className="flex-1">
<LemonSelectMultiple
mode="multiple-custom"
options={[]}
placeholder={'Input one or more events to include in the export (optional)'}
/>
</Field>
</>
) : batchExportConfigForm.destination === 'Postgres' ? (
<>
@@ -327,6 +351,23 @@ export function BatchExportsEditForm(props: BatchExportsEditLogicProps): JSX.Ele
}
/>
</Field>

<Field name="exclude_events" label="Events to exclude" className="flex-1">
<LemonSelectMultiple
mode="multiple-custom"
options={[]}
placeholder={
'Input one or more events to exclude from the export (optional)'
}
/>
</Field>
<Field name="include_events" label="Events to include" className="flex-1">
<LemonSelectMultiple
mode="multiple-custom"
options={[]}
placeholder={'Input one or more events to include in the export (optional)'}
/>
</Field>
</>
) : batchExportConfigForm.destination === 'BigQuery' ? (
<>
@@ -351,6 +392,13 @@ export function BatchExportsEditForm(props: BatchExportsEditLogicProps): JSX.Ele
}
/>
</Field>
<Field name="include_events" label="Events to include" className="flex-1">
<LemonSelectMultiple
mode="multiple-custom"
options={[]}
placeholder={'Input one or more events to include in the export (optional)'}
/>
</Field>
</>
) : null}
</div>
1 change: 1 addition & 0 deletions frontend/src/scenes/batch_exports/BatchExports.stories.tsx
@@ -34,6 +34,7 @@ export default {
aws_secret_access_key: '',
compression: null,
exclude_events: [],
include_events: [],
encryption: null,
kms_key_id: null,
},
6 changes: 6 additions & 0 deletions frontend/src/scenes/batch_exports/batchExportEditLogic.ts
@@ -61,6 +61,8 @@ const formFields = (
schema: !config.schema ? 'This field is required' : '',
table_name: !config.table_name ? 'This field is required' : '',
has_self_signed_cert: false,
exclude_events: '',
include_events: '',
}
: destination === 'S3'
? {
@@ -73,6 +75,7 @@
encryption: '',
kms_key_id: !config.kms_key_id && config.encryption == 'aws:kms' ? 'This field is required' : '',
exclude_events: '',
include_events: '',
}
: destination === 'BigQuery'
? {
@@ -90,6 +93,7 @@
dataset_id: !config.dataset_id ? 'This field is required' : '',
table_id: !config.table_id ? 'This field is required' : '',
exclude_events: '',
include_events: '',
}
: destination === 'Snowflake'
? {
@@ -101,6 +105,8 @@
schema: !config.schema ? 'This field is required' : '',
table_name: !config.table_name ? 'This field is required' : '',
role: '',
exclude_events: '',
include_events: '',
}
: {}),
}
6 changes: 6 additions & 0 deletions frontend/src/types.ts
@@ -3142,6 +3142,7 @@ export type BatchExportDestinationS3 = {
aws_access_key_id: string
aws_secret_access_key: string
exclude_events: string[]
include_events: string[]
compression: string | null
encryption: string | null
kms_key_id: string | null
@@ -3159,6 +3160,8 @@ export type BatchExportDestinationPostgres = {
schema: string
table_name: string
has_self_signed_cert: boolean
exclude_events: string[]
include_events: string[]
}
}

@@ -3173,6 +3176,8 @@ export type BatchExportDestinationSnowflake = {
schema: string
table_name: string
role: string | null
exclude_events: string[]
include_events: string[]
}
}

@@ -3187,6 +3192,7 @@ export type BatchExportDestinationBigQuery = {
dataset_id: string
table_id: string
exclude_events: string[]
include_events: string[]
}
}

6 changes: 6 additions & 0 deletions posthog/batch_exports/service.py
@@ -57,6 +57,7 @@ class S3BatchExportInputs:
data_interval_end: str | None = None
compression: str | None = None
exclude_events: list[str] | None = None
include_events: list[str] | None = None
encryption: str | None = None
kms_key_id: str | None = None

@@ -77,6 +78,8 @@ class SnowflakeBatchExportInputs:
table_name: str = "events"
data_interval_end: str | None = None
role: str | None = None
exclude_events: list[str] | None = None
include_events: list[str] | None = None


@dataclass
@@ -95,6 +98,8 @@ class PostgresBatchExportInputs:
table_name: str = "events"
port: int = 5432
data_interval_end: str | None = None
exclude_events: list[str] | None = None
include_events: list[str] | None = None


@dataclass
@@ -113,6 +118,7 @@ class BigQueryBatchExportInputs:
table_id: str = "events"
data_interval_end: str | None = None
exclude_events: list[str] | None = None
include_events: list[str] | None = None


DESTINATION_WORKFLOWS = {
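The hunks above only add the new field to each destination's inputs dataclass; how `include_events` reaches the ClickHouse queries is handled in the workflow and activity code, which is not part of the diff shown here. A hedged sketch of what the added condition might look like as a parameterized SQL fragment follows; the helper name and the `%(name)s` placeholder style are assumptions, not the actual implementation.

```python
# Hypothetical helper: turning the new include_events (and existing
# exclude_events) inputs into parameterized SQL conditions. The placeholder
# syntax is an assumption; the real export queries may be built differently.


def build_event_filter_clauses(
    include_events: list[str] | None,
    exclude_events: list[str] | None,
) -> tuple[str, dict[str, list[str]]]:
    clauses: list[str] = []
    params: dict[str, list[str]] = {}

    if include_events:
        clauses.append("AND event IN %(include_events)s")
        params["include_events"] = list(include_events)
    if exclude_events:
        clauses.append("AND event NOT IN %(exclude_events)s")
        params["exclude_events"] = list(exclude_events)

    return " ".join(clauses), params


clause, params = build_event_filter_clauses(["pageview"], None)
# clause == "AND event IN %(include_events)s"
# params == {"include_events": ["pageview"]}
```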
121 changes: 119 additions & 2 deletions posthog/temporal/tests/batch_exports/test_batch_exports.py
@@ -252,7 +252,58 @@ async def test_get_rows_count_can_exclude_events(client):

# Exclude the latter half of events.
exclude_events = (f"test-{i}" for i in range(5000, 10000))
row_count = await get_rows_count(client, team_id, "2023-04-20 14:30:00", "2023-04-20 14:31:00", exclude_events)
row_count = await get_rows_count(
client, team_id, "2023-04-20 14:30:00", "2023-04-20 14:31:00", exclude_events=exclude_events
)
assert row_count == 5000


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_get_rows_count_can_include_events(client):
"""Test the count of rows returned by get_rows_count can include events."""
team_id = randint(1, 1000000)

events: list[EventValues] = [
{
"uuid": str(uuid4()),
"event": f"test-{i}",
"_timestamp": "2023-04-20 14:30:00",
"timestamp": f"2023-04-20 14:30:00.{i:06d}",
"inserted_at": f"2023-04-20 14:30:00.{i:06d}",
"created_at": "2023-04-20 14:30:00.000000",
"distinct_id": str(uuid4()),
"person_id": str(uuid4()),
"person_properties": {"$browser": "Chrome", "$os": "Mac OS X"},
"team_id": team_id,
"properties": {
"$browser": "Chrome",
"$os": "Mac OS X",
"$ip": "127.0.0.1",
"$current_url": "http://localhost.com",
},
"elements_chain": "this that and the other",
"elements": json.dumps("this that and the other"),
"ip": "127.0.0.1",
"site_url": "http://localhost.com",
"set": None,
"set_once": None,
}
for i in range(10000)
]
# Duplicate everything
duplicate_events = events * 2

await insert_events(
ch_client=client,
events=duplicate_events,
)

# Include the latter half of events.
include_events = (f"test-{i}" for i in range(5000, 10000))
row_count = await get_rows_count(
client, team_id, "2023-04-20 14:30:00", "2023-04-20 14:31:00", include_events=include_events
)
assert row_count == 5000


@@ -415,7 +466,9 @@ async def test_get_results_iterator_can_exclude_events(client):

# Exclude the latter half of events.
exclude_events = (f"test-{i}" for i in range(5000, 10000))
iter_ = get_results_iterator(client, team_id, "2023-04-20 14:30:00", "2023-04-20 14:31:00", exclude_events)
iter_ = get_results_iterator(
client, team_id, "2023-04-20 14:30:00", "2023-04-20 14:31:00", exclude_events=exclude_events
)
rows = [row for row in iter_]

all_expected = sorted(events[:5000], key=operator.itemgetter("event"))
Expand All @@ -435,6 +488,70 @@ async def test_get_results_iterator_can_exclude_events(client):
assert value == expected_value, f"{key} value in {result} didn't match value in {expected}"


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_get_results_iterator_can_include_events(client):
"""Test the rows returned by get_results_iterator can include events."""
team_id = randint(1, 1000000)

events: list[EventValues] = [
{
"uuid": str(uuid4()),
"event": f"test-{i}",
"_timestamp": "2023-04-20 14:30:00",
"timestamp": f"2023-04-20 14:30:00.{i:06d}",
"inserted_at": f"2023-04-20 14:30:00.{i:06d}",
"created_at": "2023-04-20 14:30:00.000000",
"distinct_id": str(uuid4()),
"person_id": str(uuid4()),
"person_properties": {"$browser": "Chrome", "$os": "Mac OS X"},
"team_id": team_id,
"properties": {
"$browser": "Chrome",
"$os": "Mac OS X",
"$ip": "127.0.0.1",
"$current_url": "http://localhost.com",
},
"elements_chain": "this that and the other",
"elements": json.dumps("this that and the other"),
"ip": "127.0.0.1",
"site_url": "",
"set": None,
"set_once": None,
}
for i in range(10000)
]
duplicate_events = events * 2

await insert_events(
ch_client=client,
events=duplicate_events,
)

# Include the latter half of events.
include_events = (f"test-{i}" for i in range(5000, 10000))
iter_ = get_results_iterator(
client, team_id, "2023-04-20 14:30:00", "2023-04-20 14:31:00", include_events=include_events
)
rows = [row for row in iter_]

all_expected = sorted(events[5000:], key=operator.itemgetter("event"))
all_result = sorted(rows, key=operator.itemgetter("event"))

assert len(all_expected) == len(all_result)
assert len([row["uuid"] for row in all_result]) == len(set(row["uuid"] for row in all_result))

for expected, result in zip(all_expected, all_result):
for key, value in result.items():
if key in ("timestamp", "inserted_at", "created_at"):
expected_value = to_isoformat(expected[key])
else:
expected_value = expected[key]

# Some keys will be missing from result, so let's only check the ones we have.
assert value == expected_value, f"{key} value in {result} didn't match value in {expected}"


@pytest.mark.parametrize(
"interval,data_interval_end,expected",
[
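The new tests in test_batch_exports.py exercise `include_events` on its own, but both query helpers appear to accept the filters as optional keyword arguments, so passing the two together should also work. A small caller sketch, with the signatures inferred from the tests above and the import path assumed from the test module's location:

```python
# Caller sketch, not taken from the repository: signatures are inferred from the
# tests above, and the import path is an assumption based on where the tests live.
from posthog.temporal.workflows.batch_exports import get_results_iterator, get_rows_count


async def count_and_fetch_signups(client, team_id: int):
    """Count and fetch only "signup" events, minus a noisy excluded event."""
    row_count = await get_rows_count(
        client,
        team_id,
        "2023-04-20 14:30:00",
        "2023-04-20 14:31:00",
        include_events=["signup"],
        exclude_events=["$feature_flag_called"],
    )
    results = get_results_iterator(
        client,
        team_id,
        "2023-04-20 14:30:00",
        "2023-04-20 14:31:00",
        include_events=["signup"],
        exclude_events=["$feature_flag_called"],
    )
    return row_count, [row for row in results]
```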