From f0cb9c9d757238a9f9c9d690e6032eb40a3fbb32 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?=
Date: Mon, 18 Mar 2024 14:02:41 +0100
Subject: [PATCH] refactor: Support for multiple file formats in batch exports

---
 .../temporal/batch_exports/batch_exports.py   |  1 +
 .../temporal/batch_exports/s3_batch_export.py | 31 ++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/posthog/temporal/batch_exports/batch_exports.py b/posthog/temporal/batch_exports/batch_exports.py
index 88cf9e32f274f..795bcb5a6a4ec 100644
--- a/posthog/temporal/batch_exports/batch_exports.py
+++ b/posthog/temporal/batch_exports/batch_exports.py
@@ -6,6 +6,7 @@
 from string import Template
 
 import pyarrow as pa
+import pyarrow.parquet as pq
 from asgiref.sync import sync_to_async
 from django.conf import settings
 from temporalio import activity, exceptions, workflow
diff --git a/posthog/temporal/batch_exports/s3_batch_export.py b/posthog/temporal/batch_exports/s3_batch_export.py
index e83fe3f12915d..365aa76c58e5f 100644
--- a/posthog/temporal/batch_exports/s3_batch_export.py
+++ b/posthog/temporal/batch_exports/s3_batch_export.py
@@ -20,6 +20,8 @@
 from posthog.temporal.batch_exports.base import PostHogWorkflow
 from posthog.temporal.batch_exports.batch_exports import (
     CreateBatchExportRunInputs,
+    JSONLBatchExportWriter,
+    ParquetBatchExportWriter,
     UpdateBatchExportRunStatusInputs,
     create_export_run,
     default_fields,
@@ -634,6 +636,35 @@ def __arrow_ext_scalar_class__(self):
         return JsonScalar
 
 
+def cast_record_batch_json_columns(
+    record_batch: pa.RecordBatch,
+    json_columns: collections.abc.Sequence = ("properties", "person_properties", "set", "set_once"),
+) -> pa.RecordBatch:
+    """Cast json_columns in record_batch to JsonType.
+
+    We return a new RecordBatch with any json_columns replaced by fields cast to JsonType.
+    Casting does not copy the underlying array buffers, so memory usage does not increase when
+    creating the new array or the new record batch.
+    """
+    column_names = set(record_batch.column_names)
+    intersection = column_names & set(json_columns)
+
+    casted_arrays = []
+    for array in record_batch.select(intersection):
+        if pa.types.is_string(array.type):
+            casted_array = array.cast(JsonType())
+            casted_arrays.append(casted_array)
+        else:
+            # Keep non-string columns as-is so arrays stay aligned with names.
+            casted_arrays.append(array)
+
+    remaining_column_names = list(column_names - intersection)
+    return pa.RecordBatch.from_arrays(
+        record_batch.select(remaining_column_names).columns + casted_arrays,
+        names=remaining_column_names + list(intersection),
+    )
+
+
 @workflow.defn(name="s3-export")
 class S3BatchExportWorkflow(PostHogWorkflow):
     """A Temporal Workflow to export ClickHouse data into S3.
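
Reviewer note: a minimal sketch (not part of the patch) of how the new
cast_record_batch_json_columns helper is expected to behave. It assumes the
helper and the pre-existing JsonType class are importable from
posthog.temporal.batch_exports.s3_batch_export; the event values are made up
for illustration.

    import pyarrow as pa

    from posthog.temporal.batch_exports.s3_batch_export import (
        JsonType,
        cast_record_batch_json_columns,
    )

    # Hypothetical event batch: "properties" holds JSON serialized as strings,
    # which is how ClickHouse query results arrive.
    batch = pa.RecordBatch.from_pydict(
        {
            "uuid": ["0001", "0002"],
            "properties": ['{"$browser": "Chrome"}', '{"$browser": "Firefox"}'],
        }
    )

    casted = cast_record_batch_json_columns(batch)

    # "uuid" keeps its string type; "properties" is now JsonType, whose
    # scalars deserialize to Python dicts through JsonScalar.as_py().
    properties = casted.column("properties")
    assert isinstance(properties.type, JsonType)
    assert properties[0].as_py() == {"$browser": "Chrome"}

Because the cast reinterprets the existing storage buffers rather than copying
them, calling the helper per record batch in the export loop should not grow
memory usage. Note the returned batch may reorder columns (non-JSON columns
first), so downstream writers should select columns by name.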