This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add TCP keepalive to KubernetesWorker (#108)
* add keepalive

* fix multi-line string

* formatting

* update changelog

* move socket import

* change to env var

* better env var name

* default true
kevingrismore authored Feb 1, 2024
1 parent f2e4d88 commit 675f6d7
Showing 4 changed files with 96 additions and 6 deletions.
20 changes: 17 additions & 3 deletions CHANGELOG.md
@@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- A `ResilientWatcher` utility class to reconnect Kubernetes client streams on `ProtocolErrors` - [#107](https://github.com/PrefectHQ/prefect-kubernetes/pull/107)
- TCP keepalive option for preventing closure of inactive connections - [#108](https://github.com/PrefectHQ/prefect-kubernetes/pull/108)

### Changed

@@ -21,6 +21,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Security

## 0.3.3

Released January 23rd, 2024.

### Added

- A `ResilientWatcher` utility class to reconnect Kubernetes client streams on `ProtocolErrors` - [#107](https://github.com/PrefectHQ/prefect-kubernetes/pull/107)

## 0.3.2

Released November 7th, 2023.
@@ -55,7 +63,6 @@ Released September 18th, 2023.
- Logging recent job events when pod scheduling fails - [#88](https://github.com/PrefectHQ/prefect-kubernetes/pull/88)
- Event logging for pod events - [#91](https://github.com/PrefectHQ/prefect-kubernetes/pull/91)


### Fixed

- `env` handling to allow hard coding of environment variable in base job template - [#94](https://github.com/PrefectHQ/prefect-kubernetes/pull/94)
@@ -65,6 +72,7 @@ Released September 18th, 2023.
Released July 20th, 2023.

### Changed

- Promoted workers to GA, removed beta disclaimers

## 0.2.10
@@ -99,6 +107,7 @@ Released May 4th, 2023.
### Added

- Fixed issue where `KubernetesEventReplicator` would not shutdown after a flow-run was completed resulting in new flow-runs not being picked up and the worker hanging on exit. - [#57](https://github.com/PrefectHQ/prefect-kubernetes/pull/57)

## 0.2.6

Released April 28th, 2023.
@@ -113,7 +122,7 @@ Released April 20th, 2023.

### Added

- `kill_infrastructure` method on `KubernetesWorker` which stops jobs for cancelled flow runs - [#52](https://github.com/PrefectHQ/prefect-kubernetes/pull/52)
- `kill_infrastructure` method on `KubernetesWorker` which stops jobs for cancelled flow runs - [#52](https://github.com/PrefectHQ/prefect-kubernetes/pull/52)

## 0.2.4

@@ -149,21 +158,25 @@ Released March 24, 2023.
Released February 17, 2023.

### Added

- Sync compatibility for block method calls used by `run_namespaced_job` - [#34](https://github.com/PrefectHQ/prefect-kubernetes/pull/34)

## 0.2.0

Released December 23, 2022.

### Added

- `KubernetesJob` block for running a Kubernetes job from a manifest - [#28](https://github.com/PrefectHQ/prefect-kubernetes/pull/28)
- `run_namespaced_job` flow allowing easy execution of a well-specified `KubernetesJob` block on a cluster specified by `KubernetesCredentials` - [#28](https://github.com/PrefectHQ/prefect-kubernetes/pull/28)
- `convert_manifest_to_model` utility function for converting a Kubernetes manifest to a model object - [#28](https://github.com/PrefectHQ/prefect-kubernetes/pull/28)

## 0.1.0

Released November 21, 2022.

### Added

- `KubernetesCredentials` block for generating authenticated Kubernetes clients - [#19](https://github.com/PrefectHQ/prefect-kubernetes/pull/19)
- Tasks for interacting with `job` resources: `create_namespaced_job`, `delete_namespaced_job`, `list_namespaced_job`, `patch_namespaced_job`, `read_namespaced_job`, `replace_namespaced_job` - [#19](https://github.com/PrefectHQ/prefect-kubernetes/pull/19)
- Tasks for interacting with `pod` resources: `create_namespaced_pod`, `delete_namespaced_pod`, `list_namespaced_pod`, `patch_namespaced_pod`, `read_namespaced_pod`, `read_namespaced_pod_logs`, `replace_namespaced_pod` - [#21](https://github.com/PrefectHQ/prefect-kubernetes/pull/21)
@@ -173,4 +186,5 @@ Released November 21, 2022.
- Tasks for interacting with `deployment` resources: `create_namespaced_deployment`, `delete_namespaced_deployment`, `list_namespaced_deployment`, `patch_namespaced_deployment`, `read_namespaced_deployment`, `replace_namespaced_deployment` - [#25](https://github.com/PrefectHQ/prefect-kubernetes/pull/25)

### Changed

- `KubernetesCredentials` block to use a single `get_client` method capable of creating all resource-specific client types - [#21](https://github.com/PrefectHQ/prefect-kubernetes/pull/21)
33 changes: 33 additions & 0 deletions prefect_kubernetes/utilities.py
@@ -1,11 +1,14 @@
""" Utilities for working with the Python Kubernetes API. """
import logging
import socket
import sys
import time
from pathlib import Path
from typing import Callable, List, Optional, Set, Type, TypeVar, Union

import urllib3
from kubernetes import watch
from kubernetes.client import ApiClient
from kubernetes.client import models as k8s_models
from prefect.infrastructure.kubernetes import KubernetesJob, KubernetesManifest
from slugify import slugify
@@ -35,6 +38,36 @@ def add(self, value):
        super().add(value)


def enable_socket_keep_alive(client: ApiClient) -> None:
    """
    Setting the keep-alive flags on the kubernetes client object.
    Unfortunately neither the kubernetes library nor the urllib3 library which
    kubernetes is using internally offer the functionality to enable keep-alive
    messages. Thus the flags are added to be used on the underlying sockets.
    """

    socket_options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]

    if hasattr(socket, "TCP_KEEPINTVL"):
        socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30))

    if hasattr(socket, "TCP_KEEPCNT"):
        socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 6))

    if hasattr(socket, "TCP_KEEPIDLE"):
        socket_options.append((socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 6))

    if sys.platform == "darwin":
        # TCP_KEEP_ALIVE not available on socket module in macOS, but defined in
        # https://github.com/apple/darwin-xnu/blob/2ff845c2e033bd0ff64b5b6aa6063a1f8f65aa32/bsd/netinet/tcp.h#L215
        TCP_KEEP_ALIVE = 0x10
        socket_options.append((socket.IPPROTO_TCP, TCP_KEEP_ALIVE, 30))

    client.rest_client.pool_manager.connection_pool_kw[
        "socket_options"
    ] = socket_options
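
To illustrate what these option tuples do once they reach a socket, here is a minimal sketch that is not part of the commit. It builds the same list and applies it to a plain TCP socket, then reads the `SO_KEEPALIVE` flag back. The helper names `build_keepalive_options` and `apply_options` are hypothetical, and the sketch assumes the platform allows creating an unconnected TCP socket.

```python
import socket
import sys


def build_keepalive_options():
    """Illustrative copy of the option list built by enable_socket_keep_alive."""
    options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
    # Only add tuning options that this platform's socket module exposes.
    for name, value in (("TCP_KEEPINTVL", 30), ("TCP_KEEPCNT", 6), ("TCP_KEEPIDLE", 6)):
        if hasattr(socket, name):
            options.append((socket.IPPROTO_TCP, getattr(socket, name), value))
    if sys.platform == "darwin":
        # 0x10 is the raw constant the commit takes from the XNU tcp.h header.
        options.append((socket.IPPROTO_TCP, 0x10, 30))
    return options


def apply_options(sock, options):
    """Apply each (level, optname, value) tuple to the socket (hypothetical helper)."""
    for level, optname, value in options:
        sock.setsockopt(level, optname, value)


sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    apply_options(sock, build_keepalive_options())
    # Some platforms report a non-zero value other than 1, so compare against 0.
    keepalive_on = sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0
finally:
    sock.close()

print("SO_KEEPALIVE enabled:", keepalive_on)
```

In the commit itself no socket is touched directly; the list is stored under `socket_options` in the pool manager's `connection_pool_kw` so that it can be applied to connections the pool creates afterwards.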


def convert_manifest_to_model(
    manifest: Union[Path, str, KubernetesManifest], v1_model_name: str
) -> V1KubernetesModel:
14 changes: 11 additions & 3 deletions prefect_kubernetes/worker.py
@@ -148,6 +148,7 @@
    _slugify_label_key,
    _slugify_label_value,
    _slugify_name,
    enable_socket_keep_alive,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -690,7 +691,7 @@ def _get_configured_kubernetes_client(

# if a hard-coded cluster config is provided, use it
if configuration.cluster_config:
return kubernetes.config.new_client_from_config_dict(
client = kubernetes.config.new_client_from_config_dict(
config_dict=configuration.cluster_config.config,
context=configuration.cluster_config.context_name,
)
@@ -702,9 +703,16 @@ def _get_configured_kubernetes_client(
             try:
                 kubernetes.config.load_incluster_config()
                 config = kubernetes.client.Configuration.get_default_copy()
-                return kubernetes.client.ApiClient(configuration=config)
+                client = kubernetes.client.ApiClient(configuration=config)
             except kubernetes.config.ConfigException:
-                return kubernetes.config.new_client_from_config()
+                client = kubernetes.config.new_client_from_config()
+
+        if os.environ.get(
+            "PREFECT_KUBERNETES_WORKER_ADD_TCP_KEEPALIVE", "TRUE"
+        ).strip().lower() in ("true", "1"):
+            enable_socket_keep_alive(client)
+
+        return client
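
The environment-variable check in this hunk can be read in isolation. The following is a small sketch, not code from the commit, that mirrors the same expression in a hypothetical helper named `keepalive_enabled` so the accepted values are easy to see.

```python
import os

ENV_VAR = "PREFECT_KUBERNETES_WORKER_ADD_TCP_KEEPALIVE"


def keepalive_enabled(environ=os.environ):
    """Mirror the commit's check: unset defaults to "TRUE"; only "true" or "1" enable."""
    raw = environ.get(ENV_VAR, "TRUE")
    return raw.strip().lower() in ("true", "1")


# Try a few candidate values; None stands for "variable not set".
for value in (None, "TRUE", " true ", "1", "false", "0", "yes", ""):
    env = {} if value is None else {ENV_VAR: value}
    print(repr(value), "->", keepalive_enabled(env))
```

Under this check keepalive is on when the variable is unset, and any value other than `true` or `1` (compared case-insensitively after stripping whitespace) turns it off, including `yes` and the empty string.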

    def _replace_api_key_with_secret(
        self, configuration: KubernetesWorkerJobConfiguration, client: "ApiClient"
35 changes: 35 additions & 0 deletions tests/test_utilities.py
@@ -2,18 +2,23 @@
import uuid
from typing import Type
from unittest import mock
from unittest.mock import MagicMock

import kubernetes
import pytest
import urllib3
from kubernetes.client import models as k8s_models
from kubernetes.config import ConfigException
from prefect.infrastructure.kubernetes import KubernetesJob

from prefect_kubernetes.utilities import (
    ResilientStreamWatcher,
    convert_manifest_to_model,
    enable_socket_keep_alive,
)

FAKE_CLUSTER = "fake-cluster"

base_path = "tests/sample_k8s_resources"

sample_deployment_manifest = KubernetesJob.job_from_file(
@@ -165,6 +170,25 @@
)


@pytest.fixture
def mock_cluster_config(monkeypatch):
    mock = MagicMock()
    # We cannot mock this or the `except` clause will complain
    mock.config.ConfigException = ConfigException
    mock.list_kube_config_contexts.return_value = (
        [],
        {"context": {"cluster": FAKE_CLUSTER}},
    )
    monkeypatch.setattr("kubernetes.config", mock)
    monkeypatch.setattr("kubernetes.config.ConfigException", ConfigException)
    return mock


@pytest.fixture
def mock_api_client(mock_cluster_config):
    return MagicMock()


@pytest.mark.parametrize(
"manifest,model_name,expected_model",
[
@@ -317,3 +341,14 @@ def my_stream(*args, **kwargs):
"log3",
"log4",
]


def test_keep_alive_updates_socket_options(mock_api_client):
    enable_socket_keep_alive(mock_api_client)

    assert (
        mock_api_client.rest_client.pool_manager.connection_pool_kw[
            "socket_options"
        ]._mock_set_call
        is not None
    )
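
Attribute access on a `MagicMock` generally returns another mock, so the assertion above may pass regardless of what was stored. One possible stricter check, sketched here as an assumption rather than as the project's test, swaps the mock for a stand-in client whose `connection_pool_kw` is a real dict. `make_fake_client` is a hypothetical helper, and `enable_socket_keep_alive` is redefined in trimmed form only to keep the sketch self-contained.

```python
import socket
from types import SimpleNamespace


def enable_socket_keep_alive(client):
    """Trimmed stand-in for the function added in this commit (illustration only)."""
    socket_options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
    client.rest_client.pool_manager.connection_pool_kw[
        "socket_options"
    ] = socket_options


def make_fake_client():
    """Hypothetical ApiClient stand-in exposing only the attributes used above."""
    pool_manager = SimpleNamespace(connection_pool_kw={})
    return SimpleNamespace(rest_client=SimpleNamespace(pool_manager=pool_manager))


def test_keep_alive_sets_so_keepalive():
    client = make_fake_client()
    enable_socket_keep_alive(client)

    # A real dict lets the test inspect the stored value, not just that a call happened.
    options = client.rest_client.pool_manager.connection_pool_kw["socket_options"]
    assert (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) in options


test_keep_alive_sets_so_keepalive()
```

Because the dict is real, this version would fail if the function stopped writing `socket_options` or dropped the `SO_KEEPALIVE` entry.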
