test(batch-exports): Add specific test for garbage jsonl data
tomasfarias committed Oct 28, 2024
1 parent a7f4eb6 commit 83accff
Showing 1 changed file with 99 additions and 0 deletions.
@@ -5,6 +5,7 @@
import operator
import os
import re
import tempfile
import unittest.mock
import uuid
from collections import deque
@@ -1143,6 +1144,104 @@ async def test_insert_into_snowflake_activity_merges_data_in_follow_up_runs(
    )


@pytest.fixture
def garbage_jsonl_file():
"""Manage a JSON file with garbage data."""
with tempfile.NamedTemporaryFile("w+b", suffix=".jsonl", prefix="garbage_") as garbage_jsonl_file:
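        # 'totally not an integer' is not valid JSON, so any COPY that parses this file should fail.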
        garbage_jsonl_file.write(b'{"team_id": totally not an integer}\n')
        garbage_jsonl_file.seek(0)

        yield garbage_jsonl_file.name


@SKIP_IF_MISSING_REQUIRED_ENV_VARS
async def test_insert_into_snowflake_activity_removes_internal_stage_files(
    clickhouse_client,
    activity_environment,
    snowflake_cursor,
    snowflake_config,
    generate_test_data,
    data_interval_start,
    data_interval_end,
    ateam,
    garbage_jsonl_file,
):
"""Test that the `insert_into_snowflake_activity` removes internal stage files.
This test requires some setup steps:
1. We do a first run of the activity to create the export table. Since we
haven't added any garbage, this should work normally.
2. Truncate the table to avoid duplicate data once we re-run the activity.
3. PUT a file with garbage data into the table internal stage.
Once we run the activity a second time, it should first clear up the garbage
file and not fail the COPY. After this second execution is done, and besides
checking this second run worked and exported data, we also check that no files
are present in the table's internal stage.
"""
    model = BatchExportModel(name="events", schema=None)

    table_name = f"test_insert_activity_table_remove_{ateam.pk}"

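    # First run: creates the export table and exports the generated test data normally.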
    insert_inputs = SnowflakeInsertInputs(
        team_id=ateam.pk,
        table_name=table_name,
        data_interval_start=data_interval_start.isoformat(),
        data_interval_end=data_interval_end.isoformat(),
        batch_export_model=model,
        **snowflake_config,
    )

    await activity_environment.run(insert_into_snowflake_activity, insert_inputs)

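    # Check that the first run exported the expected records before we tamper with the stage.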
    await assert_clickhouse_records_in_snowflake(
        snowflake_cursor=snowflake_cursor,
        clickhouse_client=clickhouse_client,
        table_name=table_name,
        team_id=ateam.pk,
        data_interval_start=data_interval_start,
        data_interval_end=data_interval_end,
        batch_export_model=model,
        sort_key="event",
    )

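    # Truncate the table so the second run doesn't leave duplicate rows behind.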
    snowflake_cursor.execute(f'TRUNCATE TABLE "{table_name}"')

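    # PUT the garbage file into the table's internal stage, under a prefix named after
    # the end of the batch period.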
    data_interval_end_str = data_interval_end.strftime("%Y-%m-%d_%H-%M-%S")

    put_query = f"""
    PUT file://{garbage_jsonl_file} '@%"{table_name}"/{data_interval_end_str}'
    """
    snowflake_cursor.execute(put_query)

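    # Sanity check: LIST should report exactly one file in the stage, the gzip-compressed
    # garbage file we just uploaded.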
    list_query = f"""
    LIST '@%"{table_name}"'
    """
    snowflake_cursor.execute(list_query)
    rows = snowflake_cursor.fetchall()
    columns = {index: metadata.name for index, metadata in enumerate(snowflake_cursor.description)}
    stage_files = [{columns[index]: row[index] for index in columns.keys()} for row in rows]
    assert len(stage_files) == 1
    assert stage_files[0]["name"] == f"{data_interval_end_str}/{os.path.basename(garbage_jsonl_file)}.gz"

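    # Second run: the activity should remove the stale garbage file before the COPY,
    # so the run succeeds and exports the same data as before.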
    await activity_environment.run(insert_into_snowflake_activity, insert_inputs)

    await assert_clickhouse_records_in_snowflake(
        snowflake_cursor=snowflake_cursor,
        clickhouse_client=clickhouse_client,
        table_name=table_name,
        team_id=ateam.pk,
        data_interval_start=data_interval_start,
        data_interval_end=data_interval_end,
        batch_export_model=model,
        sort_key="event",
    )

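    # After the second run the table's internal stage should be completely empty.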
    snowflake_cursor.execute(list_query)
    rows = snowflake_cursor.fetchall()
    assert len(rows) == 0


@SKIP_IF_MISSING_REQUIRED_ENV_VARS
@pytest.mark.parametrize("interval", ["hour", "day"], indirect=True)
@pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True)
