Skip to content

Commit

Permalink
feat: add timed_out status to managed proxy workflow (#23606)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankh authored Jul 11, 2024
1 parent d45f28a commit 5ceda44
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 33 deletions.
12 changes: 11 additions & 1 deletion frontend/src/scenes/settings/project/ManagedReverseProxy.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ import { proxyLogic, ProxyRecord } from './proxyLogic'

const MAX_PROXY_RECORDS = 3

const statusText = {
valid: 'live',
timed_out: 'timed out',
}

export function ManagedReverseProxy(): JSX.Element {
const { formState, proxyRecords, proxyRecordsLoading } = useValues(proxyLogic)
const { showForm, deleteRecord } = useActions(proxyLogic)
Expand Down Expand Up @@ -58,12 +63,17 @@ export function ManagedReverseProxy(): JSX.Element {
)}
>
{status === 'issuing' && <Spinner />}
<span className="capitalize">{status === 'valid' ? 'live' : status}</span>
<span className="capitalize">{statusText[status] || status}</span>
{status === 'waiting' && (
<Tooltip title="Waiting for DNS records to be created">
<IconInfo className="cursor-pointer" />
</Tooltip>
)}
{status === 'timed_out' && (
<Tooltip title="Timed out waiting for DNS records to be created. Please delete the record and try again">
<IconInfo className="cursor-pointer" />
</Tooltip>
)}
</div>
)
},
Expand Down
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0435_alter_action_slack_message_format
posthog: 0436_alter_proxyrecord_status
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
6 changes: 5 additions & 1 deletion posthog/api/proxy_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ def create(self, request, *args, **kwargs):
def destroy(self, request, *args, pk=None, **kwargs):
record = self.organization.proxy_records.get(id=pk)

if record and record.status in (ProxyRecord.Status.WAITING, ProxyRecord.Status.ERRORING):
if record and record.status in (
ProxyRecord.Status.WAITING,
ProxyRecord.Status.ERRORING,
ProxyRecord.Status.TIMED_OUT,
):
record.delete()
elif record:
temporal = sync_connect()
Expand Down
27 changes: 27 additions & 0 deletions posthog/migrations/0436_alter_proxyrecord_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Generated by Django 4.2.11 on 2024-07-11 13:49

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("posthog", "0435_alter_action_slack_message_format"),
]

operations = [
migrations.AlterField(
model_name="proxyrecord",
name="status",
field=models.CharField(
choices=[
("waiting", "Waiting"),
("issuing", "Issuing"),
("valid", "Valid"),
("erroring", "Erroring"),
("deleting", "Deleting"),
("timed_out", "Timed Out"),
],
default="waiting",
),
),
]
1 change: 1 addition & 0 deletions posthog/models/proxy_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class Status(models.TextChoices):
VALID = "valid"
ERRORING = "erroring"
DELETING = "deleting"
TIMED_OUT = "timed_out"

status: models.CharField = models.CharField(
choices=Status.choices,
Expand Down
9 changes: 8 additions & 1 deletion posthog/temporal/proxy_service/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class NonRetriableException(Exception):
pass


class RecordDeletedException(NonRetriableException):
pass


@dataclass
class UpdateProxyRecordInputs:
organization_id: uuid.UUID
Expand All @@ -45,7 +49,10 @@ async def update_proxy_record(inputs: UpdateProxyRecordInputs):
@sync_to_async
def update_record(proxy_record_id):
connection.connect()
pr = ProxyRecord.objects.get(id=proxy_record_id)
prs = ProxyRecord.objects.filter(id=proxy_record_id)
if len(prs) == 0:
raise RecordDeletedException("proxy record was deleted before workflow completed")
pr = prs[0]
pr.status = inputs.status
# clear message after every transition
pr.message = ""
Expand Down
115 changes: 86 additions & 29 deletions posthog/temporal/proxy_service/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from temporalio import activity, workflow
import temporalio.common
from temporalio.exceptions import ApplicationError
from temporalio.exceptions import ActivityError, ApplicationError, RetryState

from posthog.models import ProxyRecord
from posthog.temporal.batch_exports.base import PostHogWorkflow
Expand All @@ -20,6 +20,7 @@
from posthog.temporal.proxy_service.common import (
get_grpc_client,
NonRetriableException,
RecordDeletedException,
update_proxy_record,
UpdateProxyRecordInputs,
)
Expand Down Expand Up @@ -77,7 +78,7 @@ def update_record_message(*, proxy_record_id, message):
pr.save()

if not await record_exists(inputs.proxy_record_id):
raise NonRetriableException("proxy record was deleted while waiting for DNS records")
raise RecordDeletedException("proxy record was deleted while waiting for DNS records")

try:
cnames = dns.resolver.query(inputs.domain, "CNAME")
Expand Down Expand Up @@ -107,7 +108,7 @@ def update_record_message(*, proxy_record_id, message):
message="The DNS record appears to have Cloudflare proxying enabled - please disable this. For more information see [the docs](https://posthog.com/docs/advanced/proxy/managed-reverse-proxy)",
)
raise
except (dns.resolver.NXDOMAIN, ApplicationError):
except (dns.resolver.NXDOMAIN, dns.resolver.Timeout, ApplicationError):
# retriable
raise
except Exception as e:
Expand All @@ -133,7 +134,7 @@ def record_exists(proxy_record_id) -> bool:
return len(pr) > 0

if not await record_exists(inputs.proxy_record_id):
raise NonRetriableException("proxy record was deleted while waiting for certificate to be provisioned")
raise RecordDeletedException("proxy record was deleted while waiting for certificate to be provisioned")

client = await get_grpc_client()

Expand Down Expand Up @@ -200,29 +201,57 @@ def parse_inputs(inputs: list[str]) -> CreateManagedProxyInputs:
@temporalio.workflow.run
async def run(self, inputs: CreateManagedProxyInputs) -> None:
"""Workflow implementation to create a Managed reverse Proxy."""

logger = await bind_temporal_org_worker_logger(organization_id=inputs.organization_id)
try:
# Wait for DNS record to be created.
# This will fail and retry infinitely until the expected resolution is found.
# Timeout after 7 days - users will need to delete and recreate after this time.
await temporalio.workflow.execute_activity(
wait_for_dns_records,
WaitForDNSRecordsInputs(
organization_id=inputs.organization_id,
proxy_record_id=inputs.proxy_record_id,
domain=inputs.domain,
target_cname=inputs.target_cname,
),
schedule_to_close_timeout=dt.timedelta(days=7),
start_to_close_timeout=dt.timedelta(seconds=2),
retry_policy=temporalio.common.RetryPolicy(
backoff_coefficient=1.1,
initial_interval=dt.timedelta(seconds=3),
maximum_interval=dt.timedelta(seconds=3600),
maximum_attempts=0,
non_retryable_error_types=["NonRetriableException"],
),
)
try:
# Wait for DNS record to be created.
# This will fail and retry infinitely until the expected resolution is found.
# Timeout after 7 days - users will need to delete and recreate after this time.
await temporalio.workflow.execute_activity(
wait_for_dns_records,
WaitForDNSRecordsInputs(
organization_id=inputs.organization_id,
proxy_record_id=inputs.proxy_record_id,
domain=inputs.domain,
target_cname=inputs.target_cname,
),
schedule_to_close_timeout=dt.timedelta(days=7),
start_to_close_timeout=dt.timedelta(seconds=10),
retry_policy=temporalio.common.RetryPolicy(
backoff_coefficient=1.1,
initial_interval=dt.timedelta(seconds=3),
maximum_interval=dt.timedelta(seconds=300),
maximum_attempts=0,
non_retryable_error_types=["NonRetriableException", "RecordDeletedException"],
),
)
except ActivityError as e:
if e.retry_state != RetryState.TIMEOUT:
raise

# If we time out waiting for DNS records set to TIMED_OUT status
# This is not really an "error", as it's on the customer to set the DNS
# records and we have no control over it.
logger.info(
"Timed out waiting for DNS records for domain %s",
inputs.domain,
)

# Handle schedule-to-close timeout specifically
await temporalio.workflow.execute_activity(
update_proxy_record,
UpdateProxyRecordInputs(
organization_id=inputs.organization_id,
proxy_record_id=inputs.proxy_record_id,
status=ProxyRecord.Status.TIMED_OUT.value,
),
start_to_close_timeout=dt.timedelta(seconds=60),
retry_policy=temporalio.common.RetryPolicy(
maximum_attempts=10,
non_retryable_error_types=["NonRetriableException", "RecordDeletedException"],
),
)
return

# We've found the correct DNS record - update record to the ISSUING state
await temporalio.workflow.execute_activity(
Expand All @@ -235,6 +264,7 @@ async def run(self, inputs: CreateManagedProxyInputs) -> None:
start_to_close_timeout=dt.timedelta(seconds=10),
retry_policy=temporalio.common.RetryPolicy(
maximum_attempts=2,
non_retryable_error_types=["NonRetriableException", "RecordDeletedException"],
),
)

Expand All @@ -247,7 +277,7 @@ async def run(self, inputs: CreateManagedProxyInputs) -> None:
retry_policy=temporalio.common.RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
maximum_attempts=5,
non_retryable_error_types=["NonRetriableException"],
non_retryable_error_types=["NonRetriableException", "RecordDeletedException"],
),
)

Expand All @@ -266,7 +296,7 @@ async def run(self, inputs: CreateManagedProxyInputs) -> None:
initial_interval=dt.timedelta(seconds=1),
maximum_interval=dt.timedelta(seconds=10),
maximum_attempts=0,
non_retryable_error_types=["NonRetriableException"],
non_retryable_error_types=["NonRetriableException", "RecordDeletedException"],
),
)

Expand All @@ -281,10 +311,36 @@ async def run(self, inputs: CreateManagedProxyInputs) -> None:
start_to_close_timeout=dt.timedelta(seconds=10),
retry_policy=temporalio.common.RetryPolicy(
maximum_attempts=2,
non_retryable_error_types=["NonRetriableException", "RecordDeletedException"],
),
)

except Exception:
except RecordDeletedException:
logger.info(
"Record was deleted before completing provisioning for id %s (%s)",
inputs.proxy_record_id,
inputs.domain,
)

# if the record has been deleted don't error the workflow, just ignore
return

except Exception as e:
logger.info(
"Exception caught during workflow run: %s (%s)",
e,
type(e),
)

if hasattr(e, "cause") and e.cause.type == "RecordDeletedException":
logger.info(
"Record was deleted before completing provisioning for id %s (%s)",
inputs.proxy_record_id,
inputs.domain,
)

# if the record has been deleted don't error the workflow, just ignore
return
# Something went wrong - set the record to error state
await temporalio.workflow.execute_activity(
update_proxy_record,
Expand All @@ -296,6 +352,7 @@ async def run(self, inputs: CreateManagedProxyInputs) -> None:
start_to_close_timeout=dt.timedelta(seconds=60),
retry_policy=temporalio.common.RetryPolicy(
maximum_attempts=10,
non_retryable_error_types=["NonRetriableException", "RecordDeletedException"],
),
)
raise

0 comments on commit 5ceda44

Please sign in to comment.