diff --git a/mypy-baseline.txt b/mypy-baseline.txt index 2fbd39073f37e..8b56066703dcd 100644 --- a/mypy-baseline.txt +++ b/mypy-baseline.txt @@ -817,62 +817,6 @@ posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: error: Need type annotation for "_execute_async_calls" (hint: "_execute_async_calls: List[] = ...") [var-annotated] posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: error: Need type annotation for "_cursors" (hint: "_cursors: List[] = ...") [var-annotated] posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: error: List item 0 has incompatible type "tuple[str, str, int, int, int, int, str, int]"; expected "tuple[str, str, int, int, str, str, str, str]" [list-item] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: 
Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type] 
-posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected 
"list[str] | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type] -posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type] posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "last_uploaded_part_timestamp" [attr-defined] posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "upload_state" [attr-defined] posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "upload_state" [attr-defined] diff --git a/posthog/hogql_queries/insights/funnels/test/__snapshots__/test_funnel.ambr b/posthog/hogql_queries/insights/funnels/test/__snapshots__/test_funnel.ambr index 14d50251dbeca..18a9766da8327 100644 --- a/posthog/hogql_queries/insights/funnels/test/__snapshots__/test_funnel.ambr +++ b/posthog/hogql_queries/insights/funnels/test/__snapshots__/test_funnel.ambr @@ -350,7 +350,7 @@ if(and(equals(e.event, 'user signed up'), ifNull(in(e__pdi.person_id, (SELECT cohortpeople.person_id AS person_id FROM cohortpeople - WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 2)) + WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 1)) GROUP BY cohortpeople.person_id, cohortpeople.cohort_id, cohortpeople.version HAVING ifNull(greater(sum(cohortpeople.sign), 0), 0))), 0)), 1, 0) AS step_0, if(ifNull(equals(step_0, 1), 0), timestamp, NULL) AS latest_0, @@ -871,7 +871,7 @@ if(and(equals(e.event, 'user signed up'), ifNull(in(e__pdi.person_id, (SELECT person_static_cohort.person_id AS person_id FROM person_static_cohort - WHERE and(equals(person_static_cohort.team_id, 2), equals(person_static_cohort.cohort_id, 3)))), 0)), 1, 0) AS step_0, + WHERE and(equals(person_static_cohort.team_id, 2), equals(person_static_cohort.cohort_id, 2)))), 0)), 1, 0) AS step_0, if(ifNull(equals(step_0, 1), 0), timestamp, NULL) AS latest_0, if(equals(e.event, 'paid'), 1, 0) AS step_1, if(ifNull(equals(step_1, 1), 0), timestamp, NULL) AS latest_1 diff --git a/posthog/hogql_queries/insights/test/__snapshots__/test_lifecycle_query_runner.ambr b/posthog/hogql_queries/insights/test/__snapshots__/test_lifecycle_query_runner.ambr index ef3b23794866d..20c5e65be77f8 100644 --- a/posthog/hogql_queries/insights/test/__snapshots__/test_lifecycle_query_runner.ambr +++ b/posthog/hogql_queries/insights/test/__snapshots__/test_lifecycle_query_runner.ambr @@ -79,7 +79,7 @@ WHERE and(equals(events.team_id, 2), greaterOrEquals(toTimeZone(events.timestamp, 'UTC'), minus(toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2020-01-12 00:00:00', 6, 'UTC'))), toIntervalDay(1))), less(toTimeZone(events.timestamp, 'UTC'), plus(toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2020-01-19 23:59:59', 6, 'UTC'))), toIntervalDay(1))), ifNull(in(person_id, (SELECT 
cohortpeople.person_id AS person_id FROM cohortpeople - WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 4)) + WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 3)) GROUP BY cohortpeople.person_id, cohortpeople.cohort_id, cohortpeople.version HAVING ifNull(greater(sum(cohortpeople.sign), 0), 0))), 0), equals(events.event, '$pageview')) GROUP BY person_id) diff --git a/posthog/hogql_queries/insights/trends/test/__snapshots__/test_trends.ambr b/posthog/hogql_queries/insights/trends/test/__snapshots__/test_trends.ambr index d9e0cd6ed6abf..150e38a62713c 100644 --- a/posthog/hogql_queries/insights/trends/test/__snapshots__/test_trends.ambr +++ b/posthog/hogql_queries/insights/trends/test/__snapshots__/test_trends.ambr @@ -85,7 +85,7 @@ WHERE and(equals(e.team_id, 2), greaterOrEquals(toTimeZone(e.timestamp, 'UTC'), toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2020-01-01 00:00:00', 6, 'UTC')))), lessOrEquals(toTimeZone(e.timestamp, 'UTC'), assumeNotNull(parseDateTime64BestEffortOrNull('2020-01-07 23:59:59', 6, 'UTC'))), ifNull(equals(e__pdi__person.`properties___$bool_prop`, 'x'), 0), and(equals(e.event, 'sign up'), ifNull(in(e__pdi.person_id, (SELECT cohortpeople.person_id AS person_id FROM cohortpeople - WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 5)) + WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 4)) GROUP BY cohortpeople.person_id, cohortpeople.cohort_id, cohortpeople.version HAVING ifNull(greater(sum(cohortpeople.sign), 0), 0))), 0))) GROUP BY day_start) @@ -172,7 +172,7 @@ WHERE and(equals(e.team_id, 2), greaterOrEquals(toTimeZone(e.timestamp, 'UTC'), toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2020-01-01 00:00:00', 6, 'UTC')))), lessOrEquals(toTimeZone(e.timestamp, 'UTC'), assumeNotNull(parseDateTime64BestEffortOrNull('2020-01-07 23:59:59', 6, 'UTC'))), ifNull(equals(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(e.person_properties, '$bool_prop'), ''), 'null'), '^"|"$', ''), 'x'), 0), and(equals(e.event, 'sign up'), ifNull(in(ifNull(nullIf(e__override.override_person_id, '00000000-0000-0000-0000-000000000000'), e.person_id), (SELECT cohortpeople.person_id AS person_id FROM cohortpeople - WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 6)) + WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 5)) GROUP BY cohortpeople.person_id, cohortpeople.cohort_id, cohortpeople.version HAVING ifNull(greater(sum(cohortpeople.sign), 0), 0))), 0))) GROUP BY day_start) @@ -688,7 +688,7 @@ WHERE and(equals(e.team_id, 2), and(equals(e.event, '$pageview'), and(or(ifNull(equals(e__pdi__person.properties___name, 'p1'), 0), ifNull(equals(e__pdi__person.properties___name, 'p2'), 0), ifNull(equals(e__pdi__person.properties___name, 'p3'), 0)), ifNull(in(e__pdi.person_id, (SELECT cohortpeople.person_id AS person_id FROM cohortpeople - WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 25)) + WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 24)) GROUP BY cohortpeople.person_id, cohortpeople.cohort_id, cohortpeople.version HAVING ifNull(greater(sum(cohortpeople.sign), 0), 0))), 0)))) GROUP BY value @@ -757,7 +757,7 @@ WHERE and(equals(e.team_id, 2), and(and(equals(e.event, '$pageview'), and(or(ifNull(equals(e__pdi__person.properties___name, 'p1'), 0), ifNull(equals(e__pdi__person.properties___name, 'p2'), 0), ifNull(equals(e__pdi__person.properties___name, 'p3'), 0)), 
ifNull(in(e__pdi.person_id, (SELECT cohortpeople.person_id AS person_id FROM cohortpeople - WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 25)) + WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 24)) GROUP BY cohortpeople.person_id, cohortpeople.cohort_id, cohortpeople.version HAVING ifNull(greater(sum(cohortpeople.sign), 0), 0))), 0))), or(ifNull(equals(transform(ifNull(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(e.properties, 'key'), ''), 'null'), '^"|"$', ''), '$$_posthog_breakdown_null_$$'), ['$$_posthog_breakdown_other_$$', 'val'], ['$$_posthog_breakdown_other_$$', 'val'], '$$_posthog_breakdown_other_$$'), '$$_posthog_breakdown_other_$$'), 0), ifNull(equals(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(e.properties, 'key'), ''), 'null'), '^"|"$', ''), 'val'), 0))), ifNull(greaterOrEquals(timestamp, minus(assumeNotNull(parseDateTime64BestEffortOrNull('2020-01-01 00:00:00', 6, 'UTC')), toIntervalDay(7))), 0), ifNull(lessOrEquals(timestamp, assumeNotNull(parseDateTime64BestEffortOrNull('2020-01-12 23:59:59', 6, 'UTC'))), 0)) GROUP BY timestamp, actor_id, @@ -1592,7 +1592,7 @@ WHERE and(equals(e.team_id, 2), greaterOrEquals(toTimeZone(e.timestamp, 'UTC'), toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2019-12-28 00:00:00', 6, 'UTC')))), lessOrEquals(toTimeZone(e.timestamp, 'UTC'), assumeNotNull(parseDateTime64BestEffortOrNull('2020-01-04 23:59:59', 6, 'UTC'))), and(equals(e.event, 'sign up'), ifNull(in(e__pdi.person_id, (SELECT cohortpeople.person_id AS person_id FROM cohortpeople - WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 38)) + WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 37)) GROUP BY cohortpeople.person_id, cohortpeople.cohort_id, cohortpeople.version HAVING ifNull(greater(sum(cohortpeople.sign), 0), 0))), 0))) GROUP BY value @@ -1640,7 +1640,7 @@ WHERE and(equals(e.team_id, 2), greaterOrEquals(toTimeZone(e.timestamp, 'UTC'), toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2019-12-28 00:00:00', 6, 'UTC')))), lessOrEquals(toTimeZone(e.timestamp, 'UTC'), assumeNotNull(parseDateTime64BestEffortOrNull('2020-01-04 23:59:59', 6, 'UTC'))), and(equals(e.event, 'sign up'), ifNull(in(e__pdi.person_id, (SELECT cohortpeople.person_id AS person_id FROM cohortpeople - WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 38)) + WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 37)) GROUP BY cohortpeople.person_id, cohortpeople.cohort_id, cohortpeople.version HAVING ifNull(greater(sum(cohortpeople.sign), 0), 0))), 0)), or(ifNull(equals(transform(ifNull(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(e.properties, '$some_property'), ''), 'null'), '^"|"$', ''), '$$_posthog_breakdown_null_$$'), ['$$_posthog_breakdown_other_$$', 'value', 'other_value'], ['$$_posthog_breakdown_other_$$', 'value', 'other_value'], '$$_posthog_breakdown_other_$$'), '$$_posthog_breakdown_other_$$'), 0), ifNull(equals(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(e.properties, '$some_property'), ''), 'null'), '^"|"$', ''), 'value'), 0), ifNull(equals(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(e.properties, '$some_property'), ''), 'null'), '^"|"$', ''), 'other_value'), 0))) GROUP BY day_start, @@ -1691,7 +1691,7 @@ WHERE and(equals(e.team_id, 2), greaterOrEquals(toTimeZone(e.timestamp, 'UTC'), toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2019-12-28 00:00:00', 6, 'UTC')))), lessOrEquals(toTimeZone(e.timestamp, 'UTC'), 
assumeNotNull(parseDateTime64BestEffortOrNull('2020-01-04 23:59:59', 6, 'UTC'))), and(equals(e.event, 'sign up'), ifNull(in(ifNull(nullIf(e__override.override_person_id, '00000000-0000-0000-0000-000000000000'), e.person_id), (SELECT cohortpeople.person_id AS person_id FROM cohortpeople - WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 39)) + WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 38)) GROUP BY cohortpeople.person_id, cohortpeople.cohort_id, cohortpeople.version HAVING ifNull(greater(sum(cohortpeople.sign), 0), 0))), 0))) GROUP BY value @@ -1738,7 +1738,7 @@ WHERE and(equals(e.team_id, 2), greaterOrEquals(toTimeZone(e.timestamp, 'UTC'), toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2019-12-28 00:00:00', 6, 'UTC')))), lessOrEquals(toTimeZone(e.timestamp, 'UTC'), assumeNotNull(parseDateTime64BestEffortOrNull('2020-01-04 23:59:59', 6, 'UTC'))), and(equals(e.event, 'sign up'), ifNull(in(ifNull(nullIf(e__override.override_person_id, '00000000-0000-0000-0000-000000000000'), e.person_id), (SELECT cohortpeople.person_id AS person_id FROM cohortpeople - WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 39)) + WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 38)) GROUP BY cohortpeople.person_id, cohortpeople.cohort_id, cohortpeople.version HAVING ifNull(greater(sum(cohortpeople.sign), 0), 0))), 0)), or(ifNull(equals(transform(ifNull(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(e.properties, '$some_property'), ''), 'null'), '^"|"$', ''), '$$_posthog_breakdown_null_$$'), ['$$_posthog_breakdown_other_$$', 'value', 'other_value'], ['$$_posthog_breakdown_other_$$', 'value', 'other_value'], '$$_posthog_breakdown_other_$$'), '$$_posthog_breakdown_other_$$'), 0), ifNull(equals(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(e.properties, '$some_property'), ''), 'null'), '^"|"$', ''), 'value'), 0), ifNull(equals(replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(e.properties, '$some_property'), ''), 'null'), '^"|"$', ''), 'other_value'), 0))) GROUP BY day_start, diff --git a/posthog/temporal/batch_exports/s3_batch_export.py b/posthog/temporal/batch_exports/s3_batch_export.py index be9e27327f837..6a6f4a8404995 100644 --- a/posthog/temporal/batch_exports/s3_batch_export.py +++ b/posthog/temporal/batch_exports/s3_batch_export.py @@ -12,7 +12,7 @@ from temporalio import activity, workflow from temporalio.common import RetryPolicy -from posthog.batch_exports.service import S3BatchExportInputs +from posthog.batch_exports.service import BatchExportField, BatchExportSchema, S3BatchExportInputs from posthog.temporal.batch_exports.base import PostHogWorkflow from posthog.temporal.batch_exports.batch_exports import ( BatchExportTemporaryFile, @@ -304,6 +304,7 @@ class S3InsertInputs: include_events: list[str] | None = None encryption: str | None = None kms_key_id: str | None = None + batch_export_schema: BatchExportSchema | None = None async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tuple[S3MultiPartUpload, str]: @@ -360,6 +361,25 @@ async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tupl return s3_upload, interval_start +def s3_default_fields() -> list[BatchExportField]: + """Default fields for an S3 batch export. + + Starting from the common default fields, we add and tweak some fields for + backwards compatibility. 
+ """ + batch_export_fields = default_fields() + batch_export_fields.append({"expression": "elements_chain", "alias": "elements_chain"}) + batch_export_fields.append({"expression": "nullIf(person_properties, '')", "alias": "person_properties"}) + batch_export_fields.append({"expression": "toString(person_id)", "alias": "person_id"}) + # In contrast to other destinations, we do export this field. + batch_export_fields.append({"expression": "COALESCE(inserted_at, _timestamp)", "alias": "inserted_at"}) + + # Again, in contrast to other destinations, and for historical reasons, we do not include these fields. + not_exported_by_default = {"team_id", "set", "set_once"} + + return [field for field in batch_export_fields if field["alias"] not in not_exported_by_default] + + @activity.defn async def insert_into_s3_activity(inputs: S3InsertInputs): """Activity to batch export data from PostHog's ClickHouse to S3. @@ -403,11 +423,13 @@ async def insert_into_s3_activity(inputs: S3InsertInputs): s3_upload, interval_start = await initialize_and_resume_multipart_upload(inputs) - fields = default_fields() - # Fields kept for backwards compatibility with legacy apps schema. - fields.append({"expression": "elements_chain", "alias": "elements_chain"}) - fields.append({"expression": "nullIf(person_properties, '')", "alias": "person_properties"}) - fields.append({"expression": "toString(person_id)", "alias": "person_id"}) + if inputs.batch_export_schema is None: + fields = s3_default_fields() + query_parameters = None + + else: + fields = inputs.batch_export_schema["fields"] + query_parameters = inputs.batch_export_schema["values"] record_iterator = iter_records( client=client, @@ -417,9 +439,9 @@ async def insert_into_s3_activity(inputs: S3InsertInputs): exclude_events=inputs.exclude_events, include_events=inputs.include_events, fields=fields, + extra_query_parameters=query_parameters, ) - result = None last_uploaded_part_timestamp: str | None = None async def worker_shutdown_handler(): @@ -437,6 +459,8 @@ async def worker_shutdown_handler(): asyncio.create_task(worker_shutdown_handler()) + record = None + async with s3_upload as s3_upload: with BatchExportTemporaryFile(compression=inputs.compression) as local_results_file: rows_exported = get_rows_exported_metric() @@ -444,7 +468,7 @@ async def worker_shutdown_handler(): async def flush_to_s3(last_uploaded_part_timestamp: str, last=False): logger.debug( - "Uploading %spart %s containing %s records with size %s bytes", + "Uploading %s part %s containing %s records with size %s bytes", "last " if last else "", s3_upload.part_number + 1, local_results_file.records_since_last_reset, @@ -458,33 +482,22 @@ async def flush_to_s3(last_uploaded_part_timestamp: str, last=False): activity.heartbeat(last_uploaded_part_timestamp, s3_upload.to_state()) for record_batch in record_iterator: - for result in record_batch.to_pylist(): - record = { - "created_at": result["created_at"], - "distinct_id": result["distinct_id"], - "elements_chain": result["elements_chain"], - "event": result["event"], - "inserted_at": result["_inserted_at"], - "person_id": result["person_id"], - "person_properties": json.loads(result["person_properties"]) - if result["person_properties"] is not None - else None, - "properties": json.loads(result["properties"]) - if result["properties"] is not None - else None, - "timestamp": result["timestamp"], - "uuid": result["uuid"], - } + for record in record_batch.to_pylist(): + for json_column in ("properties", "person_properties", "set", "set_once"): + if 
(json_str := record.get(json_column, None)) is not None: + record[json_column] = json.loads(json_str) + + inserted_at = record.pop("_inserted_at") local_results_file.write_records_to_jsonl([record]) if local_results_file.tell() > settings.BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES: - last_uploaded_part_timestamp = str(result["_inserted_at"]) + last_uploaded_part_timestamp = str(inserted_at) await flush_to_s3(last_uploaded_part_timestamp) local_results_file.reset() - if local_results_file.tell() > 0 and result is not None: - last_uploaded_part_timestamp = str(result["_inserted_at"]) + if local_results_file.tell() > 0 and record is not None: + last_uploaded_part_timestamp = str(inserted_at) await flush_to_s3(last_uploaded_part_timestamp, last=True) await s3_upload.complete() @@ -548,6 +561,7 @@ async def run(self, inputs: S3BatchExportInputs): include_events=inputs.include_events, encryption=inputs.encryption, kms_key_id=inputs.kms_key_id, + batch_export_schema=inputs.batch_export_schema, ) await execute_batch_export_insert_activity( diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py index ab8871b3545e7..e8392b5e4fb5b 100644 --- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py @@ -22,10 +22,13 @@ from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker +from posthog.batch_exports.service import BatchExportSchema from posthog.temporal.batch_exports.batch_exports import ( create_export_run, + iter_records, update_export_run_status, ) +from posthog.temporal.batch_exports.clickhouse import ClickHouseClient from posthog.temporal.batch_exports.s3_batch_export import ( HeartbeatDetails, S3BatchExportInputs, @@ -33,10 +36,9 @@ S3InsertInputs, get_s3_key, insert_into_s3_activity, + s3_default_fields, ) -from posthog.temporal.tests.utils.datetimes import to_isoformat from posthog.temporal.tests.utils.events import ( - EventValues, generate_test_events_in_clickhouse, ) from posthog.temporal.tests.utils.models import ( @@ -138,15 +140,34 @@ async def minio_client(bucket_name): await minio_client.delete_bucket(Bucket=bucket_name) -async def assert_events_in_s3( +async def assert_clickhouse_records_in_s3( s3_compatible_client, + clickhouse_client: ClickHouseClient, bucket_name: str, key_prefix: str, - events: list[EventValues], - compression: str | None = None, + team_id: int, + data_interval_start: dt.datetime, + data_interval_end: dt.datetime, exclude_events: list[str] | None = None, + include_events: list[str] | None = None, + batch_export_schema: BatchExportSchema | None = None, + compression: str | None = None, ): - """Assert provided events written to JSON in key_prefix in S3 bucket_name.""" + """Assert ClickHouse records are written to JSON in key_prefix in S3 bucket_name. + + Arguments: + s3_compatible_client: An S3 client used to read records; can be MinIO if doing local testing. + clickhouse_client: A ClickHouseClient used to read records that are expected to be exported. + team_id: The ID of the team that we are testing for. + bucket_name: S3 bucket name where records are exported to. + key_prefix: S3 key prefix where records are exported to. + data_interval_start: Start of the batch period for exported records. + data_interval_end: End of the batch period for exported records. + exclude_events: Event names to be excluded from the export. 
+ include_events: Event names to be included in the export. + batch_export_schema: Custom schema used in the batch export. + compression: Optional compression used in upload. + """ # List the objects in the bucket with the prefix. objects = await s3_compatible_client.list_objects_v2(Bucket=bucket_name, Prefix=key_prefix) @@ -171,50 +192,79 @@ async def assert_events_in_s3( json_data = [json.loads(line) for line in data.decode("utf-8").split("\n") if line] # Pull out the fields we inserted only - json_data.sort(key=lambda x: (x["event"], x["created_at"])) - - # Remove team_id, _timestamp from events - if exclude_events is None: - exclude_events = [] - - def to_expected_event(event): - mapping_functions = { - "timestamp": to_isoformat, - "inserted_at": to_isoformat, - "created_at": to_isoformat, - } - # These are inserted/returned by the ClickHouse event generators, but we do not export them - # or we export them as properties. - not_exported = {"team_id", "_timestamp", "set", "set_once", "ip", "site_url", "elements"} - expected_event = { - k: mapping_functions.get(k, lambda x: x)(v) for k, v in event.items() if k not in not_exported - } - - if expected_event["inserted_at"] is None: - expected_event["inserted_at"] = ( - dt.datetime.fromisoformat(event["_timestamp"]).replace(tzinfo=dt.timezone.utc).isoformat() - ) - return expected_event - - expected_events = list( - map( - to_expected_event, - (event for event in events if event["event"] not in exclude_events), - ) - ) + if batch_export_schema is not None: + schema_column_names = [field["alias"] for field in batch_export_schema["fields"]] + else: + schema_column_names = [field["alias"] for field in s3_default_fields()] - expected_events.sort(key=lambda x: (x["event"], x["created_at"])) + json_columns = ("properties", "person_properties", "set", "set_once") - # First check one event, the first one, so that we can get a nice diff if - # the included data is different. - assert json_data[0] == expected_events[0] - assert json_data == expected_events + expected_records = [] + for record_batch in iter_records( + client=clickhouse_client, + team_id=team_id, + interval_start=data_interval_start.isoformat(), + interval_end=data_interval_end.isoformat(), + exclude_events=exclude_events, + include_events=include_events, + fields=batch_export_schema["fields"] if batch_export_schema is not None else s3_default_fields(), + extra_query_parameters=batch_export_schema["values"] if batch_export_schema is not None else None, + ): + for record in record_batch.to_pylist(): + expected_record = {} + for k, v in record.items(): + if k not in schema_column_names or k == "_inserted_at": + # _inserted_at is not exported, only used for tracking progress. + continue + + if k in json_columns and v is not None: + expected_record[k] = json.loads(v) + elif isinstance(v, dt.datetime): + # Some type precision is lost when json dumping to S3, so we have to cast this to str to match. 
+ expected_record[k] = v.isoformat() + else: + expected_record[k] = v + + expected_records.append(expected_record) + + assert len(json_data) == len(expected_records) + assert json_data[0] == expected_records[0] + assert json_data == expected_records + + +TEST_S3_SCHEMAS: list[BatchExportSchema | None] = [ + { + "fields": [ + {"expression": "event", "alias": "my_event_name"}, + {"expression": "nullIf(JSONExtractString(properties, %(hogql_val_0)s), '')", "alias": "browser"}, + {"expression": "nullIf(JSONExtractString(properties, %(hogql_val_1)s), '')", "alias": "os"}, + {"expression": "nullIf(properties, '')", "alias": "all_properties"}, + ], + "values": {"hogql_val_0": "$browser", "hogql_val_1": "$os"}, + }, + { + "fields": [ + {"expression": "event", "alias": "my_event_name"}, + {"expression": "inserted_at", "alias": "inserted_at"}, + {"expression": "1 + 1", "alias": "two"}, + ], + "values": {}, + }, + None, +] @pytest.mark.parametrize("compression", [None, "gzip", "brotli"], indirect=True) @pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True) +@pytest.mark.parametrize("batch_export_schema", TEST_S3_SCHEMAS) async def test_insert_into_s3_activity_puts_data_into_s3( - clickhouse_client, bucket_name, minio_client, activity_environment, compression, exclude_events + clickhouse_client, + bucket_name, + minio_client, + activity_environment, + compression, + exclude_events, + batch_export_schema: BatchExportSchema | None, ): """Test that the insert_into_s3_activity function ends up with data into S3. @@ -226,7 +276,7 @@ async def test_insert_into_s3_activity_puts_data_into_s3( * Are not duplicates of other events that are in the same batch. * Do not have an event name contained in the batch export's exclude_events. - Once we have these events, we pass them to the assert_events_in_s3 function to check + Once we have these events, we pass them to the assert_clickhouse_records_in_s3 function to check that they appear in the expected S3 bucket and key. """ data_interval_start = dt.datetime(2023, 4, 20, 14, 0, 0, tzinfo=dt.timezone.utc) @@ -236,7 +286,7 @@ async def test_insert_into_s3_activity_puts_data_into_s3( # but it's very small. 
team_id = randint(1, 1000000) - (events, _, _) = await generate_test_events_in_clickhouse( + await generate_test_events_in_clickhouse( client=clickhouse_client, team_id=team_id, start_time=data_interval_start, @@ -249,7 +299,7 @@ async def test_insert_into_s3_activity_puts_data_into_s3( person_properties={"utm_medium": "referral", "$initial_os": "Linux"}, ) - (events_with_no_properties, _, _) = await generate_test_events_in_clickhouse( + await generate_test_events_in_clickhouse( client=clickhouse_client, team_id=team_id, start_time=data_interval_start, @@ -289,6 +339,7 @@ async def test_insert_into_s3_activity_puts_data_into_s3( aws_secret_access_key="object_storage_root_password", compression=compression, exclude_events=exclude_events, + batch_export_schema=batch_export_schema, ) with override_settings( @@ -300,13 +351,18 @@ async def test_insert_into_s3_activity_puts_data_into_s3( ): await activity_environment.run(insert_into_s3_activity, insert_inputs) - await assert_events_in_s3( - minio_client, - bucket_name, - prefix, - events=events + events_with_no_properties, - compression=compression, + await assert_clickhouse_records_in_s3( + s3_compatible_client=minio_client, + clickhouse_client=clickhouse_client, + bucket_name=bucket_name, + key_prefix=prefix, + team_id=team_id, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + batch_export_schema=batch_export_schema, exclude_events=exclude_events, + include_events=None, + compression=compression, ) @@ -350,6 +406,7 @@ async def s3_batch_export( @pytest.mark.parametrize("interval", ["hour", "day"], indirect=True) @pytest.mark.parametrize("compression", [None, "gzip", "brotli"], indirect=True) @pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True) +@pytest.mark.parametrize("batch_export_schema", TEST_S3_SCHEMAS) async def test_s3_export_workflow_with_minio_bucket( clickhouse_client, minio_client, @@ -360,6 +417,7 @@ async def test_s3_export_workflow_with_minio_bucket( compression, exclude_events, s3_key_prefix, + batch_export_schema, ): """Test S3BatchExport Workflow end-to-end by using a local MinIO bucket instead of S3. 
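The schema handling added to insert_into_s3_activity above reduces to a small dispatch between the configured batch_export_schema and the S3 defaults. A minimal sketch follows, not the actual implementation: the helper name and plain-dict types are invented to keep it self-contained, whereas the patch uses BatchExportField and BatchExportSchema from posthog.batch_exports.service.

# Illustrative sketch only (not part of the patch): the fields/parameters
# dispatch insert_into_s3_activity performs before calling iter_records.
from typing import Any


def resolve_fields_and_parameters(
    batch_export_schema: dict[str, Any] | None,
    s3_default_fields: list[dict[str, str]],
) -> tuple[list[dict[str, str]], dict[str, Any] | None]:
    """Return the (fields, extra_query_parameters) pair passed to iter_records."""
    if batch_export_schema is None:
        # No custom schema configured for this destination: fall back to the S3
        # defaults (common default fields plus elements_chain, person_properties,
        # person_id and COALESCE(inserted_at, _timestamp) AS inserted_at, minus
        # team_id/set/set_once for backwards compatibility).
        return s3_default_fields, None
    # A custom schema supplies both the field expressions and the query
    # parameter values referenced by placeholders such as %(hogql_val_0)s.
    return batch_export_schema["fields"], batch_export_schema["values"]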
@@ -373,7 +431,7 @@ async def test_s3_export_workflow_with_minio_bucket( data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") data_interval_start = data_interval_end - s3_batch_export.interval_time_delta - (events, _, _) = await generate_test_events_in_clickhouse( + await generate_test_events_in_clickhouse( client=clickhouse_client, team_id=ateam.pk, start_time=data_interval_start, @@ -405,6 +463,7 @@ async def test_s3_export_workflow_with_minio_bucket( batch_export_id=str(s3_batch_export.id), data_interval_end=data_interval_end.isoformat(), interval=interval, + batch_export_schema=batch_export_schema, **s3_batch_export.destination.config, ) @@ -440,13 +499,17 @@ async def test_s3_export_workflow_with_minio_bucket( run = runs[0] assert run.status == "Completed" - await assert_events_in_s3( - minio_client, - bucket_name, - s3_key_prefix, - events=events, - compression=compression, + await assert_clickhouse_records_in_s3( + s3_compatible_client=minio_client, + clickhouse_client=clickhouse_client, + bucket_name=bucket_name, + key_prefix=s3_key_prefix, + team_id=ateam.pk, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + batch_export_schema=batch_export_schema, exclude_events=exclude_events, + compression=compression, ) @@ -475,6 +538,7 @@ async def s3_client(bucket_name, s3_key_prefix): @pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True) @pytest.mark.parametrize("encryption", [None, "AES256", "aws:kms"], indirect=True) @pytest.mark.parametrize("bucket_name", [os.getenv("S3_TEST_BUCKET")], indirect=True) +@pytest.mark.parametrize("batch_export_schema", TEST_S3_SCHEMAS) async def test_s3_export_workflow_with_s3_bucket( s3_client, clickhouse_client, @@ -486,6 +550,7 @@ async def test_s3_export_workflow_with_s3_bucket( encryption, exclude_events, ateam, + batch_export_schema, ): """Test S3 Export Workflow end-to-end by using an S3 bucket. 
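Both the rewritten activity loop and the new assert_clickhouse_records_in_s3 helper decode the JSON string columns before writing or comparing JSONL records. A condensed sketch of that step is below; normalise_record is an invented name, as the patch inlines this logic in the respective loops.

# Illustrative only: the per-record JSON decoding used in the patch, both when
# insert_into_s3_activity writes JSONL and when assert_clickhouse_records_in_s3
# builds the expected records.
import json
from typing import Any

JSON_COLUMNS = ("properties", "person_properties", "set", "set_once")


def normalise_record(record: dict[str, Any]) -> tuple[dict[str, Any], Any]:
    """Decode JSON columns in place and pull out the _inserted_at progress marker."""
    for json_column in JSON_COLUMNS:
        # Only decode columns that were selected and are non-NULL; a custom
        # batch_export_schema may omit them entirely.
        if (json_str := record.get(json_column, None)) is not None:
            record[json_column] = json.loads(json_str)
    # _inserted_at only drives heartbeats/resumption and is not exported.
    inserted_at = record.pop("_inserted_at", None)
    return record, inserted_at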
@@ -503,7 +568,7 @@ async def test_s3_export_workflow_with_s3_bucket( data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000") data_interval_start = data_interval_end - s3_batch_export.interval_time_delta - (events, _, _) = await generate_test_events_in_clickhouse( + await generate_test_events_in_clickhouse( client=clickhouse_client, team_id=ateam.pk, start_time=data_interval_start, @@ -535,6 +600,7 @@ async def test_s3_export_workflow_with_s3_bucket( batch_export_id=str(s3_batch_export.id), data_interval_end=data_interval_end.isoformat(), interval=interval, + batch_export_schema=batch_export_schema, **s3_batch_export.destination.config, ) @@ -574,13 +640,18 @@ async def create_minio_client(*args, **kwargs): run = runs[0] assert run.status == "Completed" - await assert_events_in_s3( - s3_client, - bucket_name, - s3_key_prefix, - events=events, - compression=compression, + await assert_clickhouse_records_in_s3( + s3_compatible_client=minio_client, + clickhouse_client=clickhouse_client, + bucket_name=bucket_name, + key_prefix=s3_key_prefix, + team_id=ateam.pk, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + batch_export_schema=batch_export_schema, exclude_events=exclude_events, + include_events=None, + compression=compression, ) @@ -603,12 +674,12 @@ async def test_s3_export_workflow_with_minio_bucket_and_a_lot_of_data( data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") data_interval_start = data_interval_end - s3_batch_export.interval_time_delta - (events, _, _) = await generate_test_events_in_clickhouse( + await generate_test_events_in_clickhouse( client=clickhouse_client, team_id=ateam.pk, start_time=data_interval_start, end_time=data_interval_end, - count=1000000, + count=100000, count_outside_range=1000, count_other_team=1000, duplicate=True, @@ -656,13 +727,16 @@ async def test_s3_export_workflow_with_minio_bucket_and_a_lot_of_data( run = runs[0] assert run.status == "Completed" - await assert_events_in_s3( - minio_client, - bucket_name, - s3_key_prefix, - events=events, - compression=compression, + await assert_clickhouse_records_in_s3( + s3_compatible_client=minio_client, + clickhouse_client=clickhouse_client, + bucket_name=bucket_name, + key_prefix=s3_key_prefix, + team_id=ateam.pk, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, exclude_events=exclude_events, + compression=compression, ) @@ -678,7 +752,7 @@ async def test_s3_export_workflow_defaults_to_timestamp_on_null_inserted_at( data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") data_interval_start = data_interval_end - s3_batch_export.interval_time_delta - (events, _, _) = await generate_test_events_in_clickhouse( + await generate_test_events_in_clickhouse( client=clickhouse_client, team_id=ateam.pk, start_time=data_interval_start, @@ -732,12 +806,15 @@ async def test_s3_export_workflow_defaults_to_timestamp_on_null_inserted_at( run = runs[0] assert run.status == "Completed" - await assert_events_in_s3( - minio_client, - bucket_name, - s3_key_prefix, - events, - compression, + await assert_clickhouse_records_in_s3( + s3_compatible_client=minio_client, + clickhouse_client=clickhouse_client, + bucket_name=bucket_name, + key_prefix=s3_key_prefix, + team_id=ateam.pk, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + compression=compression, ) @@ -764,7 +841,7 @@ async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix( 
data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") data_interval_start = data_interval_end - s3_batch_export.interval_time_delta - (events, _, _) = await generate_test_events_in_clickhouse( + await generate_test_events_in_clickhouse( client=clickhouse_client, team_id=ateam.pk, start_time=data_interval_start, @@ -832,7 +909,16 @@ async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix( assert len(objects.get("Contents", [])) == 1 assert key.startswith(expected_key_prefix) - await assert_events_in_s3(minio_client, bucket_name, expected_key_prefix, events, compression) + await assert_clickhouse_records_in_s3( + s3_compatible_client=minio_client, + clickhouse_client=clickhouse_client, + bucket_name=bucket_name, + key_prefix=s3_key_prefix, + team_id=ateam.pk, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + compression=compression, + ) async def test_s3_export_workflow_handles_insert_activity_errors(ateam, s3_batch_export, interval): @@ -956,7 +1042,7 @@ async def never_finish_activity(_: S3InsertInputs) -> str: prefix="/", data_interval_start="2023-01-01 00:00:00", data_interval_end="2023-01-01 01:00:00", - **base_inputs, + **base_inputs, # type: ignore ), "2023-01-01 00:00:00-2023-01-01 01:00:00.jsonl", ), @@ -965,7 +1051,7 @@ async def never_finish_activity(_: S3InsertInputs) -> str: prefix="", data_interval_start="2023-01-01 00:00:00", data_interval_end="2023-01-01 01:00:00", - **base_inputs, + **base_inputs, # type: ignore ), "2023-01-01 00:00:00-2023-01-01 01:00:00.jsonl", ), @@ -975,7 +1061,7 @@ async def never_finish_activity(_: S3InsertInputs) -> str: data_interval_start="2023-01-01 00:00:00", data_interval_end="2023-01-01 01:00:00", compression="gzip", - **base_inputs, + **base_inputs, # type: ignore ), "2023-01-01 00:00:00-2023-01-01 01:00:00.jsonl.gz", ), @@ -985,7 +1071,7 @@ async def never_finish_activity(_: S3InsertInputs) -> str: data_interval_start="2023-01-01 00:00:00", data_interval_end="2023-01-01 01:00:00", compression="brotli", - **base_inputs, + **base_inputs, # type: ignore ), "2023-01-01 00:00:00-2023-01-01 01:00:00.jsonl.br", ), @@ -994,7 +1080,7 @@ async def never_finish_activity(_: S3InsertInputs) -> str: prefix="my-fancy-prefix", data_interval_start="2023-01-01 00:00:00", data_interval_end="2023-01-01 01:00:00", - **base_inputs, + **base_inputs, # type: ignore ), "my-fancy-prefix/2023-01-01 00:00:00-2023-01-01 01:00:00.jsonl", ), @@ -1003,7 +1089,7 @@ async def never_finish_activity(_: S3InsertInputs) -> str: prefix="/my-fancy-prefix", data_interval_start="2023-01-01 00:00:00", data_interval_end="2023-01-01 01:00:00", - **base_inputs, + **base_inputs, # type: ignore ), "my-fancy-prefix/2023-01-01 00:00:00-2023-01-01 01:00:00.jsonl", ), @@ -1013,7 +1099,7 @@ async def never_finish_activity(_: S3InsertInputs) -> str: data_interval_start="2023-01-01 00:00:00", data_interval_end="2023-01-01 01:00:00", compression="gzip", - **base_inputs, + **base_inputs, # type: ignore ), "my-fancy-prefix/2023-01-01 00:00:00-2023-01-01 01:00:00.jsonl.gz", ), @@ -1023,7 +1109,7 @@ async def never_finish_activity(_: S3InsertInputs) -> str: data_interval_start="2023-01-01 00:00:00", data_interval_end="2023-01-01 01:00:00", compression="brotli", - **base_inputs, + **base_inputs, # type: ignore ), "my-fancy-prefix/2023-01-01 00:00:00-2023-01-01 01:00:00.jsonl.br", ), @@ -1032,7 +1118,7 @@ async def never_finish_activity(_: S3InsertInputs) -> str: prefix="my-fancy-prefix-with-a-forwardslash/", 
data_interval_start="2023-01-01 00:00:00", data_interval_end="2023-01-01 01:00:00", - **base_inputs, + **base_inputs, # type: ignore ), "my-fancy-prefix-with-a-forwardslash/2023-01-01 00:00:00-2023-01-01 01:00:00.jsonl", ), @@ -1041,7 +1127,7 @@ async def never_finish_activity(_: S3InsertInputs) -> str: prefix="/my-fancy-prefix-with-a-forwardslash/", data_interval_start="2023-01-01 00:00:00", data_interval_end="2023-01-01 01:00:00", - **base_inputs, + **base_inputs, # type: ignore ), "my-fancy-prefix-with-a-forwardslash/2023-01-01 00:00:00-2023-01-01 01:00:00.jsonl", ), @@ -1050,7 +1136,7 @@ async def never_finish_activity(_: S3InsertInputs) -> str: prefix="nested/prefix/", data_interval_start="2023-01-01 00:00:00", data_interval_end="2023-01-01 01:00:00", - **base_inputs, + **base_inputs, # type: ignore ), "nested/prefix/2023-01-01 00:00:00-2023-01-01 01:00:00.jsonl", ), @@ -1059,7 +1145,7 @@ async def never_finish_activity(_: S3InsertInputs) -> str: prefix="/nested/prefix/", data_interval_start="2023-01-01 00:00:00", data_interval_end="2023-01-01 01:00:00", - **base_inputs, + **base_inputs, # type: ignore ), "nested/prefix/2023-01-01 00:00:00-2023-01-01 01:00:00.jsonl", ), @@ -1069,7 +1155,7 @@ async def never_finish_activity(_: S3InsertInputs) -> str: data_interval_start="2023-01-01 00:00:00", data_interval_end="2023-01-01 01:00:00", compression="gzip", - **base_inputs, + **base_inputs, # type: ignore ), "nested/prefix/2023-01-01 00:00:00-2023-01-01 01:00:00.jsonl.gz", ), @@ -1079,7 +1165,7 @@ async def never_finish_activity(_: S3InsertInputs) -> str: data_interval_start="2023-01-01 00:00:00", data_interval_end="2023-01-01 01:00:00", compression="brotli", - **base_inputs, + **base_inputs, # type: ignore ), "nested/prefix/2023-01-01 00:00:00-2023-01-01 01:00:00.jsonl.br", ), @@ -1101,13 +1187,12 @@ async def test_insert_into_s3_activity_heartbeats( data_interval_end = dt.datetime.fromisoformat("2023-04-20T14:30:00.000000+00:00") data_interval_start = data_interval_end - s3_batch_export.interval_time_delta - events_in_parts = [] n_expected_parts = 3 for i in range(1, n_expected_parts + 1): part_inserted_at = data_interval_end - s3_batch_export.interval_time_delta / i - (events, _, _) = await generate_test_events_in_clickhouse( + await generate_test_events_in_clickhouse( client=clickhouse_client, team_id=ateam.pk, start_time=data_interval_start, @@ -1120,7 +1205,6 @@ async def test_insert_into_s3_activity_heartbeats( properties={"$chonky": ("a" * 5 * 1024**2)}, inserted_at=part_inserted_at, ) - events_in_parts += events current_part_number = 1 @@ -1159,4 +1243,13 @@ def assert_heartbeat_details(*details): # This checks that the assert_heartbeat_details function was actually called. # The '+ 1' is because we increment current_part_number one last time after we are done. assert current_part_number == n_expected_parts + 1 - await assert_events_in_s3(minio_client, bucket_name, s3_key_prefix, events_in_parts, None, None) + + await assert_clickhouse_records_in_s3( + s3_compatible_client=minio_client, + clickhouse_client=clickhouse_client, + bucket_name=bucket_name, + key_prefix=s3_key_prefix, + team_id=ateam.pk, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + )