Add created_at to persons batch export
rossgray committed Nov 28, 2024
1 parent 15d5f4c commit 6d1d99e
Showing 4 changed files with 253 additions and 56 deletions.
3 changes: 3 additions & 0 deletions posthog/batch_exports/sql.py
@@ -80,6 +80,7 @@
p.properties AS properties,
pd.version AS person_distinct_id_version,
p.version AS person_version,
p.created_at AS created_at,
multiIf(
(
pd._timestamp >= {{interval_start:DateTime64}}
@@ -140,6 +141,7 @@
p.properties AS properties,
pd.version AS person_distinct_id_version,
p.version AS person_version,
p.created_at AS created_at,
multiIf(
pd._timestamp < {{interval_end:DateTime64}}
AND NOT p._timestamp < {{interval_end:DateTime64}},
@@ -170,6 +172,7 @@
id,
max(version) AS version,
argMax(properties, person.version) AS properties,
argMax(created_at, person.version) AS created_at,
argMax(_timestamp, person.version) AS _timestamp
FROM
person
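For context, `argMax(created_at, person.version)` in the view above returns the `created_at` value from the row carrying the highest `person.version`, matching how `properties` and `_timestamp` are already deduplicated. A minimal, self-contained sketch of that pattern, written as a Python constant in the same style as the repository's SQL templates (the query is illustrative only, not taken from this commit):

# Hypothetical ClickHouse query showing the argMax deduplication pattern used above:
# for each person id, take created_at from the row with the highest version.
EXAMPLE_LATEST_VERSION_QUERY = """
SELECT
    id,
    max(version) AS version,
    argMax(created_at, version) AS created_at
FROM person
GROUP BY id
"""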
83 changes: 75 additions & 8 deletions posthog/temporal/batch_exports/batch_exports.py
@@ -68,6 +68,32 @@
optimize_aggregation_in_order=1
"""

# This is an updated version of the view that we will use going forward
# We will migrate each batch export destination over one at a time to mitigate
# risk, and once this is done we can clean this up.
SELECT_FROM_PERSONS_VIEW_NEW = """
SELECT
persons.team_id AS team_id,
persons.distinct_id AS distinct_id,
persons.person_id AS person_id,
persons.properties AS properties,
persons.person_distinct_id_version AS person_distinct_id_version,
persons.person_version AS person_version,
persons.created_at AS created_at,
persons._inserted_at AS _inserted_at
FROM
persons_batch_export(
team_id={team_id},
interval_start={interval_start},
interval_end={interval_end}
) AS persons
FORMAT ArrowStream
SETTINGS
max_bytes_before_external_group_by=50000000000,
max_bytes_before_external_sort=50000000000,
optimize_aggregation_in_order=1
"""

SELECT_FROM_PERSONS_VIEW_BACKFILL = """
SELECT
persons.team_id AS team_id,
@@ -89,6 +115,31 @@
optimize_aggregation_in_order=1
"""

# This is an updated version of the view that we will use going forward
# We will migrate each batch export destination over one at a time to mitigate
# risk, and once this is done we can clean this up.
SELECT_FROM_PERSONS_VIEW_BACKFILL_NEW = """
SELECT
persons.team_id AS team_id,
persons.distinct_id AS distinct_id,
persons.person_id AS person_id,
persons.properties AS properties,
persons.person_distinct_id_version AS person_distinct_id_version,
persons.person_version AS person_version,
persons.created_at AS created_at,
persons._inserted_at AS _inserted_at
FROM
persons_batch_export_backfill(
team_id={team_id},
interval_end={interval_end}
) AS persons
FORMAT ArrowStream
SETTINGS
max_bytes_before_external_group_by=50000000000,
max_bytes_before_external_sort=50000000000,
optimize_aggregation_in_order=1
"""

SELECT_FROM_EVENTS_VIEW = Template(
"""
SELECT
@@ -175,6 +226,8 @@ async def iter_model_records(
interval_start: str | None,
interval_end: str,
destination_default_fields: list[BatchExportField] | None = None,
# TODO - remove this once all batch exports are using the latest schema
use_latest_schema: bool = False,
**parameters,
) -> AsyncRecordsGenerator:
if not is_backfill and interval_start is None:
@@ -195,6 +248,7 @@ async def iter_model_records(
extra_query_parameters=model.schema["values"] if model.schema is not None else None,
interval_start=interval_start,
interval_end=interval_end,
use_latest_schema=use_latest_schema,
**parameters,
):
yield record
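To illustrate how a destination opts in, the sketch below calls `iter_model_records` with the new flag. The function's leading parameters (for example, the ClickHouse client and the export model) are not visible in this diff, so they are forwarded via `kwargs`; the interval values are placeholders.

from posthog.temporal.batch_exports.batch_exports import iter_model_records

async def stream_persons_with_new_schema(**kwargs) -> None:
    """Hypothetical helper, not part of this commit, showing the opt-in flag."""
    async for record in iter_model_records(
        is_backfill=False,
        interval_start="2024-11-28 00:00:00",  # placeholder interval bounds
        interval_end="2024-11-28 01:00:00",
        use_latest_schema=True,  # routes persons queries to SELECT_FROM_PERSONS_VIEW_NEW
        **kwargs,  # remaining required arguments, not visible in this diff
    ):
        ...  # hand each record to the destination writer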
@@ -221,13 +275,21 @@ async def iter_records_from_model_view(
interval_start: str | None,
interval_end: str,
fields: list[BatchExportField],
# TODO - remove this once all batch exports are using the latest schema
use_latest_schema: bool = False,
**parameters,
) -> AsyncRecordsGenerator:
if model_name == "persons":
if is_backfill and interval_start is None:
view = SELECT_FROM_PERSONS_VIEW_BACKFILL
if use_latest_schema:
view = SELECT_FROM_PERSONS_VIEW_BACKFILL_NEW
else:
view = SELECT_FROM_PERSONS_VIEW_BACKFILL
else:
view = SELECT_FROM_PERSONS_VIEW
if use_latest_schema:
view = SELECT_FROM_PERSONS_VIEW_NEW
else:
view = SELECT_FROM_PERSONS_VIEW
elif str(team_id) not in settings.ASYNC_ARROW_STREAMING_TEAM_IDS:
# TODO: Let this model be exported by `astream_query_as_arrow`.
# Just to reduce risk, I don't want to change the function that runs 100% of the exports
@@ -366,6 +428,8 @@ def start_produce_batch_export_record_batches(
done_ranges: list[tuple[dt.datetime, dt.datetime]],
fields: list[BatchExportField] | None = None,
destination_default_fields: list[BatchExportField] | None = None,
# TODO - remove this once all batch exports are using the latest schema
use_latest_schema: bool = False,
**parameters,
):
"""Start producing batch export record batches from a model query.
@@ -388,9 +452,15 @@

if model_name == "persons":
if is_backfill and full_range[0] is None:
view = SELECT_FROM_PERSONS_VIEW_BACKFILL
if use_latest_schema:
view = SELECT_FROM_PERSONS_VIEW_BACKFILL_NEW
else:
view = SELECT_FROM_PERSONS_VIEW_BACKFILL
else:
view = SELECT_FROM_PERSONS_VIEW
if use_latest_schema:
view = SELECT_FROM_PERSONS_VIEW_NEW
else:
view = SELECT_FROM_PERSONS_VIEW

else:
if parameters.get("exclude_events", None):
@@ -712,13 +782,10 @@ class StartBatchExportRunInputs:

@activity.defn
async def start_batch_export_run(inputs: StartBatchExportRunInputs) -> BatchExportRunId:
"""Activity that creates an BatchExportRun and returns the count of records to export.
"""Activity that creates an BatchExportRun and returns the run id.
Intended to be used in all export workflows, usually at the start, to create a model
instance to represent them in our database.
Upon seeing a count of 0 records to export, batch export workflows should finish early
(i.e. without running the insert activity), as there will be nothing to export.
"""
logger = await bind_temporal_worker_logger(team_id=inputs.team_id)
await logger.ainfo(
58 changes: 30 additions & 28 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -34,6 +34,11 @@
start_batch_export_run,
start_produce_batch_export_record_batches,
)
from posthog.temporal.batch_exports.heartbeat import (
BatchExportRangeHeartbeatDetails,
DateRange,
should_resume_from_activity_heartbeat,
)
from posthog.temporal.batch_exports.metrics import (
get_bytes_exported_metric,
get_rows_exported_metric,
@@ -49,12 +54,6 @@
cast_record_batch_json_columns,
set_status_to_running_task,
)
from posthog.temporal.batch_exports.heartbeat import (
BatchExportRangeHeartbeatDetails,
DateRange,
should_resume_from_activity_heartbeat,
)

from posthog.temporal.common.clickhouse import get_client
from posthog.temporal.common.heartbeat import Heartbeater
from posthog.temporal.common.logger import configure_temporal_worker_logger
@@ -174,48 +173,56 @@ async def adelete_table(
project_id: str,
dataset_id: str,
table_id: str,
table_schema: list[bigquery.SchemaField],
not_found_ok: bool = True,
) -> None:
"""Delete a table in BigQuery."""
fully_qualified_name = f"{project_id}.{dataset_id}.{table_id}"
table = bigquery.Table(fully_qualified_name, schema=table_schema)
table = bigquery.Table(fully_qualified_name)

await asyncio.to_thread(self.delete_table, table, not_found_ok=not_found_ok)

return None

async def aget_table(
self,
project_id: str,
dataset_id: str,
table_id: str,
) -> bigquery.Table:
"""Get a table in BigQuery."""
fully_qualified_name = f"{project_id}.{dataset_id}.{table_id}"
return await asyncio.to_thread(self.get_table, fully_qualified_name)

@contextlib.asynccontextmanager
async def managed_table(
self,
project_id: str,
dataset_id: str,
table_id: str,
table_schema: list[bigquery.SchemaField],
table_schema: list[bigquery.SchemaField] | None = None,
exists_ok: bool = True,
not_found_ok: bool = True,
delete: bool = True,
create: bool = True,
) -> collections.abc.AsyncGenerator[bigquery.Table, None]:
"""Manage a table in BigQuery by ensure it exists while in context."""
if create is True:
assert table_schema is not None, "Table schema is required when creating a table"
table = await self.acreate_table(project_id, dataset_id, table_id, table_schema, exists_ok)
else:
fully_qualified_name = f"{project_id}.{dataset_id}.{table_id}"
table = bigquery.Table(fully_qualified_name, schema=table_schema)
table = await self.aget_table(project_id, dataset_id, table_id)

try:
yield table
finally:
if delete is True:
await self.adelete_table(project_id, dataset_id, table_id, table_schema, not_found_ok)
await self.adelete_table(project_id, dataset_id, table_id, not_found_ok)

async def amerge_person_tables(
self,
final_table: bigquery.Table,
stage_table: bigquery.Table,
merge_key: collections.abc.Iterable[bigquery.SchemaField],
update_fields: collections.abc.Iterable[bigquery.SchemaField] | None = None,
person_version_key: str = "person_version",
person_distinct_id_version_key: str = "person_distinct_id_version",
):
@@ -233,12 +240,7 @@
values = ""
field_names = ""

if not update_fields:
update_clause_fields = final_table.schema
else:
update_clause_fields = update_fields

for n, field in enumerate(update_clause_fields):
for n, field in enumerate(final_table.schema):
if n > 0:
update_clause += ", "
values += ", "
@@ -409,6 +411,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
include_events=inputs.include_events,
fields=fields,
destination_default_fields=bigquery_default_fields(),
use_latest_schema=True,
extra_query_parameters=extra_query_parameters,
)

@@ -475,17 +478,17 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
with bigquery_client(inputs) as bq_client:
async with (
bq_client.managed_table(
inputs.project_id,
inputs.dataset_id,
inputs.table_id,
schema,
project_id=inputs.project_id,
dataset_id=inputs.dataset_id,
table_id=inputs.table_id,
table_schema=schema,
delete=False,
) as bigquery_table,
bq_client.managed_table(
inputs.project_id,
inputs.dataset_id,
stage_table_name,
schema,
project_id=inputs.project_id,
dataset_id=inputs.dataset_id,
table_id=stage_table_name,
table_schema=schema,
create=requires_merge,
delete=requires_merge,
) as bigquery_stage_table,
@@ -556,7 +559,6 @@ async def flush_to_bigquery(
final_table=bigquery_table,
stage_table=bigquery_stage_table,
merge_key=merge_key,
update_fields=schema,
)

return records_total
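Putting the BigQuery changes together, the activity now keeps the destination table (it is never dropped) and only creates and drops the staging table when a merge is required, i.e. for persons exports. A simplified, hypothetical sketch of that pattern; the identifiers below are placeholders and `bq_client` stands in for the module's BigQuery client.

from google.cloud import bigquery

async def run_export(bq_client, schema: list[bigquery.SchemaField], requires_merge: bool) -> None:
    """Illustrative only: the managed_table usage pattern after this change."""
    async with (
        bq_client.managed_table(
            project_id="my-project",  # placeholder identifiers
            dataset_id="analytics",
            table_id="persons",
            table_schema=schema,
            delete=False,  # never drop the destination table
        ) as final_table,
        bq_client.managed_table(
            project_id="my-project",
            dataset_id="analytics",
            table_id="stage_persons",
            table_schema=schema,
            create=requires_merge,  # staging table exists only when merging persons
            delete=requires_merge,
        ) as stage_table,
    ):
        ...  # write record batches, then merge stage_table into final_table if required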
