Skip to content

Commit

Permalink
chore(data-warehouse): Added job cancellation checking into rest source (
Browse files Browse the repository at this point in the history
#22854)

* Added job cancellation checking into rest source

* Remove chunk size

* fixed mypy

* Fixed test

* fixed mypy

* Fixed test

* Fixed silly silly code

* Fixed test

* Fixed stripe columns
  • Loading branch information
Gilbert09 authored Jun 11, 2024
1 parent 2b40181 commit 6f745af
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 50 deletions.
38 changes: 22 additions & 16 deletions mypy-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,6 @@ posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Un
posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Unpacked dict entry 1 has incompatible type "dict[str, Any] | None"; expected "SupportsKeysAndGetItem[str, Any]" [dict-item]
posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Unpacked dict entry 0 has incompatible type "dict[str, Any] | None"; expected "SupportsKeysAndGetItem[str, ResolveParamConfig | IncrementalParamConfig | Any]" [dict-item]
posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Unpacked dict entry 1 has incompatible type "dict[str, ResolveParamConfig | IncrementalParamConfig | Any] | None"; expected "SupportsKeysAndGetItem[str, ResolveParamConfig | IncrementalParamConfig | Any]" [dict-item]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Not all union combinations were tried because there are too many unions [misc]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 2 to "source" has incompatible type "str | None"; expected "str" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 3 to "source" has incompatible type "str | None"; expected "str" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 4 to "source" has incompatible type "int | None"; expected "int" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 6 to "source" has incompatible type "Schema | None"; expected "Schema" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 7 to "source" has incompatible type "Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict | None"; expected "Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 8 to "source" has incompatible type "type[BaseConfiguration] | None"; expected "type[BaseConfiguration]" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 1 to "build_resource_dependency_graph" has incompatible type "EndpointResourceBase | None"; expected "EndpointResourceBase" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Need type annotation for "resources" (hint: "resources: dict[<type>, <type>] = ...") [var-annotated]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Incompatible types in assignment (expression has type "ResolvedParam | None", variable has type "ResolvedParam") [assignment]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Incompatible types in assignment (expression has type "list[str] | None", variable has type "list[str]") [assignment]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 1 to "setup_incremental_object" has incompatible type "dict[str, ResolveParamConfig | IncrementalParamConfig | Any] | None"; expected "dict[str, Any]" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Statement is unreachable [unreachable]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 1 to "exclude_keys" has incompatible type "dict[str, ResolveParamConfig | IncrementalParamConfig | Any] | None"; expected "Mapping[str, Any]" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Incompatible default for argument "incremental_param" (default has type "IncrementalParam | None", argument has type "IncrementalParam") [assignment]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument "module" to "SourceInfo" has incompatible type Module | None; expected Module [arg-type]
posthog/hogql/database/schema/numbers.py:0: error: Incompatible types in assignment (expression has type "dict[str, IntegerDatabaseField]", variable has type "dict[str, FieldOrTable]") [assignment]
posthog/hogql/database/schema/numbers.py:0: note: "Dict" is invariant -- see https://mypy.readthedocs.io/en/stable/common_issues.html#variance
posthog/hogql/database/schema/numbers.py:0: note: Consider using "Mapping" instead, which is covariant in the value type
Expand Down Expand Up @@ -625,6 +609,21 @@ posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Item "None" of "DateTime | None" has no attribute "int_timestamp" [union-attr]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Not all union combinations were tried because there are too many unions [misc]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 2 to "source" has incompatible type "str | None"; expected "str" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 3 to "source" has incompatible type "str | None"; expected "str" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 4 to "source" has incompatible type "int | None"; expected "int" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 6 to "source" has incompatible type "Schema | None"; expected "Schema" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 7 to "source" has incompatible type "Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict | None"; expected "Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 8 to "source" has incompatible type "type[BaseConfiguration] | None"; expected "type[BaseConfiguration]" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 1 to "build_resource_dependency_graph" has incompatible type "EndpointResourceBase | None"; expected "EndpointResourceBase" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Incompatible types in assignment (expression has type "list[str] | None", variable has type "list[str]") [assignment]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 1 to "setup_incremental_object" has incompatible type "dict[str, ResolveParamConfig | IncrementalParamConfig | Any] | None"; expected "dict[str, Any]" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument "base_url" to "RESTClient" has incompatible type "str | None"; expected "str" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 1 to "exclude_keys" has incompatible type "dict[str, ResolveParamConfig | IncrementalParamConfig | Any] | None"; expected "Mapping[str, Any]" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Incompatible default for argument "resolved_param" (default has type "ResolvedParam | None", argument has type "ResolvedParam") [assignment]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument "module" to "SourceInfo" has incompatible type Module | None; expected Module [arg-type]
posthog/tasks/exports/test/test_csv_exporter.py:0: error: Function is missing a return type annotation [no-untyped-def]
posthog/tasks/exports/test/test_csv_exporter.py:0: error: Function is missing a type annotation [no-untyped-def]
posthog/tasks/exports/test/test_csv_exporter.py:0: error: Function is missing a type annotation for one or more arguments [no-untyped-def]
Expand Down Expand Up @@ -721,6 +720,13 @@ posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type:
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/tests/batch_exports/test_batch_exports.py:0: error: TypedDict key must be a string literal; expected one of ("_timestamp", "created_at", "distinct_id", "elements", "elements_chain", ...) [literal-required]
posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/session_recordings/session_recording_api.py:0: error: Argument "team_id" to "get_realtime_snapshots" has incompatible type "int"; expected "str" [arg-type]
posthog/queries/app_metrics/test/test_app_metrics.py:0: error: Argument 3 to "AppMetricsErrorDetailsQuery" has incompatible type "AppMetricsRequestSerializer"; expected "AppMetricsErrorsRequestSerializer" [arg-type]
posthog/queries/app_metrics/test/test_app_metrics.py:0: error: Argument 3 to "AppMetricsErrorDetailsQuery" has incompatible type "AppMetricsRequestSerializer"; expected "AppMetricsErrorsRequestSerializer" [arg-type]
Expand Down
14 changes: 3 additions & 11 deletions posthog/temporal/data_imports/pipelines/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,14 @@
from django.db.models import F
from posthog.warehouse.util import database_sync_to_async

CHUNK_SIZE = 10_000


async def check_limit(
async def is_job_cancelled(
team_id: int,
job_id: str,
new_count: int,
):
) -> bool:
model = await aget_external_data_job(team_id, job_id)

if new_count >= CHUNK_SIZE:
new_count = 0

status = model.status

return new_count, status
return model.status == ExternalDataJob.Status.CANCELLED


@database_sync_to_async
Expand Down
49 changes: 34 additions & 15 deletions posthog/temporal/data_imports/pipelines/rest_source/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
Optional,
cast,
)
from collections.abc import Generator, Callable
from collections.abc import AsyncGenerator, Iterator
from collections.abc import Callable
import graphlib # type: ignore[import,unused-ignore]

import dlt
Expand All @@ -21,6 +22,8 @@
from dlt.sources.helpers.rest_client.client import RESTClient
from dlt.sources.helpers.rest_client.paginators import BasePaginator
from dlt.sources.helpers.rest_client.typing import HTTPMethodBasic

from posthog.temporal.data_imports.pipelines.helpers import is_job_cancelled
from .typing import (
ClientConfig,
ResolvedParam,
Expand All @@ -42,6 +45,8 @@

def rest_api_source(
config: RESTAPIConfig,
team_id: int,
job_id: str,
name: Optional[str] = None,
section: Optional[str] = None,
max_table_nesting: Optional[int] = None,
Expand Down Expand Up @@ -104,10 +109,10 @@ def rest_api_source(
spec,
)

return decorated(config)
return decorated(config, team_id, job_id)


def rest_api_resources(config: RESTAPIConfig) -> list[DltResource]:
def rest_api_resources(config: RESTAPIConfig, team_id: int, job_id: str) -> list[DltResource]:
"""Creates a list of resources from a REST API configuration.
Args:
Expand Down Expand Up @@ -187,6 +192,8 @@ def rest_api_resources(config: RESTAPIConfig) -> list[DltResource]:
dependency_graph,
endpoint_resource_map,
resolved_param_map,
team_id=team_id,
job_id=job_id,
)

return list(resources.values())
Expand All @@ -197,18 +204,20 @@ def create_resources(
dependency_graph: graphlib.TopologicalSorter,
endpoint_resource_map: dict[str, EndpointResource],
resolved_param_map: dict[str, Optional[ResolvedParam]],
team_id: int,
job_id: str,
) -> dict[str, DltResource]:
resources = {}

for resource_name in dependency_graph.static_order():
resource_name = cast(str, resource_name)
endpoint_resource = endpoint_resource_map[resource_name]
endpoint_config = cast(Endpoint, endpoint_resource["endpoint"])
endpoint_config = cast(Endpoint, endpoint_resource.get("endpoint"))
request_params = endpoint_config.get("params", {})
request_json = endpoint_config.get("json", None)
paginator = create_paginator(endpoint_config.get("paginator"))

resolved_param: ResolvedParam = resolved_param_map[resource_name]
resolved_param: ResolvedParam | None = resolved_param_map[resource_name]

include_from_parent: list[str] = endpoint_resource.get("include_from_parent", [])
if not resolved_param and include_from_parent:
Expand All @@ -222,7 +231,7 @@ def create_resources(
) = setup_incremental_object(request_params, endpoint_config.get("incremental"))

client = RESTClient(
base_url=client_config["base_url"],
base_url=client_config.get("base_url"),
headers=client_config.get("headers"),
auth=create_auth(client_config.get("auth")),
paginator=create_paginator(client_config.get("paginator")),
Expand All @@ -234,7 +243,7 @@ def create_resources(

if resolved_param is None:

def paginate_resource(
async def paginate_resource(
method: HTTPMethodBasic,
path: str,
params: dict[str, Any],
Expand All @@ -244,14 +253,19 @@ def paginate_resource(
hooks: Optional[dict[str, Any]],
client: RESTClient = client,
incremental_object: Optional[Incremental[Any]] = incremental_object,
incremental_param: IncrementalParam = incremental_param,
) -> Generator[Any, None, None]:
if incremental_object:
incremental_param: IncrementalParam | None = incremental_param,
) -> AsyncGenerator[Iterator[Any], Any]:
yield dlt.mark.materialize_table_schema() # type: ignore

if await is_job_cancelled(team_id=team_id, job_id=job_id):
return

if incremental_object and incremental_param:
params[incremental_param.start] = incremental_object.last_value
if incremental_param.end:
params[incremental_param.end] = incremental_object.end_value

yield from client.paginate(
yield client.paginate(
method=method,
path=path,
params=params,
Expand Down Expand Up @@ -279,7 +293,7 @@ def paginate_resource(

base_params = exclude_keys(request_params, {resolved_param.param_name})

def paginate_dependent_resource(
async def paginate_dependent_resource(
items: list[dict[str, Any]],
method: HTTPMethodBasic,
path: str,
Expand All @@ -291,9 +305,14 @@ def paginate_dependent_resource(
resolved_param: ResolvedParam = resolved_param,
include_from_parent: list[str] = include_from_parent,
incremental_object: Optional[Incremental[Any]] = incremental_object,
incremental_param: IncrementalParam = incremental_param,
) -> Generator[Any, None, None]:
if incremental_object:
incremental_param: IncrementalParam | None = incremental_param,
) -> AsyncGenerator[Any, Any]:
yield dlt.mark.materialize_table_schema() # type: ignore

if await is_job_cancelled(team_id=team_id, job_id=job_id):
return

if incremental_object and incremental_param:
params[incremental_param.start] = incremental_object.last_value
if incremental_param.end:
params[incremental_param.end] = incremental_object.end_value
Expand Down
Loading

0 comments on commit 6f745af

Please sign in to comment.