feat(data-warehouse): Add created_at to BigQuery persons batch export (#26486)
rossgray authored Nov 29, 2024
1 parent fc97b2e commit ce66669
Showing 6 changed files with 271 additions and 56 deletions.
@@ -188,6 +188,12 @@ const personsTable: DatabaseSchemaBatchExportTable = {
type: 'integer',
schema_valid: true,
},
created_at: {
name: 'created_at',
hogql_value: 'created_at',
type: 'datetime',
schema_valid: true,
},
},
}
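
The new entry exposes the person's created_at as a datetime column in the persons batch export schema, so every exported persons row is expected to carry one extra field. A rough sketch of the resulting row shape, with field names taken from the persons views later in this diff and purely illustrative values:

    row = {
        "team_id": 1,
        "distinct_id": "some-distinct-id",
        "person_id": "00000000-0000-0000-0000-000000000000",
        "properties": '{"email": "user@example.com"}',
        "person_distinct_id_version": 0,
        "person_version": 2,
        "created_at": "2024-11-29T12:00:00+00:00",
    }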

3 changes: 3 additions & 0 deletions posthog/batch_exports/sql.py
@@ -80,6 +80,7 @@
p.properties AS properties,
pd.version AS person_distinct_id_version,
p.version AS person_version,
p.created_at AS created_at,
multiIf(
(
pd._timestamp >= {{interval_start:DateTime64}}
@@ -140,6 +141,7 @@
p.properties AS properties,
pd.version AS person_distinct_id_version,
p.version AS person_version,
p.created_at AS created_at,
multiIf(
pd._timestamp < {{interval_end:DateTime64}}
AND NOT p._timestamp < {{interval_end:DateTime64}},
@@ -170,6 +172,7 @@
id,
max(version) AS version,
argMax(properties, person.version) AS properties,
argMax(created_at, person.version) AS created_at,
argMax(_timestamp, person.version) AS _timestamp
FROM
person
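
ClickHouse's argMax(x, v) returns the value of x from the row with the largest v, so argMax(created_at, person.version) resolves to the created_at of the latest version of each person, mirroring how properties and _timestamp are already collapsed. A small Python illustration of the same selection rule, using made-up rows:

    rows = [
        {"version": 1, "created_at": "2024-11-01 09:00:00"},
        {"version": 3, "created_at": "2024-11-29 12:00:00"},
        {"version": 2, "created_at": "2024-11-15 08:30:00"},
    ]
    # argMax(created_at, version): take created_at from the row with the highest version
    latest_created_at = max(rows, key=lambda row: row["version"])["created_at"]
    print(latest_created_at)  # 2024-11-29 12:00:00
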
@@ -0,0 +1,13 @@
from posthog.batch_exports.sql import (
CREATE_PERSONS_BATCH_EXPORT_VIEW,
CREATE_PERSONS_BATCH_EXPORT_VIEW_BACKFILL,
)
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions

operations = map(
run_sql_with_exceptions,
[
CREATE_PERSONS_BATCH_EXPORT_VIEW,
CREATE_PERSONS_BATCH_EXPORT_VIEW_BACKFILL,
],
)
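
This new ClickHouse migration recreates both persons batch export views so they pick up the created_at column added in sql.py. The map here is lazy; aside from that it behaves like the explicit list below (sketch only):

    operations = [
        run_sql_with_exceptions(CREATE_PERSONS_BATCH_EXPORT_VIEW),
        run_sql_with_exceptions(CREATE_PERSONS_BATCH_EXPORT_VIEW_BACKFILL),
    ]
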
83 changes: 75 additions & 8 deletions posthog/temporal/batch_exports/batch_exports.py
@@ -68,6 +68,32 @@
optimize_aggregation_in_order=1
"""

# This is an updated version of the view that we will use going forward
# We will migrate each batch export destination over one at a time to mitigate
# risk, and once this is done we can clean this up.
SELECT_FROM_PERSONS_VIEW_NEW = """
SELECT
persons.team_id AS team_id,
persons.distinct_id AS distinct_id,
persons.person_id AS person_id,
persons.properties AS properties,
persons.person_distinct_id_version AS person_distinct_id_version,
persons.person_version AS person_version,
persons.created_at AS created_at,
persons._inserted_at AS _inserted_at
FROM
persons_batch_export(
team_id={team_id},
interval_start={interval_start},
interval_end={interval_end}
) AS persons
FORMAT ArrowStream
SETTINGS
max_bytes_before_external_group_by=50000000000,
max_bytes_before_external_sort=50000000000,
optimize_aggregation_in_order=1
"""

SELECT_FROM_PERSONS_VIEW_BACKFILL = """
SELECT
persons.team_id AS team_id,
@@ -89,6 +115,31 @@
optimize_aggregation_in_order=1
"""

# This is an updated version of the view that we will use going forward
# We will migrate each batch export destination over one at a time to mitigate
# risk, and once this is done we can clean this up.
SELECT_FROM_PERSONS_VIEW_BACKFILL_NEW = """
SELECT
persons.team_id AS team_id,
persons.distinct_id AS distinct_id,
persons.person_id AS person_id,
persons.properties AS properties,
persons.person_distinct_id_version AS person_distinct_id_version,
persons.person_version AS person_version,
persons.created_at AS created_at,
persons._inserted_at AS _inserted_at
FROM
persons_batch_export_backfill(
team_id={team_id},
interval_end={interval_end}
) AS persons
FORMAT ArrowStream
SETTINGS
max_bytes_before_external_group_by=50000000000,
max_bytes_before_external_sort=50000000000,
optimize_aggregation_in_order=1
"""

SELECT_FROM_EVENTS_VIEW = Template(
"""
SELECT
@@ -195,6 +246,8 @@ async def iter_model_records(
interval_start: str | None,
interval_end: str,
destination_default_fields: list[BatchExportField] | None = None,
# TODO - remove this once all batch exports are using the latest schema
use_latest_schema: bool = False,
**parameters,
) -> AsyncRecordsGenerator:
if not is_backfill and interval_start is None:
@@ -215,6 +268,7 @@
extra_query_parameters=model.schema["values"] if model.schema is not None else None,
interval_start=interval_start,
interval_end=interval_end,
use_latest_schema=use_latest_schema,
**parameters,
):
yield record
@@ -241,13 +295,21 @@ async def iter_records_from_model_view(
interval_start: str | None,
interval_end: str,
fields: list[BatchExportField],
# TODO - remove this once all batch exports are using the latest schema
use_latest_schema: bool = False,
**parameters,
) -> AsyncRecordsGenerator:
if model_name == "persons":
if is_backfill and interval_start is None:
view = SELECT_FROM_PERSONS_VIEW_BACKFILL
if use_latest_schema:
view = SELECT_FROM_PERSONS_VIEW_BACKFILL_NEW
else:
view = SELECT_FROM_PERSONS_VIEW_BACKFILL
else:
view = SELECT_FROM_PERSONS_VIEW
if use_latest_schema:
view = SELECT_FROM_PERSONS_VIEW_NEW
else:
view = SELECT_FROM_PERSONS_VIEW
elif str(team_id) not in settings.ASYNC_ARROW_STREAMING_TEAM_IDS:
# TODO: Let this model be exported by `astream_query_as_arrow`.
# Just to reduce risk, I don't want to change the function that runs 100% of the exports
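
With the use_latest_schema flag threaded through, each persons batch export can be switched to the new views independently: when the flag is true the *_NEW queries (which include created_at) are used, otherwise the existing ones. Written compactly, the selection above is equivalent to this (illustration only; the diff keeps the explicit nesting):

    if is_backfill and interval_start is None:
        view = SELECT_FROM_PERSONS_VIEW_BACKFILL_NEW if use_latest_schema else SELECT_FROM_PERSONS_VIEW_BACKFILL
    else:
        view = SELECT_FROM_PERSONS_VIEW_NEW if use_latest_schema else SELECT_FROM_PERSONS_VIEW
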
@@ -396,6 +458,8 @@ def start_produce_batch_export_record_batches(
done_ranges: list[tuple[dt.datetime, dt.datetime]],
fields: list[BatchExportField] | None = None,
destination_default_fields: list[BatchExportField] | None = None,
# TODO - remove this once all batch exports are using the latest schema
use_latest_schema: bool = False,
**parameters,
):
"""Start producing batch export record batches from a model query.
@@ -418,9 +482,15 @@

if model_name == "persons":
if is_backfill and full_range[0] is None:
view = SELECT_FROM_PERSONS_VIEW_BACKFILL
if use_latest_schema:
view = SELECT_FROM_PERSONS_VIEW_BACKFILL_NEW
else:
view = SELECT_FROM_PERSONS_VIEW_BACKFILL
else:
view = SELECT_FROM_PERSONS_VIEW
if use_latest_schema:
view = SELECT_FROM_PERSONS_VIEW_NEW
else:
view = SELECT_FROM_PERSONS_VIEW

else:
if parameters.get("exclude_events", None):
@@ -762,13 +832,10 @@ class StartBatchExportRunInputs:

@activity.defn
async def start_batch_export_run(inputs: StartBatchExportRunInputs) -> BatchExportRunId:
"""Activity that creates an BatchExportRun and returns the count of records to export.
"""Activity that creates an BatchExportRun and returns the run id.
Intended to be used in all export workflows, usually at the start, to create a model
instance to represent them in our database.
Upon seeing a count of 0 records to export, batch export workflows should finish early
(i.e. without running the insert activity), as there will be nothing to export.
"""
logger = await bind_temporal_worker_logger(team_id=inputs.team_id)
await logger.ainfo(
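
The docstring change reflects that the activity now returns the run id rather than a count of records to export, and the removed guidance about finishing early on a zero count goes with it. As a generic Temporal sketch, not code from this repository, of consuming that return value inside a workflow:

    import datetime as dt

    from temporalio import workflow

    async def _start_run(inputs):
        # Sketch only: `inputs` would be a StartBatchExportRunInputs built by the
        # workflow; its fields are not shown in this excerpt.
        run_id = await workflow.execute_activity(
            start_batch_export_run,
            inputs,
            start_to_close_timeout=dt.timedelta(minutes=5),
        )
        return run_id
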
57 changes: 29 additions & 28 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -34,6 +34,11 @@
start_batch_export_run,
start_produce_batch_export_record_batches,
)
from posthog.temporal.batch_exports.heartbeat import (
BatchExportRangeHeartbeatDetails,
DateRange,
should_resume_from_activity_heartbeat,
)
from posthog.temporal.batch_exports.metrics import (
get_bytes_exported_metric,
get_rows_exported_metric,
@@ -49,12 +54,6 @@
cast_record_batch_json_columns,
set_status_to_running_task,
)
from posthog.temporal.batch_exports.heartbeat import (
BatchExportRangeHeartbeatDetails,
DateRange,
should_resume_from_activity_heartbeat,
)

from posthog.temporal.common.clickhouse import get_client
from posthog.temporal.common.heartbeat import Heartbeater
from posthog.temporal.common.logger import configure_temporal_worker_logger
@@ -174,17 +173,26 @@ async def adelete_table(
project_id: str,
dataset_id: str,
table_id: str,
table_schema: list[bigquery.SchemaField],
not_found_ok: bool = True,
) -> None:
"""Delete a table in BigQuery."""
fully_qualified_name = f"{project_id}.{dataset_id}.{table_id}"
table = bigquery.Table(fully_qualified_name, schema=table_schema)
table = bigquery.Table(fully_qualified_name)

await asyncio.to_thread(self.delete_table, table, not_found_ok=not_found_ok)

return None

async def aget_table(
self,
project_id: str,
dataset_id: str,
table_id: str,
) -> bigquery.Table:
"""Get a table in BigQuery."""
fully_qualified_name = f"{project_id}.{dataset_id}.{table_id}"
return await asyncio.to_thread(self.get_table, fully_qualified_name)
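
aget_table fetches the live table definition from BigQuery instead of constructing a bigquery.Table locally, so callers such as managed_table below see the destination's current schema, created_at included once the column exists. A hedged usage sketch with placeholder identifiers (bq_client is an instance of the client class these methods belong to):

    async def show_destination_schema(bq_client):
        table = await bq_client.aget_table(
            project_id="my-gcp-project",
            dataset_id="exports",
            table_id="persons",
        )
        print([field.name for field in table.schema])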

@contextlib.asynccontextmanager
async def managed_table(
self,
@@ -197,25 +205,23 @@ async def managed_table(
delete: bool = True,
create: bool = True,
) -> collections.abc.AsyncGenerator[bigquery.Table, None]:
"""Manage a table in BigQuery by ensure it exists while in context."""
"""Manage a table in BigQuery by ensuring it exists while in context."""
if create is True:
table = await self.acreate_table(project_id, dataset_id, table_id, table_schema, exists_ok)
else:
fully_qualified_name = f"{project_id}.{dataset_id}.{table_id}"
table = bigquery.Table(fully_qualified_name, schema=table_schema)
table = await self.aget_table(project_id, dataset_id, table_id)

try:
yield table
finally:
if delete is True:
await self.adelete_table(project_id, dataset_id, table_id, table_schema, not_found_ok)
await self.adelete_table(project_id, dataset_id, table_id, not_found_ok)

async def amerge_person_tables(
self,
final_table: bigquery.Table,
stage_table: bigquery.Table,
merge_key: collections.abc.Iterable[bigquery.SchemaField],
update_fields: collections.abc.Iterable[bigquery.SchemaField] | None = None,
person_version_key: str = "person_version",
person_distinct_id_version_key: str = "person_distinct_id_version",
):
@@ -233,12 +239,7 @@
values = ""
field_names = ""

if not update_fields:
update_clause_fields = final_table.schema
else:
update_clause_fields = update_fields

for n, field in enumerate(update_clause_fields):
for n, field in enumerate(final_table.schema):
if n > 0:
update_clause += ", "
values += ", "
@@ -409,6 +410,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
include_events=inputs.include_events,
fields=fields,
destination_default_fields=bigquery_default_fields(),
use_latest_schema=True,
extra_query_parameters=extra_query_parameters,
)

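Passing use_latest_schema=True makes the BigQuery destination the first to read from the *_NEW persons views, so created_at now arrives in the record batches. On the BigQuery side the new column is presumably represented by a TIMESTAMP field; a sketch only, since the real mapping lives in bigquery_default_fields() and the table schema code not shown in this excerpt:

    from google.cloud import bigquery

    # Assumed column definition for the new field; the name matches the views
    # above, the TIMESTAMP type is an assumption.
    created_at_field = bigquery.SchemaField("created_at", "TIMESTAMP")
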
@@ -475,17 +477,17 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
with bigquery_client(inputs) as bq_client:
async with (
bq_client.managed_table(
inputs.project_id,
inputs.dataset_id,
inputs.table_id,
schema,
project_id=inputs.project_id,
dataset_id=inputs.dataset_id,
table_id=inputs.table_id,
table_schema=schema,
delete=False,
) as bigquery_table,
bq_client.managed_table(
inputs.project_id,
inputs.dataset_id,
stage_table_name,
schema,
project_id=inputs.project_id,
dataset_id=inputs.dataset_id,
table_id=stage_table_name,
table_schema=schema,
create=requires_merge,
delete=requires_merge,
) as bigquery_stage_table,
@@ -556,7 +558,6 @@ async def flush_to_bigquery(
final_table=bigquery_table,
stage_table=bigquery_stage_table,
merge_key=merge_key,
update_fields=schema,
)

return records_total
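
Dropping update_fields=schema here matches the amerge_person_tables change above: the MERGE update clause is now built from final_table.schema, and because the final table is fetched via aget_table, that schema is the live destination schema rather than a locally declared one. Roughly, the clause generation behaves like this sketch (column names and the final/stage aliases are illustrative, not the exact SQL built in this module):

    schema_field_names = ["team_id", "distinct_id", "person_id", "properties", "created_at"]
    update_clause = ", ".join(
        f"final.`{name}` = stage.`{name}`" for name in schema_field_names
    )
    # e.g. "final.`team_id` = stage.`team_id`, ..., final.`created_at` = stage.`created_at`"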