Skip to content

Commit

Permalink
chore(data-warehouse): Added more reasons for jobs to not retry (#25785)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilbert09 authored Oct 25, 2024
1 parent d1f4eba commit 76efa0b
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 12 deletions.
1 change: 0 additions & 1 deletion mypy-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,6 @@ posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
posthog/api/test/batch_exports/conftest.py:0: error: Signature of "run" incompatible with supertype "Worker" [override]
posthog/api/test/batch_exports/conftest.py:0: note: Superclass:
posthog/api/test/batch_exports/conftest.py:0: note: def run(self) -> Coroutine[Any, Any, None]
Expand Down
49 changes: 40 additions & 9 deletions posthog/temporal/data_imports/external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import datetime as dt
import json

import posthoganalytics
from temporalio import activity, exceptions, workflow
from temporalio.common import RetryPolicy

Expand All @@ -21,6 +22,7 @@
create_external_data_job_model_activity,
)
from posthog.temporal.data_imports.workflow_activities.import_data import ImportDataActivityInputs, import_data_activity
from posthog.utils import get_machine_id
from posthog.warehouse.data_load.service import (
a_delete_external_data_schedule,
a_external_data_workflow_exists,
Expand All @@ -37,23 +39,35 @@
ExternalDataJob,
get_active_schemas_for_source_id,
ExternalDataSource,
get_external_data_source,
)
from posthog.temporal.common.logger import bind_temporal_worker_logger
from posthog.warehouse.models.external_data_schema import aupdate_should_sync


Non_Retryable_Schema_Errors = [
"NoSuchTableError",
"401 Client Error: Unauthorized for url: https://api.stripe.com",
"403 Client Error: Forbidden for url: https://api.stripe.com",
]
Non_Retryable_Schema_Errors: dict[ExternalDataSource.Type, list[str]] = {
ExternalDataSource.Type.STRIPE: [
"401 Client Error: Unauthorized for url: https://api.stripe.com",
"403 Client Error: Forbidden for url: https://api.stripe.com",
],
ExternalDataSource.Type.POSTGRES: [
"NoSuchTableError",
"is not permitted to log in",
"Tenant or user not found connection to server",
"FATAL: Tenant or user not found",
"error received from server in SCRAM exchange: Wrong password",
"could not translate host name",
],
ExternalDataSource.Type.ZENDESK: ["404 Client Error: Not Found for url", "403 Client Error: Forbidden for url"],
}


@dataclasses.dataclass
class UpdateExternalDataJobStatusInputs:
team_id: int
job_id: str | None
schema_id: str
source_id: str
status: str
internal_error: str | None
latest_error: str | None
Expand All @@ -78,10 +92,26 @@ async def update_external_data_job_model(inputs: UpdateExternalDataJobStatusInpu
f"External data job failed for external data schema {inputs.schema_id} with error: {inputs.internal_error}"
)

has_non_retryable_error = any(error in inputs.internal_error for error in Non_Retryable_Schema_Errors)
if has_non_retryable_error:
logger.info("Schema has a non-retryable error - turning off syncing")
await aupdate_should_sync(schema_id=inputs.schema_id, team_id=inputs.team_id, should_sync=False)
source: ExternalDataSource = await get_external_data_source(inputs.source_id)
non_retryable_errors = Non_Retryable_Schema_Errors.get(ExternalDataSource.Type(source.source_type))

if non_retryable_errors is not None:
has_non_retryable_error = any(error in inputs.internal_error for error in non_retryable_errors)
if has_non_retryable_error:
logger.info("Schema has a non-retryable error - turning off syncing")
posthoganalytics.capture(
get_machine_id(),
"schema non-retryable error",
{
"schemaId": inputs.schema_id,
"sourceId": inputs.source_id,
"sourceType": source.source_type,
"jobId": inputs.job_id,
"teamId": inputs.team_id,
"error": inputs.internal_error,
},
)
await aupdate_should_sync(schema_id=inputs.schema_id, team_id=inputs.team_id, should_sync=False)

await aupdate_external_job_status(
job_id=job_id,
Expand Down Expand Up @@ -166,6 +196,7 @@ async def run(self, inputs: ExternalDataWorkflowInputs):
internal_error=None,
team_id=inputs.team_id,
schema_id=str(inputs.external_data_schema_id),
source_id=str(inputs.external_data_source_id),
)

try:
Expand Down
54 changes: 54 additions & 0 deletions posthog/temporal/tests/data_imports/test_end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from asgiref.sync import sync_to_async
from django.conf import settings
from django.test import override_settings
import posthoganalytics
import pytest
import pytest_asyncio
import psycopg
Expand Down Expand Up @@ -905,3 +906,56 @@ def get_jobs():

with pytest.raises(Exception):
await sync_to_async(execute_hogql_query)("SELECT * FROM stripe_customer", team)


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_non_retryable_error(team, stripe_customer):
source = await sync_to_async(ExternalDataSource.objects.create)(
source_id=uuid.uuid4(),
connection_id=uuid.uuid4(),
destination_id=uuid.uuid4(),
team=team,
status="running",
source_type="Stripe",
job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
)

schema = await sync_to_async(ExternalDataSchema.objects.create)(
name="Customer",
team_id=team.pk,
source_id=source.pk,
sync_type=ExternalDataSchema.SyncType.FULL_REFRESH,
sync_type_config={},
)

workflow_id = str(uuid.uuid4())
inputs = ExternalDataWorkflowInputs(
team_id=team.id,
external_data_source_id=source.pk,
external_data_schema_id=schema.id,
)

with (
mock.patch(
"posthog.temporal.data_imports.workflow_activities.check_billing_limits.list_limited_team_attributes",
) as mock_list_limited_team_attributes,
mock.patch.object(posthoganalytics, "capture") as capture_mock,
):
mock_list_limited_team_attributes.side_effect = Exception(
"401 Client Error: Unauthorized for url: https://api.stripe.com"
)

with pytest.raises(Exception):
await _execute_run(workflow_id, inputs, stripe_customer["data"])

capture_mock.assert_called_once()

job: ExternalDataJob = await sync_to_async(ExternalDataJob.objects.get)(team_id=team.id, schema_id=schema.pk)
await sync_to_async(schema.refresh_from_db)()

assert job.status == ExternalDataJob.Status.FAILED
assert schema.should_sync is False

with pytest.raises(Exception):
await sync_to_async(execute_hogql_query)("SELECT * FROM stripe_customer", team)
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ async def test_update_external_job_activity(activity_environment, team, **kwargs
latest_error=None,
internal_error=None,
schema_id=str(schema.pk),
source_id=str(new_source.pk),
team_id=team.id,
)

Expand Down Expand Up @@ -296,6 +297,7 @@ async def test_update_external_job_activity_with_retryable_error(activity_enviro
latest_error=None,
internal_error="Some other retryable error",
schema_id=str(schema.pk),
source_id=str(new_source.pk),
team_id=team.id,
)

Expand All @@ -317,11 +319,11 @@ async def test_update_external_job_activity_with_non_retryable_error(activity_en
destination_id=uuid.uuid4(),
team=team,
status="running",
source_type="Stripe",
source_type="Postgres",
)

schema = await sync_to_async(ExternalDataSchema.objects.create)(
name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][0],
name="test_123",
team_id=team.id,
source_id=new_source.pk,
should_sync=True,
Expand All @@ -341,6 +343,7 @@ async def test_update_external_job_activity_with_non_retryable_error(activity_en
latest_error=None,
internal_error="NoSuchTableError: TableA",
schema_id=str(schema.pk),
source_id=str(new_source.pk),
team_id=team.id,
)
with mock.patch("posthog.warehouse.models.external_data_schema.external_data_workflow_exists", return_value=False):
Expand Down

0 comments on commit 76efa0b

Please sign in to comment.