From 0dd47dd428bd4c62e3d3ba9311d38dd57f6be1b5 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Mon, 6 Nov 2023 14:45:45 -0600 Subject: [PATCH] Fixes error when creating a secret --- prefect_kubernetes/worker.py | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/prefect_kubernetes/worker.py b/prefect_kubernetes/worker.py index e38c22b..7e80531 100644 --- a/prefect_kubernetes/worker.py +++ b/prefect_kubernetes/worker.py @@ -97,6 +97,8 @@ from typing import TYPE_CHECKING, Any, Dict, Generator, Optional, Tuple import anyio.abc +from kubernetes.client.exceptions import ApiException +from kubernetes.client.models import V1ObjectMeta, V1Secret from prefect.blocks.kubernetes import KubernetesClusterConfig from prefect.exceptions import ( InfrastructureError, @@ -688,12 +690,10 @@ def _get_configured_kubernetes_client( except kubernetes.config.ConfigException: return kubernetes.config.new_client_from_config() - def _create_job( + def _replace_api_key_with_secret( self, configuration: KubernetesWorkerJobConfiguration, client: "ApiClient" - ) -> "V1Job": - """ - Creates a Kubernetes job from a job manifest. - """ + ): + """Replaces the PREFECT_API_KEY environment variable with a Kubernetes secret""" manifest_env = configuration.job_manifest["spec"]["template"]["spec"][ "containers" ][0].get("env") @@ -707,7 +707,7 @@ def _create_job( ) api_key = manifest_api_key_env.get("value") if api_key: - secret_name = f"{_slugify_name(self.name)}-api-key" + secret_name = f"prefect-{_slugify_name(self.name)}-api-key" secret = self._upsert_secret( name=secret_name, value=api_key, @@ -731,6 +731,18 @@ def _create_job( "env" ] = manifest_env + def _create_job( + self, configuration: KubernetesWorkerJobConfiguration, client: "ApiClient" + ) -> "V1Job": + """ + Creates a Kubernetes job from a job manifest. + """ + if os.environ.get( + "PREFECT_KUBERNETES_WORKER_STORE_PREFECT_API_IN_SECRET", "" + ).strip().lower() in ("true", "1"): + self._replace_api_key_with_secret( + configuration=configuration, client=client + ) try: with self._get_batch_client(client) as batch_client: job = batch_client.create_namespaced_job( @@ -765,10 +777,12 @@ def _upsert_secret( secret = core_client.replace_namespaced_secret( name=name, namespace=namespace, body=current_secret ) - except client.rest.ApiException: + except ApiException as exc: + if exc.status != 404: + raise # Create the secret if it doesn't already exist - metadata = client.V1ObjectMeta(name=name, namespace=namespace) - secret = client.V1Secret( + metadata = V1ObjectMeta(name=name, namespace=namespace) + secret = V1Secret( api_version="v1", kind="Secret", metadata=metadata,