Skip to content

Commit

Permalink
feat: S3 supports custom schemas (#20075)
Browse files Browse the repository at this point in the history
* fix: BigQuery batch exports supports iter_records

* chore: Remove unnecessary try/except

* Update query snapshots

* Update query snapshots

* feat: S3 supports custom schemas

* chore: Update mypy-baseline.txt

* fix: Iterate over record batches

* fix: Reduce test data size

* fix: Typo in 'default'

* Update query snapshots

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
tomasfarias and github-actions[bot] authored Feb 12, 2024
1 parent e917dff commit a0fcba4
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 196 deletions.
56 changes: 0 additions & 56 deletions mypy-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -817,62 +817,6 @@ posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0:
posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: error: Need type annotation for "_execute_async_calls" (hint: "_execute_async_calls: List[<type>] = ...") [var-annotated]
posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: error: Need type annotation for "_cursors" (hint: "_cursors: List[<type>] = ...") [var-annotated]
posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: error: List item 0 has incompatible type "tuple[str, str, int, int, int, int, str, int]"; expected "tuple[str, str, int, int, str, str, str, str]" [list-item]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 4 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "int" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "str | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: Argument 5 to "S3InsertInputs" has incompatible type "**dict[str, object]"; expected "list[str] | None" [arg-type]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "last_uploaded_part_timestamp" [attr-defined]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "upload_state" [attr-defined]
posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "upload_state" [attr-defined]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@
if(and(equals(e.event, 'user signed up'), ifNull(in(e__pdi.person_id,
(SELECT cohortpeople.person_id AS person_id
FROM cohortpeople
WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 2))
WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 1))
GROUP BY cohortpeople.person_id, cohortpeople.cohort_id, cohortpeople.version
HAVING ifNull(greater(sum(cohortpeople.sign), 0), 0))), 0)), 1, 0) AS step_0,
if(ifNull(equals(step_0, 1), 0), timestamp, NULL) AS latest_0,
Expand Down Expand Up @@ -871,7 +871,7 @@
if(and(equals(e.event, 'user signed up'), ifNull(in(e__pdi.person_id,
(SELECT person_static_cohort.person_id AS person_id
FROM person_static_cohort
WHERE and(equals(person_static_cohort.team_id, 2), equals(person_static_cohort.cohort_id, 3)))), 0)), 1, 0) AS step_0,
WHERE and(equals(person_static_cohort.team_id, 2), equals(person_static_cohort.cohort_id, 2)))), 0)), 1, 0) AS step_0,
if(ifNull(equals(step_0, 1), 0), timestamp, NULL) AS latest_0,
if(equals(e.event, 'paid'), 1, 0) AS step_1,
if(ifNull(equals(step_1, 1), 0), timestamp, NULL) AS latest_1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
WHERE and(equals(events.team_id, 2), greaterOrEquals(toTimeZone(events.timestamp, 'UTC'), minus(toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2020-01-12 00:00:00', 6, 'UTC'))), toIntervalDay(1))), less(toTimeZone(events.timestamp, 'UTC'), plus(toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2020-01-19 23:59:59', 6, 'UTC'))), toIntervalDay(1))), ifNull(in(person_id,
(SELECT cohortpeople.person_id AS person_id
FROM cohortpeople
WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 4))
WHERE and(equals(cohortpeople.team_id, 2), equals(cohortpeople.cohort_id, 3))
GROUP BY cohortpeople.person_id, cohortpeople.cohort_id, cohortpeople.version
HAVING ifNull(greater(sum(cohortpeople.sign), 0), 0))), 0), equals(events.event, '$pageview'))
GROUP BY person_id)
Expand Down
Loading

0 comments on commit a0fcba4

Please sign in to comment.