Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add tenacity retries to run method (#121)
Browse files Browse the repository at this point in the history
* add tenacity and retries

* add tenacity and retries

* add tenacity and retries

* move retries to _creates_job

---------

Co-authored-by: Alexander Streed <[email protected]>
  • Loading branch information
gabcoyne and desertaxle authored Mar 11, 2024
1 parent c7248ba commit 6e2024b
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 1 deletion.
16 changes: 16 additions & 0 deletions prefect_kubernetes/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
else:
from pydantic import Field, validator

from tenacity import retry, stop_after_attempt, wait_fixed, wait_random
from typing_extensions import Literal

from prefect_kubernetes.events import KubernetesEventsReplicator
Expand All @@ -166,6 +167,12 @@
else:
kubernetes = lazy_import("kubernetes")

MAX_ATTEMPTS = 3
RETRY_MIN_DELAY_SECONDS = 1
RETRY_MIN_DELAY_JITTER_SECONDS = 0
RETRY_MAX_DELAY_JITTER_SECONDS = 3


_LOCK = Lock()


Expand Down Expand Up @@ -801,6 +808,15 @@ def _replace_api_key_with_secret(
"env"
] = manifest_env

@retry(
stop=stop_after_attempt(MAX_ATTEMPTS),
wait=wait_fixed(RETRY_MIN_DELAY_SECONDS)
+ wait_random(
RETRY_MIN_DELAY_JITTER_SECONDS,
RETRY_MAX_DELAY_JITTER_SECONDS,
),
reraise=True,
)
def _create_job(
self, configuration: KubernetesWorkerJobConfiguration, client: "ApiClient"
) -> "V1Job":
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
prefect>=2.13.5
kubernetes >= 24.2.0
kubernetes >= 24.2.0
tenacity >= 8.2.3
43 changes: 43 additions & 0 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1557,6 +1557,49 @@ async def test_create_job_failure(
):
await k8s_worker.run(flow_run, configuration)

async def test_create_job_retries(
self,
flow_run,
mock_core_client,
mock_watch,
mock_batch_client,
):
MAX_ATTEMPTS = 3
response = MagicMock()
response.data = {
"kind": "Status",
"apiVersion": "v1",
"metadata": {},
"status": "Failure",
"message": 'jobs.batch is forbidden: User "system:serviceaccount:helm-test:prefect-worker-dev" cannot create resource "jobs" in API group "batch" in the namespace "prefect"',
"reason": "Forbidden",
"details": {"group": "batch", "kind": "jobs"},
"code": 403,
}
response.status = 403
response.reason = "Forbidden"

mock_batch_client.create_namespaced_job.side_effect = ApiException(
http_resp=response
)

configuration = await KubernetesWorkerJobConfiguration.from_template_and_values(
KubernetesWorker.get_default_base_job_template(), {"image": "foo"}
)
async with KubernetesWorker(work_pool_name="test") as k8s_worker:
with pytest.raises(
InfrastructureError,
match=re.escape(
"Unable to create Kubernetes job: Forbidden: jobs.batch is forbidden: User "
'"system:serviceaccount:helm-test:prefect-worker-dev" cannot '
'create resource "jobs" in API group "batch" in the namespace '
'"prefect"'
),
):
await k8s_worker.run(flow_run, configuration)

assert mock_batch_client.create_namespaced_job.call_count == MAX_ATTEMPTS

async def test_create_job_failure_no_reason(
self,
flow_run,
Expand Down

0 comments on commit 6e2024b

Please sign in to comment.