diff --git a/posthog/temporal/batch_exports/s3_batch_export.py b/posthog/temporal/batch_exports/s3_batch_export.py
index d6e95ee28fc22..927d6436d634f 100644
--- a/posthog/temporal/batch_exports/s3_batch_export.py
+++ b/posthog/temporal/batch_exports/s3_batch_export.py
@@ -699,7 +699,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs) -> RecordsCompleted:
         # Until we figure it out, we set all fields to nullable. There are some fields we know
         # are not nullable, but I'm opting for the more flexible option until we out why schemas differ
         # between batches.
-        [field.with_nullable(True) for field in record_batch_schema if field.name != "_inserted_at"]
+        [field.with_nullable(True) for field in record_batch_schema]
     )
 
     async with s3_upload as s3_upload:
@@ -715,6 +715,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs) -> RecordsCompleted:
             writer_format=WriterFormat.from_str(inputs.file_format, "S3"),
             max_bytes=settings.BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES,
             s3_upload=s3_upload,
+            include_inserted_at=True,
             writer_file_kwargs={"compression": inputs.compression},
         )
 
diff --git a/posthog/temporal/batch_exports/spmc.py b/posthog/temporal/batch_exports/spmc.py
index 253935656b1e7..53171543db480 100644
--- a/posthog/temporal/batch_exports/spmc.py
+++ b/posthog/temporal/batch_exports/spmc.py
@@ -12,7 +12,10 @@
 from django.conf import settings
 
 from posthog.temporal.batch_exports.heartbeat import BatchExportRangeHeartbeatDetails
-from posthog.temporal.batch_exports.metrics import get_bytes_exported_metric, get_rows_exported_metric
+from posthog.temporal.batch_exports.metrics import (
+    get_bytes_exported_metric,
+    get_rows_exported_metric,
+)
 from posthog.temporal.batch_exports.sql import (
     SELECT_FROM_EVENTS_VIEW,
     SELECT_FROM_EVENTS_VIEW_BACKFILL,
@@ -229,6 +232,7 @@ async def start(
         schema: pa.Schema,
         json_columns: collections.abc.Sequence[str],
         multiple_files: bool = False,
+        include_inserted_at: bool = False,
         **kwargs,
     ) -> int:
         """Start consuming record batches from queue.
@@ -261,7 +265,7 @@ async def start(
                 record_batches_count += 1
                 record_batch = cast_record_batch_json_columns(record_batch, json_columns=json_columns)
 
-                await writer.write_record_batch(record_batch, flush=False)
+                await writer.write_record_batch(record_batch, flush=False, include_inserted_at=include_inserted_at)
 
                 if writer.should_flush():
                     records_count += writer.records_since_last_flush
@@ -333,6 +337,7 @@ async def run_consumer_loop(
     json_columns: collections.abc.Sequence[str] = ("properties", "person_properties", "set", "set_once"),
     writer_file_kwargs: collections.abc.Mapping[str, typing.Any] | None = None,
     multiple_files: bool = False,
+    include_inserted_at: bool = False,
     **kwargs,
 ) -> int:
     """Run record batch consumers in a loop.
@@ -341,6 +346,10 @@ async def run_consumer_loop(
     a loop. Once there is nothing left to consumer from the `RecordBatchQueue`, no
     more consumers will be started, and any pending consumers are awaited.
 
+    NOTE: We're starting to include the `_inserted_at` column in the record
+    batches, one destination at a time, so once we've added it to all
+    destinations, we can remove the `include_inserted_at` argument.
+
     Returns:
         Number of records exported. Not the number of record batches, but the
         number of records in all record batches.
@@ -380,6 +389,7 @@ def consumer_done_callback(task: asyncio.Task):
                 schema=schema,
                 json_columns=json_columns,
                 multiple_files=multiple_files,
+                include_inserted_at=include_inserted_at,
                 **writer_file_kwargs or {},
             ),
             name=f"record_batch_consumer_{consumer_number}",
diff --git a/posthog/temporal/batch_exports/temporary_file.py b/posthog/temporal/batch_exports/temporary_file.py
index afe91d42412a3..9b23b6f5c9692 100644
--- a/posthog/temporal/batch_exports/temporary_file.py
+++ b/posthog/temporal/batch_exports/temporary_file.py
@@ -408,7 +408,9 @@ def track_bytes_written(self, batch_export_file: BatchExportTemporaryFile) -> No
         self.bytes_total = batch_export_file.bytes_total
         self.bytes_since_last_flush = batch_export_file.bytes_since_last_reset
 
-    async def write_record_batch(self, record_batch: pa.RecordBatch, flush: bool = True) -> None:
+    async def write_record_batch(
+        self, record_batch: pa.RecordBatch, flush: bool = True, include_inserted_at: bool = False
+    ) -> None:
         """Issue a record batch write tracking progress and flushing if required."""
         record_batch = record_batch.sort_by("_inserted_at")
 
@@ -429,7 +431,8 @@ async def write_record_batch(self, record_batch: pa.RecordBatch, flush: bool = T
             self.end_at_since_last_flush = raw_end_at
 
         column_names = record_batch.column_names
-        column_names.pop(column_names.index("_inserted_at"))
+        if not include_inserted_at:
+            column_names.pop(column_names.index("_inserted_at"))
 
         await asyncio.to_thread(self._write_record_batch, record_batch.select(column_names))
 
diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
index 76a3c20599518..3dab18c67b283 100644
--- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
@@ -251,10 +251,6 @@ async def assert_clickhouse_records_in_s3(
         for record in record_batch.to_pylist():
             expected_record = {}
             for k, v in record.items():
-                if k not in schema_column_names or k == "_inserted_at":
-                    # _inserted_at is not exported, only used for tracking progress.
-                    continue
-
                 if k in json_columns and v is not None:
                     expected_record[k] = json.loads(v)
                 elif isinstance(v, dt.datetime):
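
The behavioural core of this change is that `write_record_batch` now drops the `_inserted_at` tracking column only when the caller has not opted in to keeping it. Below is a minimal sketch of that selection logic, assuming pyarrow is available and using a hypothetical helper name (`select_columns_for_write` does not exist in the codebase); it is an illustration of the new branch, not the actual writer implementation.

# Minimal sketch, not the real writer code: mirrors the `include_inserted_at`
# branch added to `write_record_batch` in temporary_file.py above.
import pyarrow as pa


def select_columns_for_write(record_batch: pa.RecordBatch, include_inserted_at: bool = False) -> pa.RecordBatch:
    """Return the record batch as it would be written to the destination file."""
    column_names = record_batch.column_names
    if not include_inserted_at:
        # `_inserted_at` is only used to track progress, so by default it is
        # dropped before writing; destinations can now opt in to keep it.
        column_names.pop(column_names.index("_inserted_at"))
    return record_batch.select(column_names)


batch = pa.RecordBatch.from_pydict({"uuid": ["a", "b"], "_inserted_at": [1, 2]})
assert select_columns_for_write(batch).column_names == ["uuid"]
assert select_columns_for_write(batch, include_inserted_at=True).column_names == ["uuid", "_inserted_at"]

S3 opts in by passing `include_inserted_at=True` through `run_consumer_loop`, so `_inserted_at` now survives into the uploaded files (and the test no longer skips it); every other destination keeps the old default of dropping the column until it is migrated, per the NOTE in `run_consumer_loop`.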