From 067a4029fbe75cf58463877347c01d84c56e5ade Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Mon, 6 Nov 2023 10:29:21 -0600 Subject: [PATCH 1/4] Adds capability to worker to create secrets for API keys --- prefect_kubernetes/worker.py | 98 ++++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/prefect_kubernetes/worker.py b/prefect_kubernetes/worker.py index 2481034..e38c22b 100644 --- a/prefect_kubernetes/worker.py +++ b/prefect_kubernetes/worker.py @@ -84,6 +84,8 @@ For more information about work pools and workers, checkout out the [Prefect docs](https://docs.prefect.io/concepts/work-pools/). """ +import asyncio +import base64 import enum import logging import math @@ -517,6 +519,10 @@ class KubernetesWorker(BaseWorker): _documentation_url = "https://prefecthq.github.io/prefect-kubernetes/worker/" _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/2d0b896006ad463b49c28aaac14f31e00e32cfab-250x250.png" # noqa + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._created_secrets = {} + async def run( self, flow_run: "FlowRun", @@ -584,6 +590,32 @@ async def kill_infrastructure( self._stop_job, infrastructure_pid, configuration, grace_seconds ) + async def teardown(self, *exc_info): + await super().teardown(*exc_info) + + await self._clean_up_created_secrets() + + async def _clean_up_created_secrets(self): + """Deletes any secrets created during the worker's operation.""" + coros = [] + for key, configuration in self._created_secrets.items(): + with self._get_configured_kubernetes_client(configuration) as client: + with self._get_core_client(client) as core_client: + coros.append( + run_sync_in_worker_thread( + core_client.delete_namespaced_secret, + name=key[0], + namespace=key[1], + ) + ) + + results = await asyncio.gather(*coros, return_exceptions=True) + for result in results: + if isinstance(result, Exception): + self._logger.warning( + "Failed to delete created secret with exception: 
%s", result + ) + def _stop_job( self, infrastructure_pid: str, @@ -662,6 +694,43 @@ def _create_job( """ Creates a Kubernetes job from a job manifest. """ + manifest_env = configuration.job_manifest["spec"]["template"]["spec"][ + "containers" + ][0].get("env") + manifest_api_key_env = next( + ( + env_entry + for env_entry in manifest_env + if env_entry.get("name") == "PREFECT_API_KEY" + ), + {}, + ) + api_key = manifest_api_key_env.get("value") + if api_key: + secret_name = f"{_slugify_name(self.name)}-api-key" + secret = self._upsert_secret( + name=secret_name, + value=api_key, + namespace=configuration.namespace, + client=client, + ) + # Store configuration so that we can delete the secret when the worker shuts + # down + self._created_secrets[ + (secret.metadata.name, secret.metadata.namespace) + ] = configuration + new_api_env_entry = { + "name": "PREFECT_API_KEY", + "valueFrom": {"secretKeyRef": {"name": secret_name, "key": "value"}}, + } + manifest_env = [ + entry if entry.get("name") != "PREFECT_API_KEY" else new_api_env_entry + for entry in manifest_env + ] + configuration.job_manifest["spec"]["template"]["spec"]["containers"][0][ + "env" + ] = manifest_env + try: with self._get_batch_client(client) as batch_client: job = batch_client.create_namespaced_job( @@ -681,6 +750,35 @@ def _create_job( return job + def _upsert_secret( + self, name: str, value: str, namespace: str, client: "ApiClient" + ): + encoded_value = base64.b64encode(value.encode("utf-8")).decode("utf-8") + with self._get_core_client(client) as core_client: + try: + # Get the current version of the Secret and update it with the + # new value + current_secret = core_client.read_namespaced_secret( + name=name, namespace=namespace + ) + current_secret.data = {"value": encoded_value} + secret = core_client.replace_namespaced_secret( + name=name, namespace=namespace, body=current_secret + ) + except client.rest.ApiException: + # Create the secret if it doesn't already exist + metadata = 
client.V1ObjectMeta(name=name, namespace=namespace) + secret = client.V1Secret( + api_version="v1", + kind="Secret", + metadata=metadata, + data={"value": encoded_value}, + ) + secret = core_client.create_namespaced_secret( + namespace=namespace, body=secret + ) + return secret + @contextmanager def _get_batch_client( self, client: "ApiClient" From 0dd47dd428bd4c62e3d3ba9311d38dd57f6be1b5 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Mon, 6 Nov 2023 14:45:45 -0600 Subject: [PATCH 2/4] Fixes error when creating a secret --- prefect_kubernetes/worker.py | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/prefect_kubernetes/worker.py b/prefect_kubernetes/worker.py index e38c22b..7e80531 100644 --- a/prefect_kubernetes/worker.py +++ b/prefect_kubernetes/worker.py @@ -97,6 +97,8 @@ from typing import TYPE_CHECKING, Any, Dict, Generator, Optional, Tuple import anyio.abc +from kubernetes.client.exceptions import ApiException +from kubernetes.client.models import V1ObjectMeta, V1Secret from prefect.blocks.kubernetes import KubernetesClusterConfig from prefect.exceptions import ( InfrastructureError, @@ -688,12 +690,10 @@ def _get_configured_kubernetes_client( except kubernetes.config.ConfigException: return kubernetes.config.new_client_from_config() - def _create_job( + def _replace_api_key_with_secret( self, configuration: KubernetesWorkerJobConfiguration, client: "ApiClient" - ) -> "V1Job": - """ - Creates a Kubernetes job from a job manifest. 
- """ + ): + """Replaces the PREFECT_API_KEY environment variable with a Kubernetes secret""" manifest_env = configuration.job_manifest["spec"]["template"]["spec"][ "containers" ][0].get("env") @@ -707,7 +707,7 @@ def _create_job( ) api_key = manifest_api_key_env.get("value") if api_key: - secret_name = f"{_slugify_name(self.name)}-api-key" + secret_name = f"prefect-{_slugify_name(self.name)}-api-key" secret = self._upsert_secret( name=secret_name, value=api_key, @@ -731,6 +731,18 @@ def _create_job( "env" ] = manifest_env + def _create_job( + self, configuration: KubernetesWorkerJobConfiguration, client: "ApiClient" + ) -> "V1Job": + """ + Creates a Kubernetes job from a job manifest. + """ + if os.environ.get( + "PREFECT_KUBERNETES_WORKER_STORE_PREFECT_API_IN_SECRET", "" + ).strip().lower() in ("true", "1"): + self._replace_api_key_with_secret( + configuration=configuration, client=client + ) try: with self._get_batch_client(client) as batch_client: job = batch_client.create_namespaced_job( @@ -765,10 +777,12 @@ def _upsert_secret( secret = core_client.replace_namespaced_secret( name=name, namespace=namespace, body=current_secret ) - except client.rest.ApiException: + except ApiException as exc: + if exc.status != 404: + raise # Create the secret if it doesn't already exist - metadata = client.V1ObjectMeta(name=name, namespace=namespace) - secret = client.V1Secret( + metadata = V1ObjectMeta(name=name, namespace=namespace) + secret = V1Secret( api_version="v1", kind="Secret", metadata=metadata, From ef1fd8fbf9b015c056e9b7bcec6c3b822c12745e Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Mon, 6 Nov 2023 20:48:37 -0800 Subject: [PATCH 3/4] Adds tests --- tests/test_worker.py | 126 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 125 insertions(+), 1 deletion(-) diff --git a/tests/test_worker.py b/tests/test_worker.py index b198ac3..708e6a6 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,3 +1,4 @@ +import base64 import re import 
uuid from contextlib import contextmanager @@ -18,6 +19,7 @@ V1ListMeta, V1ObjectMeta, V1ObjectReference, + V1Secret, ) from kubernetes.config import ConfigException from prefect.client.schemas import FlowRun @@ -29,6 +31,7 @@ from prefect.server.schemas.core import Flow from prefect.server.schemas.responses import DeploymentResponse from prefect.settings import ( + PREFECT_API_KEY, PREFECT_EXPERIMENTAL_ENABLE_WORKERS, PREFECT_EXPERIMENTAL_WARN_WORKERS, get_current_settings, @@ -43,7 +46,7 @@ from pydantic import ValidationError from prefect_kubernetes import KubernetesWorker -from prefect_kubernetes.utilities import _slugify_label_value +from prefect_kubernetes.utilities import _slugify_label_value, _slugify_name from prefect_kubernetes.worker import KubernetesWorkerJobConfiguration FAKE_CLUSTER = "fake-cluster" @@ -151,6 +154,11 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs): ] +@pytest.fixture +def enable_store_api_key_in_secret(monkeypatch): + monkeypatch.setenv("PREFECT_KUBERNETES_WORKER_STORE_PREFECT_API_IN_SECRET", "true") + + from_template_and_values_cases = [ ( # default base template with no values @@ -1382,6 +1390,122 @@ async def test_uses_image_variable( ]["spec"]["containers"][0]["image"] assert image == "foo" + async def test_can_store_api_key_in_secret( + self, + flow_run, + mock_core_client, + mock_watch, + mock_batch_client, + enable_store_api_key_in_secret, + ): + mock_watch.stream = _mock_pods_stream_that_returns_running_pod + mock_core_client.read_namespaced_secret.side_effect = ApiException(status=404) + + configuration = await KubernetesWorkerJobConfiguration.from_template_and_values( + KubernetesWorker.get_default_base_job_template(), {"image": "foo"} + ) + with temporary_settings(updates={PREFECT_API_KEY: "fake"}): + async with KubernetesWorker(work_pool_name="test") as k8s_worker: + configuration.prepare_for_flow_run(flow_run=flow_run) + await k8s_worker.run(flow_run, configuration) + 
mock_batch_client.create_namespaced_job.assert_called_once() + env = mock_batch_client.create_namespaced_job.call_args[0][1]["spec"][ + "template" + ]["spec"]["containers"][0]["env"] + assert { + "name": "PREFECT_API_KEY", + "valueFrom": { + "secretKeyRef": { + "name": f"prefect-{_slugify_name(k8s_worker.name)}-api-key", + "key": "value", + } + }, + } in env + mock_core_client.create_namespaced_secret.assert_called_with( + namespace=configuration.namespace, + body=V1Secret( + api_version="v1", + kind="Secret", + metadata=V1ObjectMeta( + name=f"prefect-{_slugify_name(k8s_worker.name)}-api-key", + namespace=configuration.namespace, + ), + data={ + "value": base64.b64encode("fake".encode("utf-8")).decode( + "utf-8" + ) + }, + ), + ) + + # Make sure secret gets deleted + assert mock_core_client.delete_namespaced_secret( + name=f"prefect-{_slugify_name(k8s_worker.name)}-api-key", + namespace=configuration.namespace, + ) + + async def test_store_api_key_in_existing_secret( + self, + flow_run, + mock_core_client, + mock_watch, + mock_batch_client, + enable_store_api_key_in_secret, + ): + mock_watch.stream = _mock_pods_stream_that_returns_running_pod + + configuration = await KubernetesWorkerJobConfiguration.from_template_and_values( + KubernetesWorker.get_default_base_job_template(), {"image": "foo"} + ) + with temporary_settings(updates={PREFECT_API_KEY: "fake"}): + async with KubernetesWorker(work_pool_name="test") as k8s_worker: + mock_core_client.read_namespaced_secret.return_value = V1Secret( + api_version="v1", + kind="Secret", + metadata=V1ObjectMeta( + name=f"prefect-{_slugify_name(k8s_worker.name)}-api-key", + namespace=configuration.namespace, + ), + data={ + "value": base64.b64encode("fake".encode("utf-8")).decode( + "utf-8" + ) + }, + ) + + configuration.prepare_for_flow_run(flow_run=flow_run) + await k8s_worker.run(flow_run, configuration) + mock_batch_client.create_namespaced_job.assert_called_once() + env = 
mock_batch_client.create_namespaced_job.call_args[0][1]["spec"][ + "template" + ]["spec"]["containers"][0]["env"] + assert { + "name": "PREFECT_API_KEY", + "valueFrom": { + "secretKeyRef": { + "name": f"prefect-{_slugify_name(k8s_worker.name)}-api-key", + "key": "value", + } + }, + } in env + mock_core_client.replace_namespaced_secret.assert_called_with( + name=f"prefect-{_slugify_name(k8s_worker.name)}-api-key", + namespace=configuration.namespace, + body=V1Secret( + api_version="v1", + kind="Secret", + metadata=V1ObjectMeta( + name=f"prefect-{_slugify_name(k8s_worker.name)}-api-key", + namespace=configuration.namespace, + ), + data={ + "value": base64.b64encode("fake".encode("utf-8")).decode( + "utf-8" + ) + }, + ), + ) + async def test_create_job_failure( self, flow_run, From 41132510f69e08ec105b258fdfa93eeab96bf6eb Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Mon, 6 Nov 2023 20:57:15 -0800 Subject: [PATCH 4/4] Add changelog and tests --- CHANGELOG.md | 8 ++ prefect_kubernetes/worker.py | 149 +++++++++++++++++++---------------- 2 files changed, 90 insertions(+), 67 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 975327a..f0ea866 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security +## 0.3.2 + +Released November 7th, 2023. + +### Added + +- Option to worker to pass Prefect API key to created jobs as a Kubernetes secret - [#99](https://github.com/PrefectHQ/prefect-kubernetes/pull/99) + ## 0.3.1 Released October 11th, 2023. diff --git a/prefect_kubernetes/worker.py b/prefect_kubernetes/worker.py index 7e80531..a3b0def 100644 --- a/prefect_kubernetes/worker.py +++ b/prefect_kubernetes/worker.py @@ -11,75 +11,90 @@ Replace `my-work-pool` with the name of the work pool you want the worker to poll for flow runs. -!!! 
example "Using a custom Kubernetes job manifest template" - The default template used for Kubernetes job manifests looks like this: - ```yaml - --- - apiVersion: batch/v1 - kind: Job - metadata: - labels: "{{ labels }}" - namespace: "{{ namespace }}" - generateName: "{{ name }}-" +### Securing your Prefect Cloud API key +If you are using Prefect Cloud and would like to pass your Prefect Cloud API key to +created jobs via a Kubernetes secret, set the +`PREFECT_KUBERNETES_WORKER_STORE_PREFECT_API_IN_SECRET` environment variable before +starting your worker: + +```bash +export PREFECT_KUBERNETES_WORKER_STORE_PREFECT_API_IN_SECRET="true" +prefect worker start --pool 'my-work-pool' --type kubernetes +``` + +Note that your worker will need permission to create secrets in the same namespace(s) +that Kubernetes jobs are created in to execute flow runs. + +### Using a custom Kubernetes job manifest template + +The default template used for Kubernetes job manifests looks like this: +```yaml +--- +apiVersion: batch/v1 +kind: Job +metadata: +labels: "{{ labels }}" +namespace: "{{ namespace }}" +generateName: "{{ name }}-" +spec: +ttlSecondsAfterFinished: "{{ finished_job_ttl }}" +template: spec: - ttlSecondsAfterFinished: "{{ finished_job_ttl }}" - template: - spec: - parallelism: 1 - completions: 1 - restartPolicy: Never - serviceAccountName: "{{ service_account_name }}" - containers: - - name: prefect-job - env: "{{ env }}" - image: "{{ image }}" - imagePullPolicy: "{{ image_pull_policy }}" - args: "{{ command }}" - ``` - - Each values enclosed in `{{ }}` is a placeholder that will be replaced with - a value at runtime. The values that can be used a placeholders are defined - by the `variables` schema defined in the base job template. - - The default job manifest and available variables can be customized on a work pool - by work pool basis. These customizations can be made via the Prefect UI when - creating or editing a work pool. 
- For example, if you wanted to allow custom memory requests for a Kubernetes work - pool you could update the job manifest template to look like this: - - ```yaml - --- - apiVersion: batch/v1 - kind: Job - metadata: - labels: "{{ labels }}" - namespace: "{{ namespace }}" - generateName: "{{ name }}-" + parallelism: 1 + completions: 1 + restartPolicy: Never + serviceAccountName: "{{ service_account_name }}" + containers: + - name: prefect-job + env: "{{ env }}" + image: "{{ image }}" + imagePullPolicy: "{{ image_pull_policy }}" + args: "{{ command }}" +``` + +Each value enclosed in `{{ }}` is a placeholder that will be replaced with +a value at runtime. The values that can be used as placeholders are defined +by the `variables` schema defined in the base job template. + +The default job manifest and available variables can be customized on a work pool +by work pool basis. These customizations can be made via the Prefect UI when +creating or editing a work pool. + +For example, if you wanted to allow custom memory requests for a Kubernetes work +pool you could update the job manifest template to look like this: + +```yaml +--- +apiVersion: batch/v1 +kind: Job +metadata: +labels: "{{ labels }}" +namespace: "{{ namespace }}" +generateName: "{{ name }}-" +spec: +ttlSecondsAfterFinished: "{{ finished_job_ttl }}" +template: spec: - ttlSecondsAfterFinished: "{{ finished_job_ttl }}" - template: - spec: - parallelism: 1 - completions: 1 - restartPolicy: Never - serviceAccountName: "{{ service_account_name }}" - containers: - - name: prefect-job - env: "{{ env }}" - image: "{{ image }}" - imagePullPolicy: "{{ image_pull_policy }}" - args: "{{ command }}" - resources: - requests: - memory: "{{ memory }}Mi" - limits: - memory: 128Mi - ``` - - In this new template, the `memory` placeholder allows customization of the memory - allocated to Kubernetes jobs created by workers in this work pool, but the limit - is hard-coded and cannot be changed by deployments. 
+ parallelism: 1 + completions: 1 + restartPolicy: Never + serviceAccountName: "{{ service_account_name }}" + containers: + - name: prefect-job + env: "{{ env }}" + image: "{{ image }}" + imagePullPolicy: "{{ image_pull_policy }}" + args: "{{ command }}" + resources: + requests: + memory: "{{ memory }}Mi" + limits: + memory: 128Mi +``` + +In this new template, the `memory` placeholder allows customization of the memory +allocated to Kubernetes jobs created by workers in this work pool, but the limit +is hard-coded and cannot be changed by deployments. For more information about work pools and workers, checkout out the [Prefect docs](https://docs.prefect.io/concepts/work-pools/).