diff --git a/posthog/temporal/batch_exports/s3_batch_export.py b/posthog/temporal/batch_exports/s3_batch_export.py
index 836785fe388b9..24455d8716fb8 100644
--- a/posthog/temporal/batch_exports/s3_batch_export.py
+++ b/posthog/temporal/batch_exports/s3_batch_export.py
@@ -535,6 +535,7 @@ async def flush_to_s3(
             max_bytes=settings.BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES,
             schema=schema,
         )
+
         async with writer.open_temporary_file():
             rows_exported = get_rows_exported_metric()
             bytes_exported = get_bytes_exported_metric()
@@ -602,7 +603,7 @@ class JsonScalar(pa.ExtensionScalar):
 
     def as_py(self) -> dict | None:
         if self.value:
-            return orjson.loads(self.value.as_py())
+            return orjson.loads(self.value.as_py().encode("utf-8"))
         else:
             return None
 
@@ -611,7 +612,7 @@ class JsonType(pa.ExtensionType):
     """Type for JSON binary strings."""
 
     def __init__(self):
-        super().__init__(pa.binary(), "json")
+        super().__init__(pa.string(), "json")
 
     def __arrow_ext_serialize__(self):
         return b""
diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
index 2c24f16926349..8c700fb191f2b 100644
--- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
@@ -109,6 +109,15 @@ def s3_key_prefix():
     return f"posthog-events-{str(uuid4())}"
 
 
+@pytest.fixture
+def file_format(request) -> str:
+    """S3 file format."""
+    try:
+        return request.param
+    except AttributeError:
+        return f"JSONLines"
+
+
 async def delete_all_from_s3(minio_client, bucket_name: str, key_prefix: str):
     """Delete all objects in bucket_name under key_prefix."""
     response = await minio_client.list_objects_v2(Bucket=bucket_name, Prefix=key_prefix)
@@ -140,7 +149,7 @@ async def minio_client(bucket_name):
     await minio_client.delete_bucket(Bucket=bucket_name)
 
 
-async def read_parquet_from_s3(bucket_name: str, key: str) -> list:
+async def read_parquet_from_s3(bucket_name: str, key: str, json_columns) -> list:
     async with aioboto3.Session().client("sts") as sts:
         try:
             await sts.get_caller_identity()
@@ -158,7 +167,19 @@ async def read_parquet_from_s3(bucket_name: str, key: str) -> list:
 
     parquet_data = []
     for batch in table.to_batches():
-        parquet_data.extend(batch.to_pylist())
+        for record in batch.to_pylist():
+            casted_record = {}
+            for k, v in record.items():
+                if isinstance(v, dt.datetime):
+                    # We read data from clickhouse as string, but parquet already casts them as dates.
+                    # To facilitate comparison, we isoformat the dates.
+                    casted_record[k] = v.isoformat()
+                elif k in json_columns and v is not None:
+                    # Parquet doesn't have a variable map type, so JSON fields are just strings.
+                    casted_record[k] = json.loads(v)
+                else:
+                    casted_record[k] = v
+            parquet_data.append(casted_record)
 
     return parquet_data
 
@@ -215,8 +236,10 @@ async def assert_clickhouse_records_in_s3(
     key = objects["Contents"][0].get("Key")
     assert key
 
+    json_columns = ("properties", "person_properties", "set", "set_once")
+
     if file_format == "Parquet":
-        s3_data = await read_parquet_from_s3(bucket_name, key)
+        s3_data = await read_parquet_from_s3(bucket_name, key, json_columns)
 
     elif file_format == "JSONLines":
         s3_object = await s3_compatible_client.get_object(Bucket=bucket_name, Key=key)
@@ -230,8 +253,6 @@ async def assert_clickhouse_records_in_s3(
     else:
         schema_column_names = [field["alias"] for field in s3_default_fields()]
 
-    json_columns = ("properties", "person_properties", "set", "set_once")
-
     expected_records = []
     for record_batch in iter_records(
         client=clickhouse_client,
@@ -412,6 +433,7 @@ async def s3_batch_export(
     exclude_events,
     temporal_client,
     encryption,
+    file_format,
 ):
     destination_data = {
         "type": "S3",
@@ -426,6 +448,7 @@ async def s3_batch_export(
             "exclude_events": exclude_events,
             "encryption": encryption,
             "kms_key_id": os.getenv("S3_TEST_KMS_KEY_ID") if encryption == "aws:kms" else None,
+            "file_format": file_format,
         },
     }
 
@@ -451,6 +474,7 @@ async def s3_batch_export(
 @pytest.mark.parametrize("compression", [None, "gzip", "brotli"], indirect=True)
 @pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True)
 @pytest.mark.parametrize("batch_export_schema", TEST_S3_SCHEMAS)
+@pytest.mark.parametrize("file_format", ["JSONLines", "Parquet"], indirect=True)
 async def test_s3_export_workflow_with_minio_bucket(
     clickhouse_client,
     minio_client,
@@ -462,6 +486,7 @@ async def test_s3_export_workflow_with_minio_bucket(
     exclude_events,
     s3_key_prefix,
     batch_export_schema,
+    file_format,
 ):
     """Test S3BatchExport Workflow end-to-end by using a local MinIO bucket instead of S3.
 
@@ -549,6 +574,7 @@ async def test_s3_export_workflow_with_minio_bucket(
         batch_export_schema=batch_export_schema,
         exclude_events=exclude_events,
         compression=compression,
+        file_format=file_format,
     )
 
 
@@ -578,6 +604,7 @@ async def s3_client(bucket_name, s3_key_prefix):
 @pytest.mark.parametrize("encryption", [None, "AES256", "aws:kms"], indirect=True)
 @pytest.mark.parametrize("bucket_name", [os.getenv("S3_TEST_BUCKET")], indirect=True)
 @pytest.mark.parametrize("batch_export_schema", TEST_S3_SCHEMAS)
+@pytest.mark.parametrize("file_format", ["JSONLines", "Parquet"])
 async def test_s3_export_workflow_with_s3_bucket(
     s3_client,
     clickhouse_client,
@@ -590,6 +617,7 @@ async def test_s3_export_workflow_with_s3_bucket(
     exclude_events,
     ateam,
     batch_export_schema,
+    file_format,
 ):
     """Test S3 Export Workflow end-to-end by using an S3 bucket.
 
@@ -687,6 +715,7 @@ async def test_s3_export_workflow_with_s3_bucket(
         exclude_events=exclude_events,
         include_events=None,
         compression=compression,
+        file_format=file_format,
     )
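
A minimal, self-contained sketch (not part of the patch) of how the JsonType/JsonScalar pair changed above behaves once backed by pa.string() storage: element access yields a JsonScalar whose as_py() parses the stored JSON string with orjson. The register_extension_type call and the __arrow_ext_deserialize__/__arrow_ext_scalar_class__ hooks are assumed to exist in the surrounding module, since the hunks above do not show them; the sample array and assertions are illustrative only.

    import orjson
    import pyarrow as pa


    class JsonScalar(pa.ExtensionScalar):
        """Scalar that parses stored JSON strings into Python objects."""

        def as_py(self) -> dict | None:
            if self.value:
                # Storage is now pa.string(), so as_py() yields str; orjson accepts bytes.
                return orjson.loads(self.value.as_py().encode("utf-8"))
            else:
                return None


    class JsonType(pa.ExtensionType):
        """Type for JSON strings, stored as pa.string() rather than pa.binary()."""

        def __init__(self):
            super().__init__(pa.string(), "json")

        def __arrow_ext_serialize__(self):
            return b""

        @classmethod
        def __arrow_ext_deserialize__(cls, storage_type, serialized):
            # Assumed hook: not shown in the hunks above.
            return cls()

        def __arrow_ext_scalar_class__(self):
            # Assumed hook: makes array[i] return a JsonScalar.
            return JsonScalar


    pa.register_extension_type(JsonType())

    # Illustrative round-trip: a JSON column stored as strings parses back to dicts,
    # and null entries come back as None.
    storage = pa.array(['{"$browser": "Chrome"}', None], type=pa.string())
    json_array = pa.ExtensionArray.from_storage(JsonType(), storage)
    assert json_array[0].as_py() == {"$browser": "Chrome"}
    assert json_array[1].as_py() is None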