Skip to content

Commit

Permalink
fix: Remove redefinition caused by rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Mar 22, 2024
1 parent 6fa3360 commit c0b695d
Showing 1 changed file with 0 additions and 53 deletions.
53 changes: 0 additions & 53 deletions posthog/temporal/batch_exports/s3_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,59 +634,6 @@ def __arrow_ext_scalar_class__(self):
return JsonScalar


def cast_record_batch_json_columns(
record_batch: pa.RecordBatch,
json_columns: collections.abc.Sequence = ("properties", "person_properties", "set", "set_once"),
) -> pa.RecordBatch:
"""Cast json_columns in record_batch to JsonType.
We return a new RecordBatch with any json_columns replaced by fields casted to JsonType.
Casting is not copying the underlying array buffers, so memory usage does not increase when creating
the new array or the new record batch.
"""
column_names = set(record_batch.column_names)
intersection = column_names & set(json_columns)

casted_arrays = []
for array in record_batch.select(intersection):
if pa.types.is_string(array.type):
casted_array = array.cast(JsonType())
casted_arrays.append(casted_array)

remaining_column_names = list(column_names - intersection)
return pa.RecordBatch.from_arrays(
record_batch.select(remaining_column_names).columns + casted_arrays,
names=remaining_column_names + list(intersection),
)


class JsonScalar(pa.ExtensionScalar):
"""Represents a JSON binary string."""

def as_py(self) -> dict | None:
if self.value:
return orjson.loads(self.value.as_py())
else:
return None


class JsonType(pa.ExtensionType):
"""Type for JSON binary strings."""

def __init__(self):
super().__init__(pa.string(), "json")

def __arrow_ext_serialize__(self):
return b""

@classmethod
def __arrow_ext_deserialize__(self, storage_type, serialized):
return JsonType()

def __arrow_ext_scalar_class__(self):
return JsonScalar


@workflow.defn(name="s3-export")
class S3BatchExportWorkflow(PostHogWorkflow):
"""A Temporal Workflow to export ClickHouse data into S3.
Expand Down

0 comments on commit c0b695d

Please sign in to comment.