feat(batch-exports): Include _inserted_at field in S3 batch exports (
rossgray authored Dec 19, 2024
1 parent 9fbe55a commit 5840967
Showing 4 changed files with 19 additions and 9 deletions.
3 changes: 2 additions & 1 deletion posthog/temporal/batch_exports/s3_batch_export.py
@@ -699,7 +699,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs) -> RecordsCompleted:
# Until we figure it out, we set all fields to nullable. There are some fields we know
# are not nullable, but I'm opting for the more flexible option until we figure out why schemas differ
# between batches.
- [field.with_nullable(True) for field in record_batch_schema if field.name != "_inserted_at"]
+ [field.with_nullable(True) for field in record_batch_schema]
)

async with s3_upload as s3_upload:
@@ -715,6 +715,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs) -> RecordsCompleted:
writer_format=WriterFormat.from_str(inputs.file_format, "S3"),
max_bytes=settings.BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES,
s3_upload=s3_upload,
+ include_inserted_at=True,
writer_file_kwargs={"compression": inputs.compression},
)

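As a minimal illustration of the change above (the field names and types below are made up, not the real events schema): dropping the filter means `_inserted_at` now survives into the all-nullable schema handed to the writer.

    import pyarrow as pa

    # Hypothetical record batch schema; the real one comes from ClickHouse.
    record_batch_schema = pa.schema(
        [
            pa.field("uuid", pa.string(), nullable=False),
            pa.field("event", pa.string(), nullable=False),
            pa.field("_inserted_at", pa.timestamp("us"), nullable=False),
        ]
    )

    # As of this commit, the `if field.name != "_inserted_at"` filter is gone,
    # so _inserted_at is kept alongside the other (now nullable) fields.
    nullable_schema = pa.schema([field.with_nullable(True) for field in record_batch_schema])

    print(nullable_schema.names)  # ['uuid', 'event', '_inserted_at']
    assert all(field.nullable for field in nullable_schema)
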
14 changes: 12 additions & 2 deletions posthog/temporal/batch_exports/spmc.py
@@ -12,7 +12,10 @@
from django.conf import settings

from posthog.temporal.batch_exports.heartbeat import BatchExportRangeHeartbeatDetails
- from posthog.temporal.batch_exports.metrics import get_bytes_exported_metric, get_rows_exported_metric
+ from posthog.temporal.batch_exports.metrics import (
+     get_bytes_exported_metric,
+     get_rows_exported_metric,
+ )
from posthog.temporal.batch_exports.sql import (
SELECT_FROM_EVENTS_VIEW,
SELECT_FROM_EVENTS_VIEW_BACKFILL,
@@ -229,6 +232,7 @@ async def start(
schema: pa.Schema,
json_columns: collections.abc.Sequence[str],
multiple_files: bool = False,
+ include_inserted_at: bool = False,
**kwargs,
) -> int:
"""Start consuming record batches from queue.
@@ -261,7 +265,7 @@ async def start(
record_batches_count += 1
record_batch = cast_record_batch_json_columns(record_batch, json_columns=json_columns)

- await writer.write_record_batch(record_batch, flush=False)
+ await writer.write_record_batch(record_batch, flush=False, include_inserted_at=include_inserted_at)

if writer.should_flush():
records_count += writer.records_since_last_flush
@@ -333,6 +337,7 @@ async def run_consumer_loop(
json_columns: collections.abc.Sequence[str] = ("properties", "person_properties", "set", "set_once"),
writer_file_kwargs: collections.abc.Mapping[str, typing.Any] | None = None,
multiple_files: bool = False,
+ include_inserted_at: bool = False,
**kwargs,
) -> int:
"""Run record batch consumers in a loop.
@@ -341,6 +346,10 @@ async def run_consumer_loop(
a loop. Once there is nothing left to consume from the `RecordBatchQueue`, no
more consumers will be started, and any pending consumers are awaited.
+ NOTE: We're starting to include the `_inserted_at` column in the record
+ batches, one destination at a time, so once we've added it to all
+ destinations, we can remove the `include_inserted_at` argument.
Returns:
Number of records exported. Not the number of record batches, but the
number of records in all record batches.
@@ -380,6 +389,7 @@ def consumer_done_callback(task: asyncio.Task):
schema=schema,
json_columns=json_columns,
multiple_files=multiple_files,
+ include_inserted_at=include_inserted_at,
**writer_file_kwargs or {},
),
name=f"record_batch_consumer_{consumer_number}",
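
A simplified, self-contained sketch of the plumbing added in spmc.py; SketchWriter and SketchConsumer below are stand-ins, not the real Consumer or batch export writer classes. The new include_inserted_at flag defaults to False and is just threaded from the consumer loop down to each write_record_batch call, so only destinations that opt in (currently S3) keep the column.

    import asyncio

    import pyarrow as pa


    class SketchWriter:
        # Stand-in for a batch export writer; only the new keyword is shown.
        async def write_record_batch(self, record_batch: pa.RecordBatch, include_inserted_at: bool = False) -> None:
            print(f"writing {record_batch.num_rows} rows, include_inserted_at={include_inserted_at}")


    class SketchConsumer:
        # Stand-in for the consumer started by run_consumer_loop.
        def __init__(self, writer: SketchWriter) -> None:
            self.writer = writer

        async def start(self, record_batches: list[pa.RecordBatch], include_inserted_at: bool = False) -> None:
            # The flag is forwarded unchanged to every write, mirroring Consumer.start.
            for record_batch in record_batches:
                await self.writer.write_record_batch(record_batch, include_inserted_at=include_inserted_at)


    async def main() -> None:
        batch = pa.RecordBatch.from_pydict({"uuid": ["a", "b"], "_inserted_at": [1, 2]})
        # The S3 activity opts in; other destinations keep the default of False for now.
        await SketchConsumer(SketchWriter()).start([batch], include_inserted_at=True)


    asyncio.run(main())
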
7 changes: 5 additions & 2 deletions posthog/temporal/batch_exports/temporary_file.py
@@ -408,7 +408,9 @@ def track_bytes_written(self, batch_export_file: BatchExportTemporaryFile) -> No
self.bytes_total = batch_export_file.bytes_total
self.bytes_since_last_flush = batch_export_file.bytes_since_last_reset

- async def write_record_batch(self, record_batch: pa.RecordBatch, flush: bool = True) -> None:
+ async def write_record_batch(
+     self, record_batch: pa.RecordBatch, flush: bool = True, include_inserted_at: bool = False
+ ) -> None:
"""Issue a record batch write tracking progress and flushing if required."""
record_batch = record_batch.sort_by("_inserted_at")

@@ -429,7 +431,8 @@ async def write_record_batch(self, record_batch: pa.RecordBatch, flush: bool = T
self.end_at_since_last_flush = raw_end_at

column_names = record_batch.column_names
- column_names.pop(column_names.index("_inserted_at"))
+ if not include_inserted_at:
+     column_names.pop(column_names.index("_inserted_at"))

await asyncio.to_thread(self._write_record_batch, record_batch.select(column_names))

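A hedged illustration of the behavior changed in write_record_batch, with progress tracking and flushing omitted: the batch is still sorted by _inserted_at, but the column is only dropped from the written output when include_inserted_at is False.

    import pyarrow as pa


    def select_written_columns(record_batch: pa.RecordBatch, include_inserted_at: bool = False) -> pa.RecordBatch:
        # Mirrors the new branch in write_record_batch, minus tracking and flushing.
        record_batch = record_batch.sort_by("_inserted_at")
        column_names = record_batch.column_names
        if not include_inserted_at:
            column_names.pop(column_names.index("_inserted_at"))
        return record_batch.select(column_names)


    batch = pa.RecordBatch.from_pydict({"uuid": ["b", "a"], "_inserted_at": [2, 1]})
    print(select_written_columns(batch, include_inserted_at=False).column_names)  # ['uuid']
    print(select_written_columns(batch, include_inserted_at=True).column_names)   # ['uuid', '_inserted_at']
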
@@ -251,10 +251,6 @@ async def assert_clickhouse_records_in_s3(
for record in record_batch.to_pylist():
expected_record = {}
for k, v in record.items():
- if k not in schema_column_names or k == "_inserted_at":
-     # _inserted_at is not exported, only used for tracking progress.
-     continue

if k in json_columns and v is not None:
expected_record[k] = json.loads(v)
elif isinstance(v, dt.datetime):
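
A rough sketch of what the test helper's expectation-building loop implies after this change; the values are made up and the datetime branch is truncated in the diff, so the isoformat() handling below is an assumption. With the old guard removed, _inserted_at lands in the expected records, so the exported S3 files must now contain it.

    import datetime as dt
    import json

    json_columns = ("properties", "person_properties", "set", "set_once")

    # Made-up record as it might come back from ClickHouse in the test helper.
    record = {
        "event": "my-event",
        "properties": '{"$browser": "Chrome"}',
        "_inserted_at": dt.datetime(2024, 12, 19, 12, 0, 0),
    }

    expected_record = {}
    for k, v in record.items():
        # The guard that skipped "_inserted_at" is gone, so it ends up in the
        # expected record and is asserted against the exported data.
        if k in json_columns and v is not None:
            expected_record[k] = json.loads(v)
        elif isinstance(v, dt.datetime):
            expected_record[k] = v.isoformat()  # assumption: datetimes are normalized here
        else:
            expected_record[k] = v

    print(expected_record)
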
