Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fixes error when creating a secret
Browse files Browse the repository at this point in the history
  • Loading branch information
desertaxle committed Nov 7, 2023
1 parent 067a402 commit 0dd47dd
Showing 1 changed file with 23 additions and 9 deletions.
32 changes: 23 additions & 9 deletions prefect_kubernetes/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@
from typing import TYPE_CHECKING, Any, Dict, Generator, Optional, Tuple

import anyio.abc
from kubernetes.client.exceptions import ApiException
from kubernetes.client.models import V1ObjectMeta, V1Secret
from prefect.blocks.kubernetes import KubernetesClusterConfig
from prefect.exceptions import (
InfrastructureError,
Expand Down Expand Up @@ -688,12 +690,10 @@ def _get_configured_kubernetes_client(
except kubernetes.config.ConfigException:
return kubernetes.config.new_client_from_config()

def _create_job(
def _replace_api_key_with_secret(
self, configuration: KubernetesWorkerJobConfiguration, client: "ApiClient"
) -> "V1Job":
"""
Creates a Kubernetes job from a job manifest.
"""
):
"""Replaces the PREFECT_API_KEY environment variable with a Kubernetes secret"""
manifest_env = configuration.job_manifest["spec"]["template"]["spec"][
"containers"
][0].get("env")
Expand All @@ -707,7 +707,7 @@ def _create_job(
)
api_key = manifest_api_key_env.get("value")
if api_key:
secret_name = f"{_slugify_name(self.name)}-api-key"
secret_name = f"prefect-{_slugify_name(self.name)}-api-key"
secret = self._upsert_secret(
name=secret_name,
value=api_key,
Expand All @@ -731,6 +731,18 @@ def _create_job(
"env"
] = manifest_env

def _create_job(
self, configuration: KubernetesWorkerJobConfiguration, client: "ApiClient"
) -> "V1Job":
"""
Creates a Kubernetes job from a job manifest.
"""
if os.environ.get(
"PREFECT_KUBERNETES_WORKER_STORE_PREFECT_API_IN_SECRET", ""
).strip().lower() in ("true", "1"):
self._replace_api_key_with_secret(
configuration=configuration, client=client
)
try:
with self._get_batch_client(client) as batch_client:
job = batch_client.create_namespaced_job(
Expand Down Expand Up @@ -765,10 +777,12 @@ def _upsert_secret(
secret = core_client.replace_namespaced_secret(
name=name, namespace=namespace, body=current_secret
)
except client.rest.ApiException:
except ApiException as exc:
if exc.status != 404:
raise
# Create the secret if it doesn't already exist
metadata = client.V1ObjectMeta(name=name, namespace=namespace)
secret = client.V1Secret(
metadata = V1ObjectMeta(name=name, namespace=namespace)
secret = V1Secret(
api_version="v1",
kind="Secret",
metadata=metadata,
Expand Down

0 comments on commit 0dd47dd

Please sign in to comment.