diff --git a/mypy-baseline.txt b/mypy-baseline.txt index 00e42297ced4f..e4d8a875c56cd 100644 --- a/mypy-baseline.txt +++ b/mypy-baseline.txt @@ -702,6 +702,7 @@ posthog/api/plugin_log_entry.py:0: error: Module "django.utils.timezone" does no posthog/api/action.py:0: error: Argument 1 to has incompatible type "*tuple[str, ...]"; expected "type[BaseRenderer]" [arg-type] posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py:0: error: Incompatible types in assignment (expression has type "str | int", variable has type "int") [assignment] posthog/api/test/batch_exports/conftest.py:0: error: Argument "activities" to "ThreadedWorker" has incompatible type "list[function]"; expected "Sequence[Callable[..., Any]]" [arg-type] +posthog/temporal/tests/data_imports/test_end_to_end.py:0: error: Unused "type: ignore" comment [unused-ignore] posthog/api/test/test_team.py:0: error: "HttpResponse" has no attribute "json" [attr-defined] posthog/api/test/test_team.py:0: error: "HttpResponse" has no attribute "json" [attr-defined] posthog/api/test/test_capture.py:0: error: Statement is unreachable [unreachable] diff --git a/posthog/temporal/tests/data_imports/conftest.py b/posthog/temporal/tests/data_imports/conftest.py new file mode 100644 index 0000000000000..1f43057da6e8a --- /dev/null +++ b/posthog/temporal/tests/data_imports/conftest.py @@ -0,0 +1,517 @@ +import json +import pytest + + +@pytest.fixture +def stripe_balance_transaction(): + return json.loads( + """ + { + "object": "list", + "url": "/v1/balance_transactions", + "has_more": false, + "data": [ + { + "id": "txn_1MiN3gLkdIwHu7ixxapQrznl", + "object": "balance_transaction", + "amount": -400, + "available_on": 1678043844, + "created": 1678043844, + "currency": "usd", + "description": null, + "exchange_rate": null, + "fee": 0, + "fee_details": [], + "net": -400, + "reporting_category": "transfer", + "source": "tr_1MiN3gLkdIwHu7ixNCZvFdgA", + "status": "available", + "type": "transfer" + } + ] + } + """ + ) + + +@pytest.fixture +def stripe_charge(): + return json.loads( + """ + { + "object": "list", + "url": "/v1/charges", + "has_more": false, + "data": [ + { + "id": "ch_3MmlLrLkdIwHu7ix0snN0B15", + "object": "charge", + "amount": 1099, + "amount_captured": 1099, + "amount_refunded": 0, + "application": null, + "application_fee": null, + "application_fee_amount": null, + "balance_transaction": "txn_3MmlLrLkdIwHu7ix0uke3Ezy", + "billing_details": { + "address": { + "city": null, + "country": null, + "line1": null, + "line2": null, + "postal_code": null, + "state": null + }, + "email": null, + "name": null, + "phone": null + }, + "calculated_statement_descriptor": "Stripe", + "captured": true, + "created": 1679090539, + "currency": "usd", + "customer": null, + "description": null, + "disputed": false, + "failure_balance_transaction": null, + "failure_code": null, + "failure_message": null, + "fraud_details": {}, + "invoice": null, + "livemode": false, + "metadata": {}, + "on_behalf_of": null, + "outcome": { + "network_status": "approved_by_network", + "reason": null, + "risk_level": "normal", + "risk_score": 32, + "seller_message": "Payment complete.", + "type": "authorized" + }, + "paid": true, + "payment_intent": null, + "payment_method": "card_1MmlLrLkdIwHu7ixIJwEWSNR", + "payment_method_details": { + "card": { + "brand": "visa", + "checks": { + "address_line1_check": null, + "address_postal_code_check": null, + "cvc_check": null + }, + "country": "US", + "exp_month": 3, + "exp_year": 2024, + "fingerprint": "mToisGZ01V71BCos", + "funding": "credit", + "installments": null, + "last4": "4242", + "mandate": null, + "network": "visa", + "three_d_secure": null, + "wallet": null + }, + "type": "card" + }, + "receipt_email": null, + "receipt_number": null, + "receipt_url": "https://pay.stripe.com/receipts/payment/CAcaFwoVYWNjdF8xTTJKVGtMa2RJd0h1N2l4KOvG06AGMgZfBXyr1aw6LBa9vaaSRWU96d8qBwz9z2J_CObiV_H2-e8RezSK_sw0KISesp4czsOUlVKY", + "refunded": false, + "review": null, + "shipping": null, + "source_transfer": null, + "statement_descriptor": null, + "statement_descriptor_suffix": null, + "status": "succeeded", + "transfer_data": null, + "transfer_group": null + } + ] + } + """ + ) + + +@pytest.fixture +def stripe_customer(): + return json.loads( + """ + { + "object": "list", + "url": "/v1/customers", + "has_more": false, + "data": [ + { + "id": "cus_NffrFeUfNV2Hib", + "object": "customer", + "address": null, + "balance": 0, + "created": 1680893993, + "currency": null, + "default_source": null, + "delinquent": false, + "description": null, + "discount": null, + "email": "jennyrosen@example.com", + "invoice_prefix": "0759376C", + "invoice_settings": { + "custom_fields": null, + "default_payment_method": null, + "footer": null, + "rendering_options": null + }, + "livemode": false, + "metadata": {}, + "name": "Jenny Rosen", + "next_invoice_sequence": 1, + "phone": null, + "preferred_locales": [], + "shipping": null, + "tax_exempt": "none", + "test_clock": null + } + ] + } + """ + ) + + +@pytest.fixture +def stripe_invoice(): + return json.loads( + """ + { + "object": "list", + "url": "/v1/invoices", + "has_more": false, + "data": [ + { + "id": "in_1MtHbELkdIwHu7ixl4OzzPMv", + "object": "invoice", + "account_country": "US", + "account_name": "Stripe Docs", + "account_tax_ids": null, + "amount_due": 0, + "amount_paid": 0, + "amount_remaining": 0, + "amount_shipping": 0, + "application": null, + "application_fee_amount": null, + "attempt_count": 0, + "attempted": false, + "auto_advance": false, + "automatic_tax": { + "enabled": false, + "liability": null, + "status": null + }, + "billing_reason": "manual", + "charge": null, + "collection_method": "charge_automatically", + "created": 1680644467, + "currency": "usd", + "custom_fields": null, + "customer": "cus_NeZwdNtLEOXuvB", + "customer_address": null, + "customer_email": "jennyrosen@example.com", + "customer_name": "Jenny Rosen", + "customer_phone": null, + "customer_shipping": null, + "customer_tax_exempt": "none", + "customer_tax_ids": [], + "default_payment_method": null, + "default_source": null, + "default_tax_rates": [], + "description": null, + "discount": null, + "discounts": [], + "due_date": null, + "ending_balance": null, + "footer": null, + "from_invoice": null, + "hosted_invoice_url": null, + "invoice_pdf": null, + "issuer": { + "type": "self" + }, + "last_finalization_error": null, + "latest_revision": null, + "lines": { + "object": "list", + "data": [], + "has_more": false, + "total_count": 0, + "url": "/v1/invoices/in_1MtHbELkdIwHu7ixl4OzzPMv/lines" + }, + "livemode": false, + "metadata": {}, + "next_payment_attempt": null, + "number": null, + "on_behalf_of": null, + "paid": false, + "paid_out_of_band": false, + "payment_intent": null, + "payment_settings": { + "default_mandate": null, + "payment_method_options": null, + "payment_method_types": null + }, + "period_end": 1680644467, + "period_start": 1680644467, + "post_payment_credit_notes_amount": 0, + "pre_payment_credit_notes_amount": 0, + "quote": null, + "receipt_number": null, + "rendering_options": null, + "shipping_cost": null, + "shipping_details": null, + "starting_balance": 0, + "statement_descriptor": null, + "status": "draft", + "status_transitions": { + "finalized_at": null, + "marked_uncollectible_at": null, + "paid_at": null, + "voided_at": null + }, + "subscription": null, + "subtotal": 0, + "subtotal_excluding_tax": 0, + "tax": null, + "test_clock": null, + "total": 0, + "total_discount_amounts": [], + "total_excluding_tax": 0, + "total_tax_amounts": [], + "transfer_data": null, + "webhooks_delivered_at": 1680644467 + } + ] + } + """ + ) + + +@pytest.fixture +def stripe_price(): + return json.loads( + """ + { + "object": "list", + "url": "/v1/prices", + "has_more": false, + "data": [ + { + "id": "price_1MoBy5LkdIwHu7ixZhnattbh", + "object": "price", + "active": true, + "billing_scheme": "per_unit", + "created": 1679431181, + "currency": "usd", + "custom_unit_amount": null, + "livemode": false, + "lookup_key": null, + "metadata": {}, + "nickname": null, + "product": "prod_NZKdYqrwEYx6iK", + "recurring": { + "aggregate_usage": null, + "interval": "month", + "interval_count": 1, + "trial_period_days": null, + "usage_type": "licensed" + }, + "tax_behavior": "unspecified", + "tiers_mode": null, + "transform_quantity": null, + "type": "recurring", + "unit_amount": 1000, + "unit_amount_decimal": "1000" + } + ] + } + """ + ) + + +@pytest.fixture +def stripe_product(): + return json.loads( + """ + { + "object": "list", + "url": "/v1/products", + "has_more": false, + "data": [ + { + "id": "prod_NWjs8kKbJWmuuc", + "object": "product", + "active": true, + "created": 1678833149, + "default_price": null, + "description": null, + "images": [], + "features": [], + "livemode": false, + "metadata": {}, + "name": "Gold Plan", + "package_dimensions": null, + "shippable": null, + "statement_descriptor": null, + "tax_code": null, + "unit_label": null, + "updated": 1678833149, + "url": null + } + ] + } + """ + ) + + +@pytest.fixture +def stripe_subscription(): + return json.loads( + """ + { + "object": "list", + "url": "/v1/subscriptions", + "has_more": false, + "data": [ + { + "id": "sub_1MowQVLkdIwHu7ixeRlqHVzs", + "object": "subscription", + "application": null, + "application_fee_percent": null, + "automatic_tax": { + "enabled": false, + "liability": null + }, + "billing_cycle_anchor": 1679609767, + "billing_thresholds": null, + "cancel_at": null, + "cancel_at_period_end": false, + "canceled_at": null, + "cancellation_details": { + "comment": null, + "feedback": null, + "reason": null + }, + "collection_method": "charge_automatically", + "created": 1679609767, + "currency": "usd", + "current_period_end": 1682288167, + "current_period_start": 1679609767, + "customer": "cus_Na6dX7aXxi11N4", + "days_until_due": null, + "default_payment_method": null, + "default_source": null, + "default_tax_rates": [], + "description": null, + "discount": null, + "discounts": null, + "ended_at": null, + "invoice_settings": { + "issuer": { + "type": "self" + } + }, + "items": { + "object": "list", + "data": [ + { + "id": "si_Na6dzxczY5fwHx", + "object": "subscription_item", + "billing_thresholds": null, + "created": 1679609768, + "metadata": {}, + "plan": { + "id": "price_1MowQULkdIwHu7ixraBm864M", + "object": "plan", + "active": true, + "aggregate_usage": null, + "amount": 1000, + "amount_decimal": "1000", + "billing_scheme": "per_unit", + "created": 1679609766, + "currency": "usd", + "discounts": null, + "interval": "month", + "interval_count": 1, + "livemode": false, + "metadata": {}, + "nickname": null, + "product": "prod_Na6dGcTsmU0I4R", + "tiers_mode": null, + "transform_usage": null, + "trial_period_days": null, + "usage_type": "licensed" + }, + "price": { + "id": "price_1MowQULkdIwHu7ixraBm864M", + "object": "price", + "active": true, + "billing_scheme": "per_unit", + "created": 1679609766, + "currency": "usd", + "custom_unit_amount": null, + "livemode": false, + "lookup_key": null, + "metadata": {}, + "nickname": null, + "product": "prod_Na6dGcTsmU0I4R", + "recurring": { + "aggregate_usage": null, + "interval": "month", + "interval_count": 1, + "trial_period_days": null, + "usage_type": "licensed" + }, + "tax_behavior": "unspecified", + "tiers_mode": null, + "transform_quantity": null, + "type": "recurring", + "unit_amount": 1000, + "unit_amount_decimal": "1000" + }, + "quantity": 1, + "subscription": "sub_1MowQVLkdIwHu7ixeRlqHVzs", + "tax_rates": [] + } + ], + "has_more": false, + "total_count": 1, + "url": "/v1/subscription_items?subscription=sub_1MowQVLkdIwHu7ixeRlqHVzs" + }, + "latest_invoice": "in_1MowQWLkdIwHu7ixuzkSPfKd", + "livemode": false, + "metadata": {}, + "next_pending_invoice_item_invoice": null, + "on_behalf_of": null, + "pause_collection": null, + "payment_settings": { + "payment_method_options": null, + "payment_method_types": null, + "save_default_payment_method": "off" + }, + "pending_invoice_item_interval": null, + "pending_setup_intent": null, + "pending_update": null, + "schedule": null, + "start_date": 1679609767, + "status": "active", + "test_clock": null, + "transfer_data": null, + "trial_end": null, + "trial_settings": { + "end_behavior": { + "missing_payment_method": "create_invoice" + } + }, + "trial_start": null + } + ] + } + """ + ) diff --git a/posthog/temporal/tests/data_imports/test_end_to_end.py b/posthog/temporal/tests/data_imports/test_end_to_end.py new file mode 100644 index 0000000000000..2ac44d7443b9a --- /dev/null +++ b/posthog/temporal/tests/data_imports/test_end_to_end.py @@ -0,0 +1,196 @@ +from typing import Any, Optional +from unittest import mock +import uuid +from asgiref.sync import sync_to_async +from django.conf import settings +from django.test import override_settings +import pytest +from posthog.hogql.query import execute_hogql_query +from posthog.models.team.team import Team +from posthog.temporal.data_imports import ACTIVITIES +from posthog.temporal.data_imports.external_data_job import ExternalDataJobWorkflow +from posthog.temporal.utils import ExternalDataWorkflowInputs +from posthog.warehouse.models.external_table_definitions import external_tables +from posthog.warehouse.models import ( + ExternalDataJob, + ExternalDataSource, + ExternalDataSchema, +) +from temporalio.testing import WorkflowEnvironment +from temporalio.common import RetryPolicy +from temporalio.worker import UnsandboxedWorkflowRunner, Worker +from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE +from posthog.warehouse.models.external_data_job import get_latest_run_if_exists +from dlt.sources.helpers.rest_client.client import RESTClient + + +BUCKET_NAME = "test-pipeline" + + +async def _run( + team: Team, schema_name: str, table_name: str, source_type: str, job_inputs: dict[str, str], mock_data_response: Any +): + source = await sync_to_async(ExternalDataSource.objects.create)( + source_id=uuid.uuid4(), + connection_id=uuid.uuid4(), + destination_id=uuid.uuid4(), + team=team, + status="running", + source_type=source_type, + job_inputs=job_inputs, + ) + + schema = await sync_to_async(ExternalDataSchema.objects.create)( + name=schema_name, + team_id=team.pk, + source_id=source.pk, + ) + + workflow_id = str(uuid.uuid4()) + inputs = ExternalDataWorkflowInputs( + team_id=team.id, + external_data_source_id=source.pk, + external_data_schema_id=schema.id, + ) + + def mock_paginate( + class_self, + path: str = "", + method: Any = "GET", + params: Optional[dict[str, Any]] = None, + json: Optional[dict[str, Any]] = None, + auth: Optional[Any] = None, + paginator: Optional[Any] = None, + data_selector: Optional[Any] = None, + hooks: Optional[Any] = None, + ): + return iter(mock_data_response) + + with ( + mock.patch.object(RESTClient, "paginate", mock_paginate), + override_settings( + BUCKET_URL=f"s3://{BUCKET_NAME}", + AIRBYTE_BUCKET_KEY=settings.OBJECT_STORAGE_ACCESS_KEY_ID, + AIRBYTE_BUCKET_SECRET=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY, + AIRBYTE_BUCKET_DOMAIN="objectstorage:19000", + ), + ): + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=DATA_WAREHOUSE_TASK_QUEUE, + workflows=[ExternalDataJobWorkflow], + activities=ACTIVITIES, # type: ignore + workflow_runner=UnsandboxedWorkflowRunner(), + ): + await activity_environment.client.execute_workflow( # type: ignore + ExternalDataJobWorkflow.run, + inputs, + id=workflow_id, + task_queue=DATA_WAREHOUSE_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + + run = await get_latest_run_if_exists(team_id=team.pk, pipeline_id=source.pk) + + assert run is not None + assert run.status == ExternalDataJob.Status.COMPLETED + + res = await sync_to_async(execute_hogql_query)(f"SELECT * FROM {table_name}", team) + assert len(res.results) == 1 + + for name, field in external_tables.get(table_name, {}).items(): + if field.hidden: + continue + assert name in (res.columns or []) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_stripe_balance_transactions(team, stripe_balance_transaction): + await _run( + team=team, + schema_name="BalanceTransaction", + table_name="stripe_balancetransaction", + source_type="Stripe", + job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"}, + mock_data_response=stripe_balance_transaction["data"], + ) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_stripe_charges(team, stripe_charge): + await _run( + team=team, + schema_name="Charge", + table_name="stripe_charge", + source_type="Stripe", + job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"}, + mock_data_response=stripe_charge["data"], + ) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_stripe_customer(team, stripe_customer): + await _run( + team=team, + schema_name="Customer", + table_name="stripe_customer", + source_type="Stripe", + job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"}, + mock_data_response=stripe_customer["data"], + ) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_stripe_invoice(team, stripe_invoice): + await _run( + team=team, + schema_name="Invoice", + table_name="stripe_invoice", + source_type="Stripe", + job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"}, + mock_data_response=stripe_invoice["data"], + ) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_stripe_price(team, stripe_price): + await _run( + team=team, + schema_name="Price", + table_name="stripe_price", + source_type="Stripe", + job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"}, + mock_data_response=stripe_price["data"], + ) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_stripe_product(team, stripe_product): + await _run( + team=team, + schema_name="Product", + table_name="stripe_product", + source_type="Stripe", + job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"}, + mock_data_response=stripe_product["data"], + ) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_stripe_subscription(team, stripe_subscription): + await _run( + team=team, + schema_name="Subscription", + table_name="stripe_subscription", + source_type="Stripe", + job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"}, + mock_data_response=stripe_subscription["data"], + ) diff --git a/posthog/temporal/tests/external_data/test_external_data_job.py b/posthog/temporal/tests/external_data/test_external_data_job.py index de33372ed07cd..01224cb58c410 100644 --- a/posthog/temporal/tests/external_data/test_external_data_job.py +++ b/posthog/temporal/tests/external_data/test_external_data_job.py @@ -702,6 +702,7 @@ async def setup_job_1(): BUCKET_URL=f"s3://{BUCKET_NAME}", AIRBYTE_BUCKET_KEY=settings.OBJECT_STORAGE_ACCESS_KEY_ID, AIRBYTE_BUCKET_SECRET=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY, + AIRBYTE_BUCKET_DOMAIN="objectstorage:19000", ): await asyncio.gather( activity_environment.run(import_data_activity, job_1_inputs), diff --git a/posthog/warehouse/data_load/validate_schema.py b/posthog/warehouse/data_load/validate_schema.py index 951e1e67aefe5..738ed72904eb6 100644 --- a/posthog/warehouse/data_load/validate_schema.py +++ b/posthog/warehouse/data_load/validate_schema.py @@ -206,6 +206,7 @@ async def validate_schema_and_update_table( f"Data Warehouse: Could not validate schema for external data job {job.pk}", exc_info=e, ) + raise e # TODO: figure out data deletes - currently borked right now # if ( diff --git a/posthog/warehouse/models/external_data_job.py b/posthog/warehouse/models/external_data_job.py index 23594f724ddc2..e9504686d560d 100644 --- a/posthog/warehouse/models/external_data_job.py +++ b/posthog/warehouse/models/external_data_job.py @@ -3,6 +3,7 @@ from django.conf import settings from posthog.models.team import Team from posthog.models.utils import CreatedMetaFields, UUIDModel, sane_repr +from posthog.settings import TEST from posthog.warehouse.s3 import get_s3_client from uuid import UUID from posthog.warehouse.util import database_sync_to_async @@ -35,6 +36,11 @@ def folder_path(self) -> str: return f"team_{self.team_id}_{self.pipeline.source_type}_{str(self.schema_id)}".lower().replace("-", "_") def url_pattern_by_schema(self, schema: str) -> str: + if TEST: + return ( + f"http://{settings.AIRBYTE_BUCKET_DOMAIN}/test-pipeline/{self.folder_path}/{schema.lower()}/*.parquet" + ) + return f"https://{settings.AIRBYTE_BUCKET_DOMAIN}/dlt/{self.folder_path}/{schema.lower()}/*.parquet" def delete_data_in_bucket(self) -> None: