feat(batch-exports): Include _inserted_at field in S3 batch exports #27019

Merged · 2 commits · Dec 19, 2024
posthog/temporal/batch_exports/s3_batch_export.py (2 additions, 1 deletion)
@@ -699,7 +699,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs) -> RecordsCompleted:
# Until we figure it out, we set all fields to nullable. There are some fields we know
# are not nullable, but I'm opting for the more flexible option until we figure out why schemas differ
# between batches.
- [field.with_nullable(True) for field in record_batch_schema if field.name != "_inserted_at"]
+ [field.with_nullable(True) for field in record_batch_schema]
)

async with s3_upload as s3_upload:
@@ -715,6 +715,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs) -> RecordsCompleted:
writer_format=WriterFormat.from_str(inputs.file_format, "S3"),
max_bytes=settings.BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES,
s3_upload=s3_upload,
+ include_inserted_at=True,
writer_file_kwargs={"compression": inputs.compression},
)

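Taken together, the two hunks above do two things: the schema handed to the writer no longer filters out `_inserted_at` (every field is simply marked nullable), and the consumer loop is told to keep the column when writing files. A minimal sketch of the schema side, using a toy pyarrow schema rather than PostHog's real event schema:

```python
import pyarrow as pa

# Toy stand-in for record_batch_schema; the real schema comes from ClickHouse
# record batches and has many more fields.
record_batch_schema = pa.schema(
    [
        pa.field("uuid", pa.string(), nullable=False),
        pa.field("event", pa.string(), nullable=False),
        pa.field("_inserted_at", pa.timestamp("us"), nullable=False),
    ]
)

# After this PR there is no `if field.name != "_inserted_at"` filter, so the
# column survives into the writer's schema and every field becomes nullable.
nullable_schema = pa.schema([field.with_nullable(True) for field in record_batch_schema])

assert "_inserted_at" in nullable_schema.names
assert all(field.nullable for field in nullable_schema)
```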
posthog/temporal/batch_exports/spmc.py (12 additions, 2 deletions)
@@ -12,7 +12,10 @@
from django.conf import settings

from posthog.temporal.batch_exports.heartbeat import BatchExportRangeHeartbeatDetails
- from posthog.temporal.batch_exports.metrics import get_bytes_exported_metric, get_rows_exported_metric
+ from posthog.temporal.batch_exports.metrics import (
+     get_bytes_exported_metric,
+     get_rows_exported_metric,
+ )
from posthog.temporal.batch_exports.sql import (
SELECT_FROM_EVENTS_VIEW,
SELECT_FROM_EVENTS_VIEW_BACKFILL,
@@ -229,6 +232,7 @@ async def start(
schema: pa.Schema,
json_columns: collections.abc.Sequence[str],
multiple_files: bool = False,
+ include_inserted_at: bool = False,
**kwargs,
) -> int:
"""Start consuming record batches from queue.
@@ -261,7 +265,7 @@
record_batches_count += 1
record_batch = cast_record_batch_json_columns(record_batch, json_columns=json_columns)

- await writer.write_record_batch(record_batch, flush=False)
+ await writer.write_record_batch(record_batch, flush=False, include_inserted_at=include_inserted_at)

if writer.should_flush():
records_count += writer.records_since_last_flush
@@ -333,6 +337,7 @@ async def run_consumer_loop(
json_columns: collections.abc.Sequence[str] = ("properties", "person_properties", "set", "set_once"),
writer_file_kwargs: collections.abc.Mapping[str, typing.Any] | None = None,
multiple_files: bool = False,
+ include_inserted_at: bool = False,
**kwargs,
) -> int:
"""Run record batch consumers in a loop.
@@ -341,6 +346,10 @@
a loop. Once there is nothing left to consume from the `RecordBatchQueue`, no
more consumers will be started, and any pending consumers are awaited.

+ NOTE: We're starting to include the `_inserted_at` column in the record
+ batches, one destination at a time, so once we've added it to all
+ destinations, we can remove the `include_inserted_at` argument.

Returns:
Number of records exported. Not the number of record batches, but the
number of records in all record batches.
@@ -380,6 +389,7 @@ def consumer_done_callback(task: asyncio.Task):
schema=schema,
json_columns=json_columns,
multiple_files=multiple_files,
+ include_inserted_at=include_inserted_at,
**writer_file_kwargs or {},
),
name=f"record_batch_consumer_{consumer_number}",
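The spmc.py changes are pure plumbing: `run_consumer_loop` grows an `include_inserted_at` keyword that defaults to `False` and is forwarded through `Consumer.start` to `writer.write_record_batch`, so destinations that have not opted in behave exactly as before. A runnable sketch of that threading, with placeholder classes rather than the real spmc implementations:

```python
import asyncio


class FakeWriter:
    """Placeholder for the batch export writer in temporary_file.py."""

    async def write_record_batch(self, record_batch, flush=True, include_inserted_at=False):
        print(f"writing batch, include_inserted_at={include_inserted_at}")


class FakeConsumer:
    """Placeholder for the record batch consumer in spmc.py."""

    async def start(self, *, writer, record_batches, include_inserted_at=False, **kwargs):
        for record_batch in record_batches:
            # The flag is forwarded unchanged to every write call.
            await writer.write_record_batch(
                record_batch, flush=False, include_inserted_at=include_inserted_at
            )


async def run_consumer_loop(*, include_inserted_at=False, **kwargs):
    consumer = FakeConsumer()
    await consumer.start(
        writer=FakeWriter(),
        record_batches=[object()],
        include_inserted_at=include_inserted_at,
    )


# Only the S3 activity opts in for now; other destinations keep the default.
asyncio.run(run_consumer_loop(include_inserted_at=True))
```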
posthog/temporal/batch_exports/temporary_file.py (5 additions, 2 deletions)
@@ -408,7 +408,9 @@ def track_bytes_written(self, batch_export_file: BatchExportTemporaryFile) -> None:
self.bytes_total = batch_export_file.bytes_total
self.bytes_since_last_flush = batch_export_file.bytes_since_last_reset

- async def write_record_batch(self, record_batch: pa.RecordBatch, flush: bool = True) -> None:
+ async def write_record_batch(
+     self, record_batch: pa.RecordBatch, flush: bool = True, include_inserted_at: bool = False
+ ) -> None:
"""Issue a record batch write tracking progress and flushing if required."""
record_batch = record_batch.sort_by("_inserted_at")

@@ -429,7 +431,8 @@ async def write_record_batch(self, record_batch: pa.RecordBatch, flush: bool = True) -> None:
self.end_at_since_last_flush = raw_end_at

column_names = record_batch.column_names
- column_names.pop(column_names.index("_inserted_at"))
+ if not include_inserted_at:
+     column_names.pop(column_names.index("_inserted_at"))

await asyncio.to_thread(self._write_record_batch, record_batch.select(column_names))

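The writer-level change is where the column is actually kept or dropped: previously `_inserted_at` was always popped from the selected column names before writing, now it is popped only when `include_inserted_at` is false. A small sketch of just that selection step on a toy record batch (the real method also sorts by `_inserted_at` and tracks flush state):

```python
import pyarrow as pa

record_batch = pa.RecordBatch.from_pydict(
    {
        "uuid": ["a", "b"],
        "event": ["pageview", "click"],
        "_inserted_at": [1, 2],
    }
)


def select_columns(record_batch: pa.RecordBatch, include_inserted_at: bool) -> pa.RecordBatch:
    # Mirrors the column selection in write_record_batch: drop _inserted_at
    # unless the destination asked to keep it.
    column_names = record_batch.column_names
    if not include_inserted_at:
        column_names.pop(column_names.index("_inserted_at"))
    return record_batch.select(column_names)


print(select_columns(record_batch, include_inserted_at=False).column_names)  # ['uuid', 'event']
print(select_columns(record_batch, include_inserted_at=True).column_names)   # ['uuid', 'event', '_inserted_at']
```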
@@ -251,10 +251,6 @@ async def assert_clickhouse_records_in_s3(
for record in record_batch.to_pylist():
expected_record = {}
for k, v in record.items():
- if k not in schema_column_names or k == "_inserted_at":
-     # _inserted_at is not exported, only used for tracking progress.
-     continue

if k in json_columns and v is not None:
expected_record[k] = json.loads(v)
elif isinstance(v, dt.datetime):