fix(batch-exports): replace invalid unicode with ?
bretthoerner committed Mar 27, 2024
1 parent a54ae88 commit 53a8418
Showing 2 changed files with 42 additions and 3 deletions.
36 changes: 33 additions & 3 deletions posthog/temporal/batch_exports/temporary_file.py
@@ -1,4 +1,5 @@
"""This module contains a temporary file to stage data in batch exports."""

import abc
import collections.abc
import contextlib
@@ -14,8 +15,25 @@
import pyarrow.parquet as pq


def replace_broken_unicode(obj):
    if isinstance(obj, str):
        return obj.encode("utf-8", "replace").decode("utf-8")
    elif isinstance(obj, list):
        return [replace_broken_unicode(item) for item in obj]
    elif isinstance(obj, dict):
        return {replace_broken_unicode(key): replace_broken_unicode(value) for key, value in obj.items()}
    else:
        return obj

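As an aside (not part of the commit, using a hypothetical nested payload), this is how the recursive clean-up behaves: dict keys, dict values, and list items are all walked, while non-string leaves pass through unchanged.

nested = {"tags\ud83d": ["ok", "bad\ud83d"], "count": 3}
print(replace_broken_unicode(nested))
# {'tags?': ['ok', 'bad?'], 'count': 3}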

def json_dumps_bytes(d) -> bytes:
    return orjson.dumps(d, default=str)
    try:
        return orjson.dumps(d, default=str)
    except orjson.JSONEncodeError:
        # orjson is very strict about invalid unicode. This slow path protects us against
        # things we've observed in practice, like single surrogate codes, e.g. "\ud83d"
        cleaned_d = replace_broken_unicode(d)
        return orjson.dumps(cleaned_d, default=str)

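A quick sketch of why the fallback matters (an aside with a hypothetical event payload, assuming orjson is installed): the fast path raises on a lone surrogate, while the wrapper returns the cleaned serialization instead of failing the export.

payload = {"distinct_id": "user-1", "title": "hello\ud83dworld"}
# orjson.dumps(payload, default=str) raises orjson.JSONEncodeError here,
# so json_dumps_bytes falls back to the cleaned copy:
print(json_dumps_bytes(payload))
# b'{"distinct_id":"user-1","title":"hello?world"}'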

class BatchExportTemporaryFile:
@@ -131,7 +149,13 @@ def write_record_as_bytes(self, record: bytes):
    def write_records_to_jsonl(self, records):
        """Write records to a temporary file as JSONL."""
        if len(records) == 1:
            jsonl_dump = orjson.dumps(records[0], option=orjson.OPT_APPEND_NEWLINE, default=str)
            try:
                jsonl_dump = orjson.dumps(records[0], option=orjson.OPT_APPEND_NEWLINE, default=str)
            except orjson.JSONEncodeError:
                # orjson is very strict about invalid unicode. This slow path protects us against
                # things we've observed in practice, like single surrogate codes, e.g. "\ud83d"
                cleaned_record = replace_broken_unicode(records[0])
                jsonl_dump = orjson.dumps(cleaned_record, option=orjson.OPT_APPEND_NEWLINE, default=str)
        else:
            jsonl_dump = b"\n".join(map(json_dumps_bytes, records))

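For reference (not part of the commit): orjson.OPT_APPEND_NEWLINE makes the single-record path emit its own trailing newline, so a record serialized on either the fast or the slow path above already ends in a newline.

import orjson
assert orjson.dumps("a", option=orjson.OPT_APPEND_NEWLINE) == b'"a"\n'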
@@ -405,7 +429,13 @@ def __init__(

    def write(self, content: bytes) -> int:
        """Write a single row of JSONL."""
        n = self.batch_export_file.write(orjson.dumps(content, default=str) + b"\n")
        try:
            n = self.batch_export_file.write(orjson.dumps(content, default=str) + b"\n")
        except orjson.JSONEncodeError:
            # orjson is very strict about invalid unicode. This slow path protects us against
            # things we've observed in practice, like single surrogate codes, e.g. "\ud83d"
            cleaned_content = replace_broken_unicode(content)
            n = self.batch_export_file.write(orjson.dumps(cleaned_content, default=str) + b"\n")
        return n

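One more hedged sketch (hypothetical row, assuming orjson is installed): default=str and the unicode fallback cover different failures, so a single row can exercise both — a Decimal value goes through default=str, while the lone surrogate triggers the slow path.

import decimal

row = {"revenue": decimal.Decimal("19.99"), "title": "a\ud83db"}
# orjson.dumps(row, default=str) raises on the surrogate; after cleaning,
# the Decimal is still stringified by default=str:
print(orjson.dumps(replace_broken_unicode(row), default=str))
# b'{"revenue":"19.99","title":"a?b"}'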
    def _write_record_batch(self, record_batch: pa.RecordBatch) -> None:
9 changes: 9 additions & 0 deletions posthog/temporal/tests/batch_exports/test_temporary_file.py
@@ -88,6 +88,15 @@ def test_batch_export_temporary_file_write_records_to_jsonl(records):
        assert be_file.records_since_last_reset == 0


def test_batch_export_temporary_file_write_records_to_jsonl_invalid_unicode():
    with BatchExportTemporaryFile() as be_file:
        be_file.write_records_to_jsonl(["hello\ud83dworld"])

        be_file.seek(0)
        # Invalid single surrogate is replaced with a question mark.
        assert json.loads(be_file.readlines()[0]) == "hello?world"


@pytest.mark.parametrize(
"records",
TEST_RECORDS,
