Skip to content

Commit

Permalink
fix(data-warehouse): Reduce the amount of stale running jobs (#25752)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilbert09 authored Oct 24, 2024
1 parent 0e6b73b commit 1fe8f1d
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 62 deletions.
90 changes: 50 additions & 40 deletions posthog/temporal/data_imports/external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from posthog.warehouse.data_load.source_templates import create_warehouse_templates_for_source

from posthog.warehouse.external_data_source.jobs import (
aget_running_job_for_schema,
aupdate_external_job_status,
)
from posthog.warehouse.models import (
Expand All @@ -50,9 +51,8 @@

@dataclasses.dataclass
class UpdateExternalDataJobStatusInputs:
id: str
team_id: int
run_id: str
job_id: str | None
schema_id: str
status: str
internal_error: str | None
Expand All @@ -63,6 +63,16 @@ class UpdateExternalDataJobStatusInputs:
async def update_external_data_job_model(inputs: UpdateExternalDataJobStatusInputs) -> None:
logger = await bind_temporal_worker_logger(team_id=inputs.team_id)

if inputs.job_id is None:
job: ExternalDataJob | None = await aget_running_job_for_schema(inputs.schema_id)
if job is None:
logger.info("No job to update status on")
return

job_id = str(job.pk)
else:
job_id = inputs.job_id

if inputs.internal_error:
logger.exception(
f"External data job failed for external data schema {inputs.schema_id} with error: {inputs.internal_error}"
Expand All @@ -74,14 +84,14 @@ async def update_external_data_job_model(inputs: UpdateExternalDataJobStatusInpu
await aupdate_should_sync(schema_id=inputs.schema_id, team_id=inputs.team_id, should_sync=False)

await aupdate_external_job_status(
job_id=inputs.id,
job_id=job_id,
status=inputs.status,
latest_error=inputs.latest_error,
team_id=inputs.team_id,
)

logger.info(
f"Updated external data job with for external data source {inputs.run_id} to status {inputs.status}",
f"Updated external data job with for external data source {job_id} to status {inputs.status}",
)


Expand Down Expand Up @@ -149,43 +159,8 @@ async def run(self, inputs: ExternalDataWorkflowInputs):

assert inputs.external_data_schema_id is not None

# create external data job and trigger activity
create_external_data_job_inputs = CreateExternalDataJobModelActivityInputs(
team_id=inputs.team_id,
schema_id=inputs.external_data_schema_id,
source_id=inputs.external_data_source_id,
)

job_id, incremental = await workflow.execute_activity(
create_external_data_job_model_activity,
create_external_data_job_inputs,
start_to_close_timeout=dt.timedelta(minutes=1),
retry_policy=RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=3,
non_retryable_error_types=["NotNullViolation", "IntegrityError"],
),
)

# Check billing limits
hit_billing_limit = await workflow.execute_activity(
check_billing_limits_activity,
CheckBillingLimitsActivityInputs(job_id=job_id, team_id=inputs.team_id),
start_to_close_timeout=dt.timedelta(minutes=1),
retry_policy=RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=3,
),
)

if hit_billing_limit:
return

update_inputs = UpdateExternalDataJobStatusInputs(
id=job_id,
run_id=job_id,
job_id=None,
status=ExternalDataJob.Status.COMPLETED,
latest_error=None,
internal_error=None,
Expand All @@ -194,6 +169,41 @@ async def run(self, inputs: ExternalDataWorkflowInputs):
)

try:
# create external data job and trigger activity
create_external_data_job_inputs = CreateExternalDataJobModelActivityInputs(
team_id=inputs.team_id,
schema_id=inputs.external_data_schema_id,
source_id=inputs.external_data_source_id,
)

job_id, incremental = await workflow.execute_activity(
create_external_data_job_model_activity,
create_external_data_job_inputs,
start_to_close_timeout=dt.timedelta(minutes=1),
retry_policy=RetryPolicy(
maximum_attempts=1,
non_retryable_error_types=["NotNullViolation", "IntegrityError"],
),
)

update_inputs.job_id = job_id

# Check billing limits
hit_billing_limit = await workflow.execute_activity(
check_billing_limits_activity,
CheckBillingLimitsActivityInputs(job_id=job_id, team_id=inputs.team_id),
start_to_close_timeout=dt.timedelta(minutes=1),
retry_policy=RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=3,
),
)

if hit_billing_limit:
update_inputs.status = ExternalDataJob.Status.CANCELLED
return

await workflow.execute_activity(
sync_new_schemas_activity,
SyncNewSchemasActivityInputs(source_id=str(inputs.external_data_source_id), team_id=inputs.team_id),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
from ee.billing.quota_limiting import QuotaLimitingCaches, QuotaResource, list_limited_team_attributes
from posthog.models.team.team import Team
from posthog.temporal.common.logger import bind_temporal_worker_logger
from posthog.warehouse.external_data_source.jobs import aupdate_external_job_status
from posthog.warehouse.models.external_data_job import ExternalDataJob


@dataclasses.dataclass
Expand All @@ -28,14 +26,6 @@ async def check_billing_limits_activity(inputs: CheckBillingLimitsActivityInputs

if team.api_token in limited_team_tokens_rows_synced:
logger.info("Billing limits hit. Canceling sync")

await aupdate_external_job_status(
job_id=inputs.job_id,
status=ExternalDataJob.Status.CANCELLED,
latest_error=None,
team_id=inputs.team_id,
)

return True

return False
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from posthog.warehouse.external_data_source.jobs import (
create_external_data_job,
)
from posthog.warehouse.models import aget_schema_by_id
from posthog.warehouse.models.external_data_schema import (
ExternalDataSchema,
)
Expand Down Expand Up @@ -44,11 +43,7 @@ async def create_external_data_job_model_activity(inputs: CreateExternalDataJobM
f"Created external data job for external data source {inputs.source_id}",
)

schema_model = await aget_schema_by_id(inputs.schema_id, inputs.team_id)
if schema_model is None:
raise ValueError(f"Schema with ID {inputs.schema_id} not found")

return str(job.id), schema_model.is_incremental
return str(job.id), schema.is_incremental
except Exception as e:
logger.exception(
f"External data job failed on create_external_data_job_model_activity for {str(inputs.source_id)} with error: {e}"
Expand Down
94 changes: 94 additions & 0 deletions posthog/temporal/tests/data_imports/test_end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,3 +811,97 @@ async def test_billing_limits(team, stripe_customer):

with pytest.raises(Exception):
await sync_to_async(execute_hogql_query)("SELECT * FROM stripe_customer", team)


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_create_external_job_failure(team, stripe_customer):
    """When the billing-limits check raises mid-run, the created job row is marked FAILED.

    NOTE(review): asserts on the job status via a direct ORM read — presumably set by the
    workflow's failure path; confirm against `update_external_data_job_model`.
    """
    external_source = await sync_to_async(ExternalDataSource.objects.create)(
        source_id=uuid.uuid4(),
        connection_id=uuid.uuid4(),
        destination_id=uuid.uuid4(),
        team=team,
        status="running",
        source_type="Stripe",
        job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
    )

    customer_schema = await sync_to_async(ExternalDataSchema.objects.create)(
        name="Customer",
        team_id=team.pk,
        source_id=external_source.pk,
        sync_type=ExternalDataSchema.SyncType.FULL_REFRESH,
        sync_type_config={},
    )

    run_workflow_id = str(uuid.uuid4())
    workflow_inputs = ExternalDataWorkflowInputs(
        team_id=team.id,
        external_data_source_id=external_source.pk,
        external_data_schema_id=customer_schema.id,
    )

    # Force the billing-limits activity to blow up so the workflow fails after
    # the job row has already been created.
    patch_target = "posthog.temporal.data_imports.workflow_activities.check_billing_limits.list_limited_team_attributes"
    with mock.patch(patch_target) as limits_mock:
        limits_mock.side_effect = Exception("Ruhoh!")

        with pytest.raises(Exception):
            await _execute_run(run_workflow_id, workflow_inputs, stripe_customer["data"])

    job: ExternalDataJob = await sync_to_async(ExternalDataJob.objects.get)(
        team_id=team.id, schema_id=customer_schema.pk
    )

    assert job.status == ExternalDataJob.Status.FAILED

    # Nothing was synced, so querying the warehouse table must raise.
    with pytest.raises(Exception):
        await sync_to_async(execute_hogql_query)("SELECT * FROM stripe_customer", team)


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_create_external_job_failure_no_job_model(team, stripe_customer):
    """If creating the job model itself fails, no ExternalDataJob row is left behind.

    Patches `create_external_data_job` to raise, runs the workflow, and checks that
    zero job rows exist for the schema and that the warehouse table is absent.
    """
    source = await sync_to_async(ExternalDataSource.objects.create)(
        source_id=uuid.uuid4(),
        connection_id=uuid.uuid4(),
        destination_id=uuid.uuid4(),
        team=team,
        status="running",
        source_type="Stripe",
        job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
    )

    schema = await sync_to_async(ExternalDataSchema.objects.create)(
        name="Customer",
        team_id=team.pk,
        source_id=source.pk,
        sync_type=ExternalDataSchema.SyncType.FULL_REFRESH,
        sync_type_config={},
    )

    workflow_id = str(uuid.uuid4())
    inputs = ExternalDataWorkflowInputs(
        team_id=team.id,
        external_data_source_id=source.pk,
        external_data_schema_id=schema.id,
    )

    @sync_to_async
    def get_jobs():
        # Materialize the queryset inside the sync context before returning.
        jobs = ExternalDataJob.objects.filter(team_id=team.id, schema_id=schema.pk)

        return list(jobs)

    # Fix: the mock variable was named `mock_list_limited_team_attributes` — a copy-paste
    # from the billing-limits test above — but what is patched here is job-model creation.
    with mock.patch(
        "posthog.temporal.data_imports.workflow_activities.create_job_model.create_external_data_job",
    ) as mock_create_external_data_job:
        mock_create_external_data_job.side_effect = Exception("Ruhoh!")

        with pytest.raises(Exception):
            await _execute_run(workflow_id, inputs, stripe_customer["data"])

    jobs: list[ExternalDataJob] = await get_jobs()

    assert len(jobs) == 0

    # Nothing was synced, so querying the warehouse table must raise.
    with pytest.raises(Exception):
        await sync_to_async(execute_hogql_query)("SELECT * FROM stripe_customer", team)
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,7 @@ async def test_update_external_job_activity(activity_environment, team, **kwargs
)

inputs = UpdateExternalDataJobStatusInputs(
id=str(new_job.id),
run_id=str(new_job.id),
job_id=str(new_job.id),
status=ExternalDataJob.Status.COMPLETED,
latest_error=None,
internal_error=None,
Expand Down Expand Up @@ -292,8 +291,7 @@ async def test_update_external_job_activity_with_retryable_error(activity_enviro
)

inputs = UpdateExternalDataJobStatusInputs(
id=str(new_job.id),
run_id=str(new_job.id),
job_id=str(new_job.id),
status=ExternalDataJob.Status.COMPLETED,
latest_error=None,
internal_error="Some other retryable error",
Expand Down Expand Up @@ -338,8 +336,7 @@ async def test_update_external_job_activity_with_non_retryable_error(activity_en
)

inputs = UpdateExternalDataJobStatusInputs(
id=str(new_job.id),
run_id=str(new_job.id),
job_id=str(new_job.id),
status=ExternalDataJob.Status.COMPLETED,
latest_error=None,
internal_error="NoSuchTableError: TableA",
Expand Down
9 changes: 9 additions & 0 deletions posthog/warehouse/external_data_source/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ def create_external_data_job(
return job


@database_sync_to_async
def aget_running_job_for_schema(schema_id: str) -> ExternalDataJob | None:
    """Return the most recently created RUNNING job for this schema, or None if none exists."""
    running_jobs = ExternalDataJob.objects.filter(
        schema_id=schema_id,
        status=ExternalDataJob.Status.RUNNING,
    )
    return running_jobs.order_by("-created_at").first()


@database_sync_to_async
def aupdate_external_job_status(
job_id: str, team_id: int, status: ExternalDataJob.Status, latest_error: str | None
Expand Down

0 comments on commit 1fe8f1d

Please sign in to comment.