This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add option to provide Prefect API key to created jobs as a Kubernetes secret #99

Merged
4 commits merged on Nov 8, 2023
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -19,6 +19,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Security

## 0.3.2

Released November 7th, 2023.

### Added

- Option for workers to pass the Prefect API key to created jobs as a Kubernetes secret - [#99](https://github.com/PrefectHQ/prefect-kubernetes/pull/99)

## 0.3.1

Released October 11th, 2023.
261 changes: 194 additions & 67 deletions prefect_kubernetes/worker.py
@@ -11,79 +11,96 @@
Replace `my-work-pool` with the name of the work pool you want the worker
to poll for flow runs.

### Securing your Prefect Cloud API key
If you are using Prefect Cloud and would like to pass your Prefect Cloud API key to
created jobs via a Kubernetes secret, set the
`PREFECT_KUBERNETES_WORKER_STORE_PREFECT_API_IN_SECRET` environment variable before
starting your worker:

```bash
export PREFECT_KUBERNETES_WORKER_STORE_PREFECT_API_IN_SECRET="true"
prefect worker start --pool 'my-work-pool' --type kubernetes
```

Note that your worker will need permission to create secrets in the same namespace(s)
in which it creates Kubernetes jobs to execute flow runs.
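
With the variable set, the worker swaps the raw key for a reference to a secret it
manages. The resulting container env entry looks roughly like this (Python literals
mirroring the `_replace_api_key_with_secret` logic later in this diff; the worker
name `my-worker` and the key value are illustrative):

```python
# Before: the API key is passed to the job as a plain environment variable value.
plain_entry = {"name": "PREFECT_API_KEY", "value": "pnu_example_api_key"}

# After: the worker stores the key in a Secret named
# "prefect-<slugified worker name>-api-key" and references it instead.
secret_backed_entry = {
    "name": "PREFECT_API_KEY",
    "valueFrom": {
        "secretKeyRef": {"name": "prefect-my-worker-api-key", "key": "value"}
    },
}
```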

### Using a custom Kubernetes job manifest template

The default template used for Kubernetes job manifests looks like this:
```yaml
---
apiVersion: batch/v1
kind: Job
metadata:
  labels: "{{ labels }}"
  namespace: "{{ namespace }}"
  generateName: "{{ name }}-"
spec:
  ttlSecondsAfterFinished: "{{ finished_job_ttl }}"
  template:
    spec:
      parallelism: 1
      completions: 1
      restartPolicy: Never
      serviceAccountName: "{{ service_account_name }}"
      containers:
      - name: prefect-job
        env: "{{ env }}"
        image: "{{ image }}"
        imagePullPolicy: "{{ image_pull_policy }}"
        args: "{{ command }}"
```

Each value enclosed in `{{ }}` is a placeholder that will be replaced with
a value at runtime. The values that can be used as placeholders are defined
by the `variables` schema in the base job template.
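
For intuition, placeholder resolution amounts to a recursive substitution over the
manifest using the work pool's variable values. A minimal sketch (illustrative only,
not the worker's actual templating implementation):

```python
import re
from typing import Any, Dict

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def resolve_placeholders(node: Any, variables: Dict[str, Any]) -> Any:
    """Recursively substitute "{{ name }}" placeholders with values from `variables`."""
    if isinstance(node, dict):
        return {k: resolve_placeholders(v, variables) for k, v in node.items()}
    if isinstance(node, list):
        return [resolve_placeholders(item, variables) for item in node]
    if isinstance(node, str):
        match = _PLACEHOLDER.fullmatch(node.strip())
        if match:
            # A string that is exactly one placeholder takes the variable's raw
            # value, which may itself be a dict or list (e.g. `env` or `labels`).
            return variables.get(match.group(1))
        # Otherwise placeholders are interpolated into the surrounding string,
        # e.g. "{{ name }}-" becomes "my-flow-run-".
        return _PLACEHOLDER.sub(lambda m: str(variables.get(m.group(1), "")), node)
    return node
```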

The default job manifest and available variables can be customized on a per-work-pool
basis via the Prefect UI when creating or editing a work pool.

For example, if you wanted to allow custom memory requests for a Kubernetes work
pool, you could update the job manifest template to look like this:

```yaml
---
apiVersion: batch/v1
kind: Job
metadata:
  labels: "{{ labels }}"
  namespace: "{{ namespace }}"
  generateName: "{{ name }}-"
spec:
  ttlSecondsAfterFinished: "{{ finished_job_ttl }}"
  template:
    spec:
      parallelism: 1
      completions: 1
      restartPolicy: Never
      serviceAccountName: "{{ service_account_name }}"
      containers:
      - name: prefect-job
        env: "{{ env }}"
        image: "{{ image }}"
        imagePullPolicy: "{{ image_pull_policy }}"
        args: "{{ command }}"
        resources:
          requests:
            memory: "{{ memory }}Mi"
          limits:
            memory: 128Mi
```

In this new template, the `memory` placeholder allows customization of the memory
allocated to Kubernetes jobs created by workers in this work pool, but the limit
is hard-coded and cannot be changed by deployments.
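
A deployment targeting this work pool could then supply a value for `memory`. With
recent Prefect 2.x releases this might look like the following sketch (it assumes the
`job_variables` argument to `flow.deploy`; the flow, deployment, and image names are
illustrative):

```python
from prefect import flow


@flow
def my_flow():
    ...


if __name__ == "__main__":
    # Request 64Mi for jobs created from this deployment; the 128Mi limit in the
    # template above stays hard-coded regardless of this value.
    my_flow.deploy(
        name="my-deployment",
        work_pool_name="my-work-pool",
        image="my-registry/my-image:latest",
        job_variables={"memory": 64},
    )
```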

For more information about work pools and workers,
check out the [Prefect docs](https://docs.prefect.io/concepts/work-pools/).
"""
import asyncio
import base64
import enum
import logging
import math
@@ -95,6 +112,8 @@
from typing import TYPE_CHECKING, Any, Dict, Generator, Optional, Tuple

import anyio.abc
from kubernetes.client.exceptions import ApiException
from kubernetes.client.models import V1ObjectMeta, V1Secret
from prefect.blocks.kubernetes import KubernetesClusterConfig
from prefect.exceptions import (
InfrastructureError,
@@ -517,6 +536,10 @@ class KubernetesWorker(BaseWorker):
_documentation_url = "https://prefecthq.github.io/prefect-kubernetes/worker/"
_logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/2d0b896006ad463b49c28aaac14f31e00e32cfab-250x250.png" # noqa

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
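# Maps (secret name, namespace) -> job configuration for API-key secrets created
# by this worker, so they can be deleted when the worker shuts down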
self._created_secrets = {}

async def run(
self,
flow_run: "FlowRun",
@@ -584,6 +607,32 @@ async def kill_infrastructure(
self._stop_job, infrastructure_pid, configuration, grace_seconds
)

async def teardown(self, *exc_info):
await super().teardown(*exc_info)

await self._clean_up_created_secrets()

async def _clean_up_created_secrets(self):
"""Deletes any secrets created during the worker's operation."""
coros = []
for key, configuration in self._created_secrets.items():
with self._get_configured_kubernetes_client(configuration) as client:
with self._get_core_client(client) as core_client:
coros.append(
run_sync_in_worker_thread(
core_client.delete_namespaced_secret,
name=key[0],
namespace=key[1],
)
)

results = await asyncio.gather(*coros, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
self._logger.warning(
"Failed to delete created secret with exception: %s", result
)

def _stop_job(
self,
infrastructure_pid: str,
@@ -656,12 +705,59 @@ def _get_configured_kubernetes_client(
except kubernetes.config.ConfigException:
return kubernetes.config.new_client_from_config()

def _replace_api_key_with_secret(
self, configuration: KubernetesWorkerJobConfiguration, client: "ApiClient"
):
"""Replaces the PREFECT_API_KEY environment variable with a Kubernetes secret"""
manifest_env = configuration.job_manifest["spec"]["template"]["spec"][
"containers"
][0].get("env")
manifest_api_key_env = next(
(
env_entry
for env_entry in manifest_env
if env_entry.get("name") == "PREFECT_API_KEY"
),
{},
)
api_key = manifest_api_key_env.get("value")
if api_key:
secret_name = f"prefect-{_slugify_name(self.name)}-api-key"
secret = self._upsert_secret(
name=secret_name,
value=api_key,
namespace=configuration.namespace,
client=client,
)
# Store configuration so that we can delete the secret when the worker shuts
# down
self._created_secrets[
(secret.metadata.name, secret.metadata.namespace)
] = configuration
new_api_env_entry = {
"name": "PREFECT_API_KEY",
"valueFrom": {"secretKeyRef": {"name": secret_name, "key": "value"}},
}
manifest_env = [
entry if entry.get("name") != "PREFECT_API_KEY" else new_api_env_entry
for entry in manifest_env
]
configuration.job_manifest["spec"]["template"]["spec"]["containers"][0][
"env"
] = manifest_env

def _create_job(
self, configuration: KubernetesWorkerJobConfiguration, client: "ApiClient"
) -> "V1Job":
"""
Creates a Kubernetes job from a job manifest.
"""
if os.environ.get(
"PREFECT_KUBERNETES_WORKER_STORE_PREFECT_API_IN_SECRET", ""
).strip().lower() in ("true", "1"):
self._replace_api_key_with_secret(
configuration=configuration, client=client
)
try:
with self._get_batch_client(client) as batch_client:
job = batch_client.create_namespaced_job(
@@ -681,6 +777,37 @@ def _create_job(

return job

def _upsert_secret(
self, name: str, value: str, namespace: str, client: "ApiClient"
):
encoded_value = base64.b64encode(value.encode("utf-8")).decode("utf-8")
with self._get_core_client(client) as core_client:
try:
# Get the current version of the Secret and update it with the
# new value
current_secret = core_client.read_namespaced_secret(
name=name, namespace=namespace
)
current_secret.data = {"value": encoded_value}
secret = core_client.replace_namespaced_secret(
name=name, namespace=namespace, body=current_secret
)
except ApiException as exc:
if exc.status != 404:
raise
# Create the secret if it doesn't already exist
metadata = V1ObjectMeta(name=name, namespace=namespace)
secret = V1Secret(
api_version="v1",
kind="Secret",
metadata=metadata,
data={"value": encoded_value},
)
secret = core_client.create_namespaced_secret(
namespace=namespace, body=secret
)
return secret
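
# Example (illustrative, not part of the worker): the stored value can be read back
# with the same core client used above, e.g.
#
#     core_client.read_namespaced_secret(
#         name="prefect-my-worker-api-key", namespace="default"
#     ).data["value"]  # -> base64-encoded API key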

@contextmanager
def _get_batch_client(
self, client: "ApiClient"