Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Cache Kubernetes ApiClient (#111)
Browse files Browse the repository at this point in the history
* add hashing util

* add client caching

* fix config typehint

* clear cache for testing call count

* make the hashable class look like the original

* add hash collection tests

* add client caching test

* more consistent naming

* fix docstring

* fix another docstring

* old docstring was better

* switch to existing hash utility

* update changelog
  • Loading branch information
kevingrismore authored Feb 2, 2024
1 parent d2b0f47 commit 43b2746
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 24 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Reverting [#107](https://github.com/PrefectHQ/prefect-kubernetes/pull/107) to address
deadlocking issue.
- Added Kubernetes client caching to prevent worker from running out of sockets - [#111](https://github.com/PrefectHQ/prefect-kubernetes/pull/111)

- Reverting [#107](https://github.com/PrefectHQ/prefect-kubernetes/pull/107) to address
deadlocking issue.

### Security

Expand Down
1 change: 1 addition & 0 deletions prefect_kubernetes/utilities.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
""" Utilities for working with the Python Kubernetes API. """

import socket
import sys
from pathlib import Path
Expand Down
89 changes: 68 additions & 21 deletions prefect_kubernetes/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
For more information about work pools and workers,
checkout out the [Prefect docs](https://docs.prefect.io/concepts/work-pools/).
"""

import asyncio
import base64
import enum
Expand All @@ -109,6 +110,8 @@
import time
from contextlib import contextmanager
from datetime import datetime
from functools import lru_cache
from threading import Lock
from typing import TYPE_CHECKING, Any, Dict, Generator, Optional, Tuple

import anyio.abc
Expand All @@ -124,6 +127,7 @@
from prefect.server.schemas.responses import DeploymentResponse
from prefect.utilities.asyncutils import run_sync_in_worker_thread
from prefect.utilities.dockerutils import get_prefect_image_name
from prefect.utilities.hashing import hash_objects
from prefect.utilities.importtools import lazy_import
from prefect.utilities.pydantic import JsonPatch
from prefect.utilities.templating import find_placeholders
Expand All @@ -134,6 +138,7 @@
BaseWorkerResult,
)
from pydantic import VERSION as PYDANTIC_VERSION
from pydantic import BaseModel

if PYDANTIC_VERSION.startswith("2."):
from pydantic.v1 import Field, validator
Expand Down Expand Up @@ -161,6 +166,63 @@
else:
kubernetes = lazy_import("kubernetes")

_LOCK = Lock()


class HashableKubernetesClusterConfig(BaseModel):
"""
A hashable version of the KubernetesClusterConfig class.
Used for caching.
"""

config: dict = Field(
default=..., description="The entire contents of a kubectl config file."
)
context_name: str = Field(
default=..., description="The name of the kubectl context to use."
)

def __hash__(self):
"""Make the conifg hashable."""
return hash(
(
hash_objects(self.config),
self.context_name,
)
)


@lru_cache(maxsize=8, typed=True)
def _get_configured_kubernetes_client_cached(
cluster_config: Optional[HashableKubernetesClusterConfig] = None,
) -> Any:
"""Returns a configured Kubernetes client."""
with _LOCK:
# if a hard-coded cluster config is provided, use it
if cluster_config:
client = kubernetes.config.new_client_from_config_dict(
config_dict=cluster_config.config,
context=cluster_config.context_name,
)
else:
# If no hard-coded config specified, try to load Kubernetes configuration
# within a cluster. If that doesn't work, try to load the configuration
# from the local environment, allowing any further ConfigExceptions to
# bubble up.
try:
kubernetes.config.load_incluster_config()
config = kubernetes.client.Configuration.get_default_copy()
client = kubernetes.client.ApiClient(configuration=config)
except kubernetes.config.ConfigException:
client = kubernetes.config.new_client_from_config()

if os.environ.get(
"PREFECT_KUBERNETES_WORKER_ADD_TCP_KEEPALIVE", "TRUE"
).strip().lower() in ("true", "1"):
enable_socket_keep_alive(client)

return client


def _get_default_job_manifest_template() -> Dict[str, Any]:
"""Returns the default job manifest template used by the Kubernetes worker."""
Expand Down Expand Up @@ -688,30 +750,15 @@ def _get_configured_kubernetes_client(
Returns a configured Kubernetes client.
"""

# if a hard-coded cluster config is provided, use it
cluster_config = None

if configuration.cluster_config:
client = kubernetes.config.new_client_from_config_dict(
config_dict=configuration.cluster_config.config,
context=configuration.cluster_config.context_name,
cluster_config = HashableKubernetesClusterConfig(
config=configuration.cluster_config.config,
context_name=configuration.cluster_config.context_name,
)
else:
# If no hard-coded config specified, try to load Kubernetes configuration
# within a cluster. If that doesn't work, try to load the configuration
# from the local environment, allowing any further ConfigExceptions to
# bubble up.
try:
kubernetes.config.load_incluster_config()
config = kubernetes.client.Configuration.get_default_copy()
client = kubernetes.client.ApiClient(configuration=config)
except kubernetes.config.ConfigException:
client = kubernetes.config.new_client_from_config()

if os.environ.get(
"PREFECT_KUBERNETES_WORKER_ADD_TCP_KEEPALIVE", "TRUE"
).strip().lower() in ("true", "1"):
enable_socket_keep_alive(client)

return client
return _get_configured_kubernetes_client_cached(cluster_config)

def _replace_api_key_with_secret(
self, configuration: KubernetesWorkerJobConfiguration, client: "ApiClient"
Expand Down
29 changes: 28 additions & 1 deletion tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@

from prefect_kubernetes import KubernetesWorker
from prefect_kubernetes.utilities import _slugify_label_value, _slugify_name
from prefect_kubernetes.worker import KubernetesWorkerJobConfiguration
from prefect_kubernetes.worker import (
KubernetesWorkerJobConfiguration,
_get_configured_kubernetes_client_cached,
)

FAKE_CLUSTER = "fake-cluster"
MOCK_CLUSTER_UID = "1234"
Expand Down Expand Up @@ -1978,6 +1981,7 @@ async def test_defaults_to_incluster_config(
mock_cluster_config,
mock_batch_client,
):
_get_configured_kubernetes_client_cached.cache_clear()
mock_watch.stream = _mock_pods_stream_that_returns_running_pod

async with KubernetesWorker(work_pool_name="test") as k8s_worker:
Expand All @@ -1995,13 +1999,36 @@ async def test_uses_cluster_config_if_not_in_cluster(
mock_cluster_config,
mock_batch_client,
):
_get_configured_kubernetes_client_cached.cache_clear()
mock_watch.stream = _mock_pods_stream_that_returns_running_pod
mock_cluster_config.load_incluster_config.side_effect = ConfigException()
async with KubernetesWorker(work_pool_name="test") as k8s_worker:
await k8s_worker.run(flow_run, default_configuration)

mock_cluster_config.new_client_from_config.assert_called_once()

async def test_get_configured_kubernetes_client_cached(
self,
flow_run,
default_configuration,
mock_core_client,
mock_watch,
mock_cluster_config,
mock_batch_client,
):
_get_configured_kubernetes_client_cached.cache_clear()
mock_watch.stream = _mock_pods_stream_that_returns_running_pod

assert _get_configured_kubernetes_client_cached.cache_info().hits == 0

async with KubernetesWorker(work_pool_name="test") as k8s_worker:
await k8s_worker.run(flow_run, default_configuration)
await k8s_worker.run(flow_run, default_configuration)
await k8s_worker.run(flow_run, default_configuration)

assert _get_configured_kubernetes_client_cached.cache_info().misses == 1
assert _get_configured_kubernetes_client_cached.cache_info().hits == 2

@pytest.mark.parametrize("job_timeout", [24, 100])
async def test_allows_configurable_timeouts_for_pod_and_job_watches(
self,
Expand Down

0 comments on commit 43b2746

Please sign in to comment.