diff --git a/posthog/temporal/batch_exports/s3_batch_export.py b/posthog/temporal/batch_exports/s3_batch_export.py
index 996fccdd94aca..e83fe3f12915d 100644
--- a/posthog/temporal/batch_exports/s3_batch_export.py
+++ b/posthog/temporal/batch_exports/s3_batch_export.py
@@ -634,59 +634,6 @@ def __arrow_ext_scalar_class__(self):
         return JsonScalar
 
 
-def cast_record_batch_json_columns(
-    record_batch: pa.RecordBatch,
-    json_columns: collections.abc.Sequence = ("properties", "person_properties", "set", "set_once"),
-) -> pa.RecordBatch:
-    """Cast json_columns in record_batch to JsonType.
-
-    We return a new RecordBatch with any json_columns replaced by fields casted to JsonType.
-    Casting is not copying the underlying array buffers, so memory usage does not increase when creating
-    the new array or the new record batch.
-    """
-    column_names = set(record_batch.column_names)
-    intersection = column_names & set(json_columns)
-
-    casted_arrays = []
-    for array in record_batch.select(intersection):
-        if pa.types.is_string(array.type):
-            casted_array = array.cast(JsonType())
-            casted_arrays.append(casted_array)
-
-    remaining_column_names = list(column_names - intersection)
-    return pa.RecordBatch.from_arrays(
-        record_batch.select(remaining_column_names).columns + casted_arrays,
-        names=remaining_column_names + list(intersection),
-    )
-
-
-class JsonScalar(pa.ExtensionScalar):
-    """Represents a JSON binary string."""
-
-    def as_py(self) -> dict | None:
-        if self.value:
-            return orjson.loads(self.value.as_py())
-        else:
-            return None
-
-
-class JsonType(pa.ExtensionType):
-    """Type for JSON binary strings."""
-
-    def __init__(self):
-        super().__init__(pa.string(), "json")
-
-    def __arrow_ext_serialize__(self):
-        return b""
-
-    @classmethod
-    def __arrow_ext_deserialize__(self, storage_type, serialized):
-        return JsonType()
-
-    def __arrow_ext_scalar_class__(self):
-        return JsonScalar
-
-
 @workflow.defn(name="s3-export")
 class S3BatchExportWorkflow(PostHogWorkflow):
     """A Temporal Workflow to export ClickHouse data into S3.
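
The hunk deletes duplicated definitions: its leading context line (`return JsonScalar`) shows an identical `JsonType.__arrow_ext_scalar_class__` ending immediately above the removed block, so matching copies of these helpers survive earlier in the same file. For context on what they do, the removed docstring claims the cast to `JsonType` does not copy the underlying array buffers and that JSON decoding happens per scalar. Below is a minimal standalone sketch of that behavior; the class bodies mirror the removed block, and it assumes only that pyarrow and orjson are installed (the sample column value is invented):

# Sketch of the JSON extension type kept earlier in the file; class bodies
# mirror the removed duplicate above. Sample values are invented.
import orjson
import pyarrow as pa


class JsonScalar(pa.ExtensionScalar):
    """Represents a JSON binary string."""

    def as_py(self) -> dict | None:
        # Decode the underlying storage string only when the value is accessed.
        if self.value:
            return orjson.loads(self.value.as_py())
        return None


class JsonType(pa.ExtensionType):
    """Type for JSON binary strings, backed by plain string storage."""

    def __init__(self):
        super().__init__(pa.string(), "json")

    def __arrow_ext_serialize__(self):
        return b""

    @classmethod
    def __arrow_ext_deserialize__(cls, storage_type, serialized):
        return JsonType()

    def __arrow_ext_scalar_class__(self):
        return JsonScalar


# Casting a string array to JsonType swaps the type while sharing the same
# buffers; JSON parsing happens per scalar, on access.
raw = pa.array(['{"plan": "free"}'])
casted = raw.cast(JsonType())
assert isinstance(casted.type, JsonType)
assert casted[0].as_py() == {"plan": "free"}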