diff --git a/tests/test_worker.py b/tests/test_worker.py index b198ac3..708e6a6 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,3 +1,4 @@ +import base64 import re import uuid from contextlib import contextmanager @@ -18,6 +19,7 @@ V1ListMeta, V1ObjectMeta, V1ObjectReference, + V1Secret, ) from kubernetes.config import ConfigException from prefect.client.schemas import FlowRun @@ -29,6 +31,7 @@ from prefect.server.schemas.core import Flow from prefect.server.schemas.responses import DeploymentResponse from prefect.settings import ( + PREFECT_API_KEY, PREFECT_EXPERIMENTAL_ENABLE_WORKERS, PREFECT_EXPERIMENTAL_WARN_WORKERS, get_current_settings, @@ -43,7 +46,7 @@ from pydantic import ValidationError from prefect_kubernetes import KubernetesWorker -from prefect_kubernetes.utilities import _slugify_label_value +from prefect_kubernetes.utilities import _slugify_label_value, _slugify_name from prefect_kubernetes.worker import KubernetesWorkerJobConfiguration FAKE_CLUSTER = "fake-cluster" @@ -151,6 +154,11 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs): ] +@pytest.fixture +def enable_store_api_key_in_secret(monkeypatch): + monkeypatch.setenv("PREFECT_KUBERNETES_WORKER_STORE_PREFECT_API_IN_SECRET", "true") + + from_template_and_values_cases = [ ( # default base template with no values @@ -1382,6 +1390,122 @@ async def test_uses_image_variable( ]["spec"]["containers"][0]["image"] assert image == "foo" + async def test_can_store_api_key_in_secret( + self, + flow_run, + mock_core_client, + mock_watch, + mock_batch_client, + enable_store_api_key_in_secret, + ): + mock_watch.stream = _mock_pods_stream_that_returns_running_pod + mock_core_client.read_namespaced_secret.side_effect = ApiException(status=404) + + configuration = await KubernetesWorkerJobConfiguration.from_template_and_values( + KubernetesWorker.get_default_base_job_template(), {"image": "foo"} + ) + with temporary_settings(updates={PREFECT_API_KEY: "fake"}): + async with KubernetesWorker(work_pool_name="test") as k8s_worker: + configuration.prepare_for_flow_run(flow_run=flow_run) + await k8s_worker.run(flow_run, configuration) + mock_batch_client.create_namespaced_job.assert_called_once() + env = mock_batch_client.create_namespaced_job.call_args[0][1]["spec"][ + "template" + ]["spec"]["containers"][0]["env"] + assert { + "name": "PREFECT_API_KEY", + "valueFrom": { + "secretKeyRef": { + "name": f"prefect-{_slugify_name(k8s_worker.name)}-api-key", + "key": "value", + } + }, + } in env + mock_core_client.create_namespaced_secret.assert_called_with( + namespace=configuration.namespace, + body=V1Secret( + api_version="v1", + kind="Secret", + metadata=V1ObjectMeta( + name=f"prefect-{_slugify_name(k8s_worker.name)}-api-key", + namespace=configuration.namespace, + ), + data={ + "value": base64.b64encode("fake".encode("utf-8")).decode( + "utf-8" + ) + }, + ), + ) + + # Make sure secret gets deleted + assert mock_core_client.delete_namespaced_secret( + name=f"prefect-{_slugify_name(k8s_worker.name)}-api-key", + namespace=configuration.namespace, + ) + + async def test_store_api_key_in_existing_secret( + self, + flow_run, + mock_core_client, + mock_watch, + mock_batch_client, + enable_store_api_key_in_secret, + ): + mock_watch.stream = _mock_pods_stream_that_returns_running_pod + + configuration = await KubernetesWorkerJobConfiguration.from_template_and_values( + KubernetesWorker.get_default_base_job_template(), {"image": "foo"} + ) + with temporary_settings(updates={PREFECT_API_KEY: "fake"}): + async with KubernetesWorker(work_pool_name="test") as k8s_worker: + mock_core_client.read_namespaced_secret.return_value = V1Secret( + api_version="v1", + kind="Secret", + metadata=V1ObjectMeta( + name=f"prefect-{_slugify_name(k8s_worker.name)}-api-key", + namespace=configuration.namespace, + ), + data={ + "value": base64.b64encode("fake".encode("utf-8")).decode( + "utf-8" + ) + }, + ) + + configuration.prepare_for_flow_run(flow_run=flow_run) + await k8s_worker.run(flow_run, configuration) + mock_batch_client.create_namespaced_job.assert_called_once() + env = mock_batch_client.create_namespaced_job.call_args[0][1]["spec"][ + "template" + ]["spec"]["containers"][0]["env"] + assert { + "name": "PREFECT_API_KEY", + "valueFrom": { + "secretKeyRef": { + "name": f"prefect-{_slugify_name(k8s_worker.name)}-api-key", + "key": "value", + } + }, + } in env + mock_core_client.replace_namespaced_secret.assert_called_with( + name=f"prefect-{_slugify_name(k8s_worker.name)}-api-key", + namespace=configuration.namespace, + body=V1Secret( + api_version="v1", + kind="Secret", + metadata=V1ObjectMeta( + name=f"prefect-{_slugify_name(k8s_worker.name)}-api-key", + namespace=configuration.namespace, + ), + data={ + "value": base64.b64encode("fake".encode("utf-8")).decode( + "utf-8" + ) + }, + ), + ) + async def test_create_job_failure( self, flow_run,