test: More parquet testing
tomasfarias committed Mar 20, 2024
1 parent 539f335 commit 80510fc
Showing 2 changed files with 37 additions and 7 deletions.
5 changes: 3 additions & 2 deletions posthog/temporal/batch_exports/s3_batch_export.py
@@ -535,6 +535,7 @@ async def flush_to_s3(
max_bytes=settings.BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES,
schema=schema,
)

async with writer.open_temporary_file():
rows_exported = get_rows_exported_metric()
bytes_exported = get_bytes_exported_metric()
@@ -602,7 +603,7 @@ class JsonScalar(pa.ExtensionScalar):

def as_py(self) -> dict | None:
if self.value:
return orjson.loads(self.value.as_py())
return orjson.loads(self.value.as_py().encode("utf-8"))
else:
return None

@@ -611,7 +612,7 @@ class JsonType(pa.ExtensionType):
"""Type for JSON binary strings."""

def __init__(self):
super().__init__(pa.binary(), "json")
super().__init__(pa.string(), "json")

def __arrow_ext_serialize__(self):
return b""
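
The two hunks above switch JsonType's storage from pa.binary() to pa.string(), and JsonScalar.as_py now encodes the stored string back to UTF-8 bytes before handing it to orjson.loads. A minimal, self-contained sketch of that pattern, assuming a recent pyarrow with extension-scalar support; the round-trip at the end is hypothetical and only illustrates the decoding:

```python
import orjson
import pyarrow as pa


class JsonScalar(pa.ExtensionScalar):
    """Scalar that decodes the stored JSON string into Python objects."""

    def as_py(self) -> dict | None:
        if self.value:
            # Storage is a UTF-8 string; orjson parses the encoded bytes.
            return orjson.loads(self.value.as_py().encode("utf-8"))
        else:
            return None


class JsonType(pa.ExtensionType):
    """Type for JSON values stored as strings."""

    def __init__(self):
        super().__init__(pa.string(), "json")

    def __arrow_ext_serialize__(self):
        return b""

    @classmethod
    def __arrow_ext_deserialize__(cls, storage_type, serialized):
        return cls()

    def __arrow_ext_scalar_class__(self):
        return JsonScalar


# Hypothetical round-trip: wrap one JSON string and read it back as a dict.
pa.register_extension_type(JsonType())
storage = pa.array(['{"$browser": "Chrome", "utm_source": null}'], type=pa.string())
array = pa.ExtensionArray.from_storage(JsonType(), storage)
assert array[0].as_py() == {"$browser": "Chrome", "utm_source": None}
```

Keeping the storage as a string means the column is written to Parquet as plain UTF-8 text, which lines up with the test changes below that json.loads the Parquet values.
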
39 changes: 34 additions & 5 deletions (second changed file: S3 batch export workflow tests)
@@ -109,6 +109,15 @@ def s3_key_prefix():
return f"posthog-events-{str(uuid4())}"


@pytest.fixture
def file_format(request) -> str:
"""S3 file format."""
try:
return request.param
except AttributeError:
return "JSONLines"


async def delete_all_from_s3(minio_client, bucket_name: str, key_prefix: str):
"""Delete all objects in bucket_name under key_prefix."""
response = await minio_client.list_objects_v2(Bucket=bucket_name, Prefix=key_prefix)
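
The file_format fixture added in the hunk above follows the same indirect-parametrization pattern as compression and exclude_events: tests that opt in with @pytest.mark.parametrize("file_format", [...], indirect=True) deliver the value through request.param, and anything else falls into the AttributeError branch and gets the JSONLines default. A minimal sketch of the mechanism, with hypothetical test names:

```python
import pytest


@pytest.fixture
def file_format(request) -> str:
    """S3 file format, defaulting to JSONLines when not parametrized."""
    try:
        return request.param  # set only when the test parametrizes indirectly
    except AttributeError:
        return "JSONLines"


@pytest.mark.parametrize("file_format", ["JSONLines", "Parquet"], indirect=True)
def test_parametrized_format(file_format):
    # Hypothetical test: runs once per parametrized value.
    assert file_format in ("JSONLines", "Parquet")


def test_default_format(file_format):
    # Hypothetical test: no parametrize, so the fixture falls back to the default.
    assert file_format == "JSONLines"
```

When parametrize is applied without indirect=True, as in one of the decorators further down, pytest injects the value directly into the file_format argument and the fixture is bypassed, which is equivalent for a plain string parameter.
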
@@ -140,7 +149,7 @@ async def minio_client(bucket_name):
await minio_client.delete_bucket(Bucket=bucket_name)


async def read_parquet_from_s3(bucket_name: str, key: str) -> list:
async def read_parquet_from_s3(bucket_name: str, key: str, json_columns) -> list:
async with aioboto3.Session().client("sts") as sts:
try:
await sts.get_caller_identity()
@@ -158,7 +167,19 @@ async def read_parquet_from_s3(bucket_name: str, key: str) -> list:

parquet_data = []
for batch in table.to_batches():
parquet_data.extend(batch.to_pylist())
for record in batch.to_pylist():
casted_record = {}
for k, v in record.items():
if isinstance(v, dt.datetime):
# We read data from ClickHouse as strings, but the Parquet reader already yields datetimes.
# To facilitate comparison, we isoformat the datetimes.
casted_record[k] = v.isoformat()
elif k in json_columns and v is not None:
# Parquet doesn't have a variable map type, so JSON fields are just strings.
casted_record[k] = json.loads(v)
else:
casted_record[k] = v
parquet_data.append(casted_record)

return parquet_data
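
The loop above normalizes each Parquet row before comparison with the records rebuilt from ClickHouse: datetimes are rendered with isoformat(), and the JSON columns (the json_columns tuple introduced below) are parsed back into Python objects because Parquet stores them as plain strings. A standalone sketch of that normalization, using a hypothetical helper name and a made-up record:

```python
import datetime as dt
import json

JSON_COLUMNS = ("properties", "person_properties", "set", "set_once")


def normalize_parquet_record(record: dict) -> dict:
    """Cast a Parquet row so it compares equal to its expected counterpart."""
    casted = {}
    for key, value in record.items():
        if isinstance(value, dt.datetime):
            # Expected records carry timestamps as strings, so isoformat for comparison.
            casted[key] = value.isoformat()
        elif key in JSON_COLUMNS and value is not None:
            # Parquet has no variable map type; JSON fields come back as strings.
            casted[key] = json.loads(value)
        else:
            casted[key] = value
    return casted


record = {
    "event": "test",
    "timestamp": dt.datetime(2024, 3, 20, 12, 0, 0),
    "properties": '{"$browser": "Chrome"}',
}
assert normalize_parquet_record(record) == {
    "event": "test",
    "timestamp": "2024-03-20T12:00:00",
    "properties": {"$browser": "Chrome"},
}
```
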

@@ -215,8 +236,10 @@ async def assert_clickhouse_records_in_s3(
key = objects["Contents"][0].get("Key")
assert key

json_columns = ("properties", "person_properties", "set", "set_once")

if file_format == "Parquet":
s3_data = await read_parquet_from_s3(bucket_name, key)
s3_data = await read_parquet_from_s3(bucket_name, key, json_columns)

elif file_format == "JSONLines":
s3_object = await s3_compatible_client.get_object(Bucket=bucket_name, Key=key)
@@ -230,8 +253,6 @@
else:
schema_column_names = [field["alias"] for field in s3_default_fields()]

json_columns = ("properties", "person_properties", "set", "set_once")

expected_records = []
for record_batch in iter_records(
client=clickhouse_client,
@@ -412,6 +433,7 @@ async def s3_batch_export(
exclude_events,
temporal_client,
encryption,
file_format,
):
destination_data = {
"type": "S3",
@@ -426,6 +448,7 @@
"exclude_events": exclude_events,
"encryption": encryption,
"kms_key_id": os.getenv("S3_TEST_KMS_KEY_ID") if encryption == "aws:kms" else None,
"file_format": file_format,
},
}

@@ -451,6 +474,7 @@
@pytest.mark.parametrize("compression", [None, "gzip", "brotli"], indirect=True)
@pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True)
@pytest.mark.parametrize("batch_export_schema", TEST_S3_SCHEMAS)
@pytest.mark.parametrize("file_format", ["JSONLines", "Parquet"], indirect=True)
async def test_s3_export_workflow_with_minio_bucket(
clickhouse_client,
minio_client,
@@ -462,6 +486,7 @@ async def test_s3_export_workflow_with_minio_bucket(
exclude_events,
s3_key_prefix,
batch_export_schema,
file_format,
):
"""Test S3BatchExport Workflow end-to-end by using a local MinIO bucket instead of S3.
@@ -549,6 +574,7 @@ async def test_s3_export_workflow_with_minio_bucket(
batch_export_schema=batch_export_schema,
exclude_events=exclude_events,
compression=compression,
file_format=file_format,
)


@@ -578,6 +604,7 @@ async def s3_client(bucket_name, s3_key_prefix):
@pytest.mark.parametrize("encryption", [None, "AES256", "aws:kms"], indirect=True)
@pytest.mark.parametrize("bucket_name", [os.getenv("S3_TEST_BUCKET")], indirect=True)
@pytest.mark.parametrize("batch_export_schema", TEST_S3_SCHEMAS)
@pytest.mark.parametrize("file_format", ["JSONLines", "Parquet"])
async def test_s3_export_workflow_with_s3_bucket(
s3_client,
clickhouse_client,
@@ -590,6 +617,7 @@ async def test_s3_export_workflow_with_s3_bucket(
exclude_events,
ateam,
batch_export_schema,
file_format,
):
"""Test S3 Export Workflow end-to-end by using an S3 bucket.
@@ -687,6 +715,7 @@ async def test_s3_export_workflow_with_s3_bucket(
exclude_events=exclude_events,
include_events=None,
compression=compression,
file_format=file_format,
)


