fix(batch-exports): Only read person properties for S3 (PostHog#18095)
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
2 people authored and Justicea83 committed Oct 25, 2023
1 parent 4a5cf00 commit 84fdf0e
Showing 3 changed files with 65 additions and 9 deletions.
54 changes: 46 additions & 8 deletions posthog/temporal/tests/batch_exports/test_batch_exports.py
@@ -309,7 +309,8 @@ async def test_get_rows_count_can_include_events(client):

 @pytest.mark.django_db
 @pytest.mark.asyncio
-async def test_get_results_iterator(client):
+@pytest.mark.parametrize("include_person_properties", (False, True))
+async def test_get_results_iterator(client, include_person_properties):
     """Test the rows returned by get_results_iterator."""
     team_id = randint(1, 1000000)

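Note: pytest.mark.parametrize collects one test per parameter value, so each parametrized test in this file now runs twice, once with include_person_properties=False and once with True. A minimal standalone sketch of the mechanism (the test name below is illustrative, not from this repo):

    import pytest

    @pytest.mark.parametrize("include_person_properties", (False, True))
    def test_flag_values(include_person_properties):
        # pytest collects test_flag_values[False] and test_flag_values[True].
        assert include_person_properties in (False, True)
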
@@ -345,7 +346,13 @@ async def test_get_results_iterator(client):
         events=events,
     )

-    iter_ = get_results_iterator(client, team_id, "2023-04-20 14:30:00", "2023-04-20 14:31:00")
+    iter_ = get_results_iterator(
+        client,
+        team_id,
+        "2023-04-20 14:30:00",
+        "2023-04-20 14:31:00",
+        include_person_properties=include_person_properties,
+    )
     rows = [row for row in iter_]

     all_expected = sorted(events, key=operator.itemgetter("event"))
@@ -355,6 +362,9 @@

     for expected, result in zip(all_expected, all_result):
         for key, value in result.items():
+            if key == "person_properties" and not include_person_properties:
+                continue
+
             if key in ("timestamp", "inserted_at", "created_at"):
                 expected_value = to_isoformat(expected[key])
             else:
@@ -366,7 +376,8 @@

 @pytest.mark.django_db
 @pytest.mark.asyncio
-async def test_get_results_iterator_handles_duplicates(client):
+@pytest.mark.parametrize("include_person_properties", (False, True))
+async def test_get_results_iterator_handles_duplicates(client, include_person_properties):
     """Test the rows returned by get_results_iterator are de-duplicated."""
     team_id = randint(1, 1000000)

@@ -404,7 +415,13 @@ async def test_get_results_iterator_handles_duplicates(client):
         events=duplicate_events,
     )

-    iter_ = get_results_iterator(client, team_id, "2023-04-20 14:30:00", "2023-04-20 14:31:00")
+    iter_ = get_results_iterator(
+        client,
+        team_id,
+        "2023-04-20 14:30:00",
+        "2023-04-20 14:31:00",
+        include_person_properties=include_person_properties,
+    )
     rows = [row for row in iter_]

     all_expected = sorted(events, key=operator.itemgetter("event"))
@@ -415,6 +432,9 @@ async def test_get_results_iterator_handles_duplicates(client):

     for expected, result in zip(all_expected, all_result):
         for key, value in result.items():
+            if key == "person_properties" and not include_person_properties:
+                continue
+
             if key in ("timestamp", "inserted_at", "created_at"):
                 expected_value = to_isoformat(expected[key])
             else:
@@ -426,7 +446,8 @@ async def test_get_results_iterator_handles_duplicates(client):

 @pytest.mark.django_db
 @pytest.mark.asyncio
-async def test_get_results_iterator_can_exclude_events(client):
+@pytest.mark.parametrize("include_person_properties", (False, True))
+async def test_get_results_iterator_can_exclude_events(client, include_person_properties):
     """Test the rows returned by get_results_iterator can exclude events."""
     team_id = randint(1, 1000000)

@@ -467,7 +488,12 @@ async def test_get_results_iterator_can_exclude_events(client):
     # Exclude the latter half of events.
     exclude_events = (f"test-{i}" for i in range(5000, 10000))
     iter_ = get_results_iterator(
-        client, team_id, "2023-04-20 14:30:00", "2023-04-20 14:31:00", exclude_events=exclude_events
+        client,
+        team_id,
+        "2023-04-20 14:30:00",
+        "2023-04-20 14:31:00",
+        exclude_events=exclude_events,
+        include_person_properties=include_person_properties,
     )
     rows = [row for row in iter_]

@@ -479,6 +505,9 @@ async def test_get_results_iterator_can_exclude_events(client):

     for expected, result in zip(all_expected, all_result):
         for key, value in result.items():
+            if key == "person_properties" and not include_person_properties:
+                continue
+
             if key in ("timestamp", "inserted_at", "created_at"):
                 expected_value = to_isoformat(expected[key])
             else:
@@ -490,7 +519,8 @@ async def test_get_results_iterator_can_exclude_events(client):

 @pytest.mark.django_db
 @pytest.mark.asyncio
-async def test_get_results_iterator_can_include_events(client):
+@pytest.mark.parametrize("include_person_properties", (False, True))
+async def test_get_results_iterator_can_include_events(client, include_person_properties):
     """Test the rows returned by get_results_iterator can include events."""
     team_id = randint(1, 1000000)

@@ -531,7 +561,12 @@ async def test_get_results_iterator_can_include_events(client):
     # Include the latter half of events.
     include_events = (f"test-{i}" for i in range(5000, 10000))
     iter_ = get_results_iterator(
-        client, team_id, "2023-04-20 14:30:00", "2023-04-20 14:31:00", include_events=include_events
+        client,
+        team_id,
+        "2023-04-20 14:30:00",
+        "2023-04-20 14:31:00",
+        include_events=include_events,
+        include_person_properties=include_person_properties,
     )
     rows = [row for row in iter_]

@@ -543,6 +578,9 @@ async def test_get_results_iterator_can_include_events(client):

     for expected, result in zip(all_expected, all_result):
         for key, value in result.items():
+            if key == "person_properties" and not include_person_properties:
+                continue
+
             if key in ("timestamp", "inserted_at", "created_at"):
                 expected_value = to_isoformat(expected[key])
             else:
19 changes: 18 additions & 1 deletion posthog/temporal/workflows/batch_exports.py
@@ -109,6 +109,22 @@ async def get_rows_count(
 -- Point in time identity fields
 toString(distinct_id) as distinct_id,
 toString(person_id) as person_id,
 -- Autocapture fields
 elements_chain
 """
+
+S3_FIELDS = """
+DISTINCT ON (event, cityHash64(distinct_id), cityHash64(uuid))
+toString(uuid) as uuid,
+team_id,
+timestamp,
+inserted_at,
+created_at,
+event,
+properties,
+-- Point in time identity fields
+toString(distinct_id) as distinct_id,
+toString(person_id) as person_id,
+person_properties,
+-- Autocapture fields
+elements_chain
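Note: S3_FIELDS differs from FIELDS only by adding person_properties; the DISTINCT ON (event, cityHash64(distinct_id), cityHash64(uuid)) deduplication key is unchanged. A Python sketch of what that deduplication means, keeping one row per key (illustrative only; ClickHouse compares cityHash64 hashes rather than the raw strings):

    def deduplicate(rows):
        # Yield only the first row seen for each (event, distinct_id, uuid)
        # combination, mirroring the DISTINCT ON clause above.
        seen = set()
        for row in rows:
            key = (row["event"], row["distinct_id"], row["uuid"])
            if key not in seen:
                seen.add(key)
                yield row
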
@@ -122,6 +138,7 @@ def get_results_iterator(
     interval_end: str,
     exclude_events: collections.abc.Iterable[str] | None = None,
     include_events: collections.abc.Iterable[str] | None = None,
+    include_person_properties: bool = False,
 ) -> typing.Generator[dict[str, typing.Any], None, None]:
     data_interval_start_ch = dt.datetime.fromisoformat(interval_start).strftime("%Y-%m-%d %H:%M:%S")
     data_interval_end_ch = dt.datetime.fromisoformat(interval_end).strftime("%Y-%m-%d %H:%M:%S")
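Note: the context lines above show the ISO-8601 interval bounds being reformatted into ClickHouse "YYYY-MM-DD HH:MM:SS" literals. A quick standalone check of that conversion:

    import datetime as dt

    # fromisoformat parses the ISO-8601 string; strftime drops the timezone
    # suffix and renders ClickHouse's DateTime literal form.
    bound = dt.datetime.fromisoformat("2023-04-20T14:30:00+00:00")
    assert bound.strftime("%Y-%m-%d %H:%M:%S") == "2023-04-20 14:30:00"
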
@@ -141,7 +158,7 @@
         events_to_include_tuple = ()

     query = SELECT_QUERY_TEMPLATE.substitute(
-        fields=FIELDS,
+        fields=S3_FIELDS if include_person_properties else FIELDS,
         order_by="ORDER BY inserted_at",
         format="FORMAT ArrowStream",
         exclude_events=exclude_events_statement,
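Note: the .substitute call suggests SELECT_QUERY_TEMPLATE is a string.Template, so the flag simply swaps which field list lands in the query text. A reduced sketch under that assumption (the miniature template below is hypothetical; the real one also carries interval and event-filter placeholders):

    from string import Template

    # Hypothetical miniature of SELECT_QUERY_TEMPLATE.
    SELECT_QUERY_TEMPLATE = Template("SELECT $fields FROM events $order_by $format")

    include_person_properties = True
    query = SELECT_QUERY_TEMPLATE.substitute(
        fields="person_properties, ..." if include_person_properties else "...",
        order_by="ORDER BY inserted_at",
        format="FORMAT ArrowStream",
    )
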
1 change: 1 addition & 0 deletions posthog/temporal/workflows/s3_batch_export.py
@@ -379,6 +379,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs):
         interval_end=inputs.data_interval_end,
         exclude_events=inputs.exclude_events,
         include_events=inputs.include_events,
+        include_person_properties=True,
     )

     result = None
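Note: insert_into_s3_activity is the only call site that opts in, so every other batch export destination keeps the include_person_properties=False default and its query never reads the potentially large person_properties column. A sketch of a default (non-S3) caller, with illustrative argument values:

    # Built from FIELDS, so person_properties is never selected.
    results = get_results_iterator(
        client,
        team_id,
        "2023-04-20 14:30:00",
        "2023-04-20 14:31:00",
    )
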
