Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Adds tests
Browse files Browse the repository at this point in the history
  • Loading branch information
desertaxle committed Nov 7, 2023
1 parent 65b978b commit fbcf5c4
Showing 1 changed file with 125 additions and 1 deletion.
126 changes: 125 additions & 1 deletion tests/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import base64
import re
import uuid
from contextlib import contextmanager
Expand All @@ -18,6 +19,7 @@
V1ListMeta,
V1ObjectMeta,
V1ObjectReference,
V1Secret,
)
from kubernetes.config import ConfigException
from prefect.client.schemas import FlowRun
Expand All @@ -29,6 +31,7 @@
from prefect.server.schemas.core import Flow
from prefect.server.schemas.responses import DeploymentResponse
from prefect.settings import (
PREFECT_API_KEY,
PREFECT_EXPERIMENTAL_ENABLE_WORKERS,
PREFECT_EXPERIMENTAL_WARN_WORKERS,
get_current_settings,
Expand All @@ -43,7 +46,7 @@
from pydantic import ValidationError

from prefect_kubernetes import KubernetesWorker
from prefect_kubernetes.utilities import _slugify_label_value
from prefect_kubernetes.utilities import _slugify_label_value, _slugify_name
from prefect_kubernetes.worker import KubernetesWorkerJobConfiguration

FAKE_CLUSTER = "fake-cluster"
Expand Down Expand Up @@ -151,6 +154,11 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs):
]


@pytest.fixture
def enable_store_api_key_in_secret(monkeypatch):
monkeypatch.setenv("PREFECT_KUBERNETES_WORKER_STORE_PREFECT_API_IN_SECRET", "true")


from_template_and_values_cases = [
(
# default base template with no values
Expand Down Expand Up @@ -1382,6 +1390,122 @@ async def test_uses_image_variable(
]["spec"]["containers"][0]["image"]
assert image == "foo"

async def test_can_store_api_key_in_secret(
self,
flow_run,
mock_core_client,
mock_watch,
mock_batch_client,
enable_store_api_key_in_secret,
):
mock_watch.stream = _mock_pods_stream_that_returns_running_pod
mock_core_client.read_namespaced_secret.side_effect = ApiException(status=404)

configuration = await KubernetesWorkerJobConfiguration.from_template_and_values(
KubernetesWorker.get_default_base_job_template(), {"image": "foo"}
)
with temporary_settings(updates={PREFECT_API_KEY: "fake"}):
async with KubernetesWorker(work_pool_name="test") as k8s_worker:
configuration.prepare_for_flow_run(flow_run=flow_run)
await k8s_worker.run(flow_run, configuration)
mock_batch_client.create_namespaced_job.assert_called_once()
env = mock_batch_client.create_namespaced_job.call_args[0][1]["spec"][
"template"
]["spec"]["containers"][0]["env"]
assert {
"name": "PREFECT_API_KEY",
"valueFrom": {
"secretKeyRef": {
"name": f"prefect-{_slugify_name(k8s_worker.name)}-api-key",
"key": "value",
}
},
} in env
mock_core_client.create_namespaced_secret.assert_called_with(
namespace=configuration.namespace,
body=V1Secret(
api_version="v1",
kind="Secret",
metadata=V1ObjectMeta(
name=f"prefect-{_slugify_name(k8s_worker.name)}-api-key",
namespace=configuration.namespace,
),
data={
"value": base64.b64encode("fake".encode("utf-8")).decode(
"utf-8"
)
},
),
)

# Make sure secret gets deleted
assert mock_core_client.delete_namespaced_secret(
name=f"prefect-{_slugify_name(k8s_worker.name)}-api-key",
namespace=configuration.namespace,
)

async def test_store_api_key_in_existing_secret(
self,
flow_run,
mock_core_client,
mock_watch,
mock_batch_client,
enable_store_api_key_in_secret,
):
mock_watch.stream = _mock_pods_stream_that_returns_running_pod

configuration = await KubernetesWorkerJobConfiguration.from_template_and_values(
KubernetesWorker.get_default_base_job_template(), {"image": "foo"}
)
with temporary_settings(updates={PREFECT_API_KEY: "fake"}):
async with KubernetesWorker(work_pool_name="test") as k8s_worker:
mock_core_client.read_namespaced_secret.return_value = V1Secret(
api_version="v1",
kind="Secret",
metadata=V1ObjectMeta(
name=f"prefect-{_slugify_name(k8s_worker.name)}-api-key",
namespace=configuration.namespace,
),
data={
"value": base64.b64encode("fake".encode("utf-8")).decode(
"utf-8"
)
},
)

configuration.prepare_for_flow_run(flow_run=flow_run)
await k8s_worker.run(flow_run, configuration)
mock_batch_client.create_namespaced_job.assert_called_once()
env = mock_batch_client.create_namespaced_job.call_args[0][1]["spec"][
"template"
]["spec"]["containers"][0]["env"]
assert {
"name": "PREFECT_API_KEY",
"valueFrom": {
"secretKeyRef": {
"name": f"prefect-{_slugify_name(k8s_worker.name)}-api-key",
"key": "value",
}
},
} in env
mock_core_client.replace_namespaced_secret.assert_called_with(
name=f"prefect-{_slugify_name(k8s_worker.name)}-api-key",
namespace=configuration.namespace,
body=V1Secret(
api_version="v1",
kind="Secret",
metadata=V1ObjectMeta(
name=f"prefect-{_slugify_name(k8s_worker.name)}-api-key",
namespace=configuration.namespace,
),
data={
"value": base64.b64encode("fake".encode("utf-8")).decode(
"utf-8"
)
},
),
)

async def test_create_job_failure(
self,
flow_run,
Expand Down

0 comments on commit fbcf5c4

Please sign in to comment.