diff --git a/posthog/temporal/data_imports/__init__.py b/posthog/temporal/data_imports/__init__.py index 3d27ad668f1e7..69590c6a5ac29 100644 --- a/posthog/temporal/data_imports/__init__.py +++ b/posthog/temporal/data_imports/__init__.py @@ -8,4 +8,4 @@ run_external_data_job, move_draft_to_production_activity, validate_schema_activity, -] \ No newline at end of file +] diff --git a/posthog/temporal/data_imports/external_data_job.py b/posthog/temporal/data_imports/external_data_job.py index 6124c42c0ae98..7234ed570699a 100644 --- a/posthog/temporal/data_imports/external_data_job.py +++ b/posthog/temporal/data_imports/external_data_job.py @@ -18,7 +18,8 @@ get_external_data_source, ) from posthog.warehouse.models.external_data_source import ExternalDataSource -# TODO: remove dependency + +# TODO: remove dependency from posthog.temporal.batch_exports.base import PostHogWorkflow from posthog.temporal.heartbeat import HeartbeatDetails from temporalio import activity, workflow, exceptions diff --git a/posthog/warehouse/data_load/stripe/__init__.py b/posthog/warehouse/data_load/stripe/__init__.py index 89102f1e49efa..1a1818ad5ee50 100644 --- a/posthog/warehouse/data_load/stripe/__init__.py +++ b/posthog/warehouse/data_load/stripe/__init__.py @@ -51,52 +51,3 @@ def stripe_resource( name=endpoint, write_disposition="replace", )(endpoint) - - -@dlt.source -def incremental_stripe_source( - endpoints: Tuple[str, ...] = INCREMENTAL_ENDPOINTS, - stripe_secret_key: str = dlt.secrets.value, - initial_start_date: Optional[DateTime] = None, - end_date: Optional[DateTime] = None, -) -> Iterable[DltResource]: - """ - As Stripe API does not include the "updated" key in its responses, - we are only able to perform incremental downloads from endpoints where all objects are uneditable. - This source yields the resources with incremental loading based on "append" mode. - You will load only the newest data without duplicating and without downloading a huge amount of data each time. - - Args: - endpoints (tuple): A tuple of endpoint names to retrieve data from. Defaults to Stripe API endpoints with uneditable data. - stripe_secret_key (str): The API access token for authentication. Defaults to the value in the `dlt.secrets` object. - initial_start_date (Optional[DateTime]): An optional parameter that specifies the initial value for dlt.sources.incremental. - If parameter is not None, then load only data that were created after initial_start_date on the first run. - Defaults to None. Format: datetime(YYYY, MM, DD). - end_date (Optional[DateTime]): An optional end date to limit the data retrieved. - Defaults to None. Format: datetime(YYYY, MM, DD). - Returns: - Iterable[DltResource]: Resources with only that data has not yet been loaded. - """ - stripe.api_key = stripe_secret_key - stripe.api_version = "2022-11-15" - start_date_unix = ( - transform_date(initial_start_date) if initial_start_date is not None else -1 - ) - - def incremental_resource( - endpoint: str, - created: Optional[Any] = dlt.sources.incremental( - "created", initial_value=start_date_unix - ), - ) -> Generator[Dict[Any, Any], Any, None]: - start_value = created.last_value - for item in pagination(endpoint, start_date=start_value, end_date=end_date): - yield item - - for endpoint in endpoints: - yield dlt.resource( - incremental_resource, - name=endpoint, - write_disposition="append", - primary_key="id", - )(endpoint) \ No newline at end of file diff --git a/posthog/warehouse/data_load/stripe/helper.py b/posthog/warehouse/data_load/stripe/helper.py index f651bd63e8bb9..790950bbd18b1 100644 --- a/posthog/warehouse/data_load/stripe/helper.py +++ b/posthog/warehouse/data_load/stripe/helper.py @@ -1,6 +1,6 @@ """Stripe analytics source helpers""" -from typing import Any, Dict, Generator, Optional, Union, Iterable +from typing import Any, Dict, Optional, Union, Iterable import stripe from dlt.common import pendulum @@ -8,9 +8,7 @@ from dlt.common.typing import TDataItem -def pagination( - endpoint: str, start_date: Optional[Any] = None, end_date: Optional[Any] = None -) -> Iterable[TDataItem]: +def pagination(endpoint: str, start_date: Optional[Any] = None, end_date: Optional[Any] = None) -> Iterable[TDataItem]: """ Retrieves data from an endpoint with pagination. @@ -62,7 +60,5 @@ def stripe_get_data( if resource == "Subscription": kwargs.update({"status": "all"}) - resource_dict = getattr(stripe, resource).list( - created={"gte": start_date, "lt": end_date}, limit=100, **kwargs - ) - return dict(resource_dict) \ No newline at end of file + resource_dict = getattr(stripe, resource).list(created={"gte": start_date, "lt": end_date}, limit=100, **kwargs) + return dict(resource_dict) diff --git a/posthog/warehouse/data_load/stripe/settings.py b/posthog/warehouse/data_load/stripe/settings.py index 52b3651f0454c..e9694cba52fc0 100644 --- a/posthog/warehouse/data_load/stripe/settings.py +++ b/posthog/warehouse/data_load/stripe/settings.py @@ -11,4 +11,4 @@ "Price", ) # possible incremental endpoints -INCREMENTAL_ENDPOINTS = ("Event", "Invoice", "BalanceTransaction") \ No newline at end of file +INCREMENTAL_ENDPOINTS = ("Event", "Invoice", "BalanceTransaction") diff --git a/posthog/warehouse/external_data_source/connection.py b/posthog/warehouse/external_data_source/connection.py index 8ac939da9be51..9a37222f9d8d4 100644 --- a/posthog/warehouse/external_data_source/connection.py +++ b/posthog/warehouse/external_data_source/connection.py @@ -7,6 +7,7 @@ logger = structlog.get_logger(__name__) + class ExternalDataConnection(BaseModel): connection_id: str source_id: str