Commit
refactor: Support for multiple file formats in batch exports
tomasfarias committed Mar 22, 2024
1 parent 7c0258c commit f0cb9c9
Showing 2 changed files with 56 additions and 0 deletions.
1 change: 1 addition & 0 deletions posthog/temporal/batch_exports/batch_exports.py
@@ -6,6 +6,7 @@
from string import Template

import pyarrow as pa
import pyarrow.parquet as pq
from asgiref.sync import sync_to_async
from django.conf import settings
from temporalio import activity, exceptions, workflow
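For context on the new import: pyarrow.parquet's ParquetWriter can stream successive record batches into a single Parquet file. A minimal sketch of that pattern (the schema, values, and output path are illustrative, not taken from this commit):

import pyarrow as pa
import pyarrow.parquet as pq

schema = pa.schema([("uuid", pa.string()), ("event", pa.string())])
batch = pa.RecordBatch.from_arrays(
    [pa.array(["uuid-1"]), pa.array(["$pageview"])],
    schema=schema,
)

# The writer keeps the file open, so batches can be appended as they are
# produced instead of materializing the whole export in memory.
with pq.ParquetWriter("export.parquet", schema) as writer:
    writer.write_batch(batch)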
55 changes: 55 additions & 0 deletions posthog/temporal/batch_exports/s3_batch_export.py
@@ -20,6 +20,8 @@
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.batch_exports.batch_exports import (
    CreateBatchExportRunInputs,
    JSONLBatchExportWriter,
    ParquetBatchExportWriter,
    UpdateBatchExportRunStatusInputs,
    create_export_run,
    default_fields,
@@ -634,6 +636,59 @@ def __arrow_ext_scalar_class__(self):
        return JsonScalar


def cast_record_batch_json_columns(
    record_batch: pa.RecordBatch,
    json_columns: collections.abc.Sequence = ("properties", "person_properties", "set", "set_once"),
) -> pa.RecordBatch:
    """Cast json_columns in record_batch to JsonType.

    We return a new RecordBatch with any json_columns replaced by fields cast to JsonType.
    Casting does not copy the underlying array buffers, so memory usage does not increase when
    creating the new array or the new record batch.
    """
    column_names = set(record_batch.column_names)
    intersection = column_names & set(json_columns)

    casted_arrays = []
    for array in record_batch.select(intersection):
        if pa.types.is_string(array.type):
            casted_array = array.cast(JsonType())
            casted_arrays.append(casted_array)

    remaining_column_names = list(column_names - intersection)
    return pa.RecordBatch.from_arrays(
        record_batch.select(remaining_column_names).columns + casted_arrays,
        names=remaining_column_names + list(intersection),
    )
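
A hedged usage sketch of the helper above (the sample batch and values are illustrative; assumes a pyarrow version that supports casting string arrays to extension types, which the helper relies on, and that the JsonType and JsonScalar classes defined below are in scope):

batch = pa.RecordBatch.from_arrays(
    [pa.array(["uuid-1"]), pa.array(['{"plan": "free"}'])],
    names=["uuid", "properties"],
)

casted = cast_record_batch_json_columns(batch, json_columns=("properties",))

# The "properties" column is now backed by JsonType; its scalars decode
# lazily to Python dicts through JsonScalar.as_py().
assert casted.column("properties")[0].as_py() == {"plan": "free"}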


class JsonScalar(pa.ExtensionScalar):
    """Represents a JSON binary string."""

    def as_py(self) -> dict | None:
        if self.value:
            return orjson.loads(self.value.as_py())
        else:
            return None


class JsonType(pa.ExtensionType):
    """Type for JSON binary strings."""

    def __init__(self):
        super().__init__(pa.binary(), "json")

    def __arrow_ext_serialize__(self):
        return b""

    @classmethod
    def __arrow_ext_deserialize__(cls, storage_type, serialized):
        return JsonType()

    def __arrow_ext_scalar_class__(self):
        return JsonScalar
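
One caveat worth noting: pyarrow only reconstructs an extension type from its serialized form (here the "json" name and empty payload) if the type has been registered. A minimal sketch, assuming registration happens once per process at import time (not shown in this diff):

# register_extension_type raises if the "json" name is already registered,
# so this should run exactly once per process.
pa.register_extension_type(JsonType())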


@workflow.defn(name="s3-export")
class S3BatchExportWorkflow(PostHogWorkflow):
"""A Temporal Workflow to export ClickHouse data into S3.
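
The newly imported JSONLBatchExportWriter and ParquetBatchExportWriter suggest the S3 destination now dispatches on the configured file format. A hypothetical sketch of that dispatch (the format strings and constructor signatures are assumptions for illustration, not the actual writer API):

def get_batch_export_writer(file_format: str, **writer_kwargs):
    # Hypothetical helper; the real writers live in
    # posthog/temporal/batch_exports/batch_exports.py.
    if file_format == "Parquet":
        return ParquetBatchExportWriter(**writer_kwargs)
    elif file_format == "JSONLines":
        return JSONLBatchExportWriter(**writer_kwargs)
    raise ValueError(f"Unsupported file format: {file_format}")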
