From 675f6d7f318a9fa7a72070b301cfd76076698b63 Mon Sep 17 00:00:00 2001 From: kevingrismore <146098880+kevingrismore@users.noreply.github.com> Date: Thu, 1 Feb 2024 11:41:33 -0500 Subject: [PATCH] Add TCP keepalive to `KubernetesWorker` (#108) * add keepalive * fix multi-line string * formatting * update changelog * move socket import * change to env var * better env var name * default true --- CHANGELOG.md | 20 ++++++++++++++++--- prefect_kubernetes/utilities.py | 33 +++++++++++++++++++++++++++++++ prefect_kubernetes/worker.py | 14 ++++++++++--- tests/test_utilities.py | 35 +++++++++++++++++++++++++++++++++ 4 files changed, 96 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d493aa4..1c66d83 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- A `ResilientWatcher` utility class to reconnect Kubernetes client streams on `ProtocolErrors` - [#107](https://github.com/PrefectHQ/prefect-kubernetes/pull/107) +- TCP keepalive option for preventing closure of inactive connections - [#108](https://github.com/PrefectHQ/prefect-kubernetes/pull/108) ### Changed @@ -21,6 +21,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security +## 0.3.3 + +Released January 23rd, 2024. + +### Added + +- A `ResilientWatcher` utility class to reconnect Kubernetes client streams on `ProtocolErrors` - [#107](https://github.com/PrefectHQ/prefect-kubernetes/pull/107) + ## 0.3.2 Released November 7th, 2023. @@ -55,7 +63,6 @@ Released September 18th, 2023. - Logging recent job events when pod scheduling fails - [#88](https://github.com/PrefectHQ/prefect-kubernetes/pull/88) - Event logging for pod events - [#91](https://github.com/PrefectHQ/prefect-kubernetes/pull/91) - ### Fixed - `env` handling to allow hard coding of environment variable in base job template - [#94](https://github.com/PrefectHQ/prefect-kubernetes/pull/94) @@ -65,6 +72,7 @@ Released September 18th, 2023. Released July 20th, 2023. ### Changed + - Promoted workers to GA, removed beta disclaimers ## 0.2.10 @@ -99,6 +107,7 @@ Released May 4th, 2023. ### Added - Fixed issue where `KubernetesEventReplicator` would not shutdown after a flow-run was completed resulting in new flow-runs not being picked up and the worker hanging on exit. - [#57](https://github.com/PrefectHQ/prefect-kubernetes/pull/57) + ## 0.2.6 Released April 28th, 2023. @@ -113,7 +122,7 @@ Released April 20th, 2023. ### Added -- `kill_infrastructure` method on `KubernetesWorker` which stops jobs for cancelled flow runs - [#52](https://github.com/PrefectHQ/prefect-kubernetes/pull/52) +- `kill_infrastructure` method on `KubernetesWorker` which stops jobs for cancelled flow runs - [#52](https://github.com/PrefectHQ/prefect-kubernetes/pull/52) ## 0.2.4 @@ -149,6 +158,7 @@ Released March 24, 2023. Released February 17, 2023. ### Added + - Sync compatibility for block method calls used by `run_namespaced_job` - [#34](https://github.com/PrefectHQ/prefect-kubernetes/pull/34) ## 0.2.0 @@ -156,6 +166,7 @@ Released February 17, 2023. Released December 23, 2022. ### Added + - `KubernetesJob` block for running a Kubernetes job from a manifest - [#28](https://github.com/PrefectHQ/prefect-kubernetes/pull/28) - `run_namespaced_job` flow allowing easy execution of a well-specified `KubernetesJob` block on a cluster specified by `KubernetesCredentials` - [#28](https://github.com/PrefectHQ/prefect-kubernetes/pull/28) - `convert_manifest_to_model` utility function for converting a Kubernetes manifest to a model object - [#28](https://github.com/PrefectHQ/prefect-kubernetes/pull/28) @@ -163,7 +174,9 @@ Released December 23, 2022. ## 0.1.0 Released November 21, 2022. + ### Added + - `KubernetesCredentials` block for generating authenticated Kubernetes clients - [#19](https://github.com/PrefectHQ/prefect-kubernetes/pull/19) - Tasks for interacting with `job` resources: `create_namespaced_job`, `delete_namespaced_job`, `list_namespaced_job`, `patch_namespaced_job`, `read_namespaced_job`, `replace_namespaced_job` - [#19](https://github.com/PrefectHQ/prefect-kubernetes/pull/19) - Tasks for interacting with `pod` resources: `create_namespaced_pod`, `delete_namespaced_pod`, `list_namespaced_pod`, `patch_namespaced_pod`, `read_namespaced_pod`, `read_namespaced_pod_logs`, `replace_namespaced_pod` - [#21](https://github.com/PrefectHQ/prefect-kubernetes/pull/21) @@ -173,4 +186,5 @@ Released November 21, 2022. - Tasks for interacting with `deployment` resources: `create_namespaced_deployment`, `delete_namespaced_deployment`, `list_namespaced_deployment`, `patch_namespaced_deployment`, `read_namespaced_deployment`, `replace_namespaced_deployment` - [#25](https://github.com/PrefectHQ/prefect-kubernetes/pull/25) ### Changed + - `KubernetesCredentials` block to use a single `get_client` method capable of creating all resource-specific client types - [#21](https://github.com/PrefectHQ/prefect-kubernetes/pull/21) diff --git a/prefect_kubernetes/utilities.py b/prefect_kubernetes/utilities.py index d5c0381..741f336 100644 --- a/prefect_kubernetes/utilities.py +++ b/prefect_kubernetes/utilities.py @@ -1,11 +1,14 @@ """ Utilities for working with the Python Kubernetes API. """ import logging +import socket +import sys import time from pathlib import Path from typing import Callable, List, Optional, Set, Type, TypeVar, Union import urllib3 from kubernetes import watch +from kubernetes.client import ApiClient from kubernetes.client import models as k8s_models from prefect.infrastructure.kubernetes import KubernetesJob, KubernetesManifest from slugify import slugify @@ -35,6 +38,36 @@ def add(self, value): super().add(value) +def enable_socket_keep_alive(client: ApiClient) -> None: + """ + Setting the keep-alive flags on the kubernetes client object. + Unfortunately neither the kubernetes library nor the urllib3 library which + kubernetes is using internally offer the functionality to enable keep-alive + messages. Thus the flags are added to be used on the underlying sockets. + """ + + socket_options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)] + + if hasattr(socket, "TCP_KEEPINTVL"): + socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30)) + + if hasattr(socket, "TCP_KEEPCNT"): + socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 6)) + + if hasattr(socket, "TCP_KEEPIDLE"): + socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 6)) + + if sys.platform == "darwin": + # TCP_KEEP_ALIVE not available on socket module in macOS, but defined in + # https://github.com/apple/darwin-xnu/blob/2ff845c2e033bd0ff64b5b6aa6063a1f8f65aa32/bsd/netinet/tcp.h#L215 + TCP_KEEP_ALIVE = 0x10 + socket_options.append((socket.IPPROTO_TCP, TCP_KEEP_ALIVE, 30)) + + client.rest_client.pool_manager.connection_pool_kw[ + "socket_options" + ] = socket_options + + def convert_manifest_to_model( manifest: Union[Path, str, KubernetesManifest], v1_model_name: str ) -> V1KubernetesModel: diff --git a/prefect_kubernetes/worker.py b/prefect_kubernetes/worker.py index ec37d77..6cd46eb 100644 --- a/prefect_kubernetes/worker.py +++ b/prefect_kubernetes/worker.py @@ -148,6 +148,7 @@ _slugify_label_key, _slugify_label_value, _slugify_name, + enable_socket_keep_alive, ) if TYPE_CHECKING: @@ -690,7 +691,7 @@ def _get_configured_kubernetes_client( # if a hard-coded cluster config is provided, use it if configuration.cluster_config: - return kubernetes.config.new_client_from_config_dict( + client = kubernetes.config.new_client_from_config_dict( config_dict=configuration.cluster_config.config, context=configuration.cluster_config.context_name, ) @@ -702,9 +703,16 @@ def _get_configured_kubernetes_client( try: kubernetes.config.load_incluster_config() config = kubernetes.client.Configuration.get_default_copy() - return kubernetes.client.ApiClient(configuration=config) + client = kubernetes.client.ApiClient(configuration=config) except kubernetes.config.ConfigException: - return kubernetes.config.new_client_from_config() + client = kubernetes.config.new_client_from_config() + + if os.environ.get( + "PREFECT_KUBERNETES_WORKER_ADD_TCP_KEEPALIVE", "TRUE" + ).strip().lower() in ("true", "1"): + enable_socket_keep_alive(client) + + return client def _replace_api_key_with_secret( self, configuration: KubernetesWorkerJobConfiguration, client: "ApiClient" diff --git a/tests/test_utilities.py b/tests/test_utilities.py index ed09ab2..1bdcbed 100644 --- a/tests/test_utilities.py +++ b/tests/test_utilities.py @@ -2,18 +2,23 @@ import uuid from typing import Type from unittest import mock +from unittest.mock import MagicMock import kubernetes import pytest import urllib3 from kubernetes.client import models as k8s_models +from kubernetes.config import ConfigException from prefect.infrastructure.kubernetes import KubernetesJob from prefect_kubernetes.utilities import ( ResilientStreamWatcher, convert_manifest_to_model, + enable_socket_keep_alive, ) +FAKE_CLUSTER = "fake-cluster" + base_path = "tests/sample_k8s_resources" sample_deployment_manifest = KubernetesJob.job_from_file( @@ -165,6 +170,25 @@ ) +@pytest.fixture +def mock_cluster_config(monkeypatch): + mock = MagicMock() + # We cannot mock this or the `except` clause will complain + mock.config.ConfigException = ConfigException + mock.list_kube_config_contexts.return_value = ( + [], + {"context": {"cluster": FAKE_CLUSTER}}, + ) + monkeypatch.setattr("kubernetes.config", mock) + monkeypatch.setattr("kubernetes.config.ConfigException", ConfigException) + return mock + + +@pytest.fixture +def mock_api_client(mock_cluster_config): + return MagicMock() + + @pytest.mark.parametrize( "manifest,model_name,expected_model", [ @@ -317,3 +341,14 @@ def my_stream(*args, **kwargs): "log3", "log4", ] + + +def test_keep_alive_updates_socket_options(mock_api_client): + enable_socket_keep_alive(mock_api_client) + + assert ( + mock_api_client.rest_client.pool_manager.connection_pool_kw[ + "socket_options" + ]._mock_set_call + is not None + )