A quick attempt at making taskproc handle migrations from one k8s cluster to another
EvanKrall authored and jfongatyelp committed May 21, 2024
1 parent f3c78c5 commit e1d14a2
Showing 1 changed file with 52 additions and 26 deletions.
78 changes: 52 additions & 26 deletions task_processing/plugins/kubernetes/kubernetes_pod_executor.py
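
For context, a caller-side sketch of how the new old_kubeconfig_paths argument might be used during a cluster migration. This assumes the executor class in this module is named KubernetesPodExecutor; the namespace and kubeconfig paths below are hypothetical values, not taken from this commit.

    from task_processing.plugins.kubernetes.kubernetes_pod_executor import (
        KubernetesPodExecutor,
    )

    # Point the executor at the new cluster while still watching, reconciling,
    # and killing pods that remain on the cluster being drained.
    executor = KubernetesPodExecutor(
        namespace="example-namespace",  # hypothetical
        kubeconfig_path="/path/to/new-cluster.conf",  # hypothetical
        old_kubeconfig_paths=["/path/to/old-cluster.conf"],  # hypothetical
    )
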
@@ -6,7 +6,7 @@
 from typing import Collection
 from typing import Optional
 
-from kubernetes import watch
+from kubernetes import watch as kube_watch
 from kubernetes.client import V1Affinity
 from kubernetes.client import V1Container
 from kubernetes.client import V1ContainerPort
@@ -65,13 +65,20 @@ def __init__(
         kubeconfig_path: Optional[str] = None,
         task_configs: Optional[Collection[KubernetesTaskConfig]] = [],
         emit_events_without_state_transitions: bool = False,
+        old_kubeconfig_paths: Collection[str] = (),
     ) -> None:
         if not version:
             version = "unknown_task_processing"
         user_agent = f"{namespace}/v{version}"
         self.kube_client = KubeClient(
             kubeconfig_path=kubeconfig_path, user_agent=user_agent
         )
+
+        self.old_kube_clients = [
+            KubeClient(kubeconfig_path=old_kubeconfig_path, user_agent=user_agent)
+            for old_kubeconfig_path in old_kubeconfig_paths
+        ]
+
         self.namespace = namespace
 
         # Pod modified events that did not result in a pod state transition are usually not
@@ -99,17 +106,23 @@ def __init__(
 
         # TODO(TASKPROC-243): keep track of resourceVersion so that we can continue event processing
         # from where we left off on restarts
-        self.watch = watch.Watch()
-        self.pod_event_watch_thread = threading.Thread(
-            target=self._pod_event_watch_loop,
-            # ideally this wouldn't be a daemon thread, but a watch.Watch() only checks
-            # if it should stop after receiving an event - and it's possible that we
-            # have periods with no events so instead we'll attempt to stop the watch
-            # and then join() with a small timeout to make sure that, if we shutdown
-            # with the thread alive, we did not drop any events
-            daemon=True,
-        )
-        self.pod_event_watch_thread.start()
+        self.pod_event_watch_threads = []
+        self.watches = []
+        for kube_client in [self.kube_client] + self.old_kube_clients:
+            watch = kube_watch.Watch()
+            pod_event_watch_thread = threading.Thread(
+                target=self._pod_event_watch_loop,
+                args=(kube_client, watch),
+                # ideally this wouldn't be a daemon thread, but a watch.Watch() only checks
+                # if it should stop after receiving an event - and it's possible that we
+                # have periods with no events so instead we'll attempt to stop the watch
+                # and then join() with a small timeout to make sure that, if we shutdown
+                # with the thread alive, we did not drop any events
+                daemon=True,
+            )
+            pod_event_watch_thread.start()
+            self.pod_event_watch_threads.append(pod_event_watch_thread)
+            self.watches.append(watch)
 
         self.pending_event_processing_thread = threading.Thread(
             target=self._pending_event_processing_loop,
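
The __init__ changes above give every cluster (the current client plus one per entry in old_kubeconfig_paths) its own kubernetes Watch and daemon watcher thread. A stripped-down sketch of the same start-up pattern outside the class, with hypothetical clients and handle_event names (client here mirrors the repo's KubeClient wrapper, which exposes the CoreV1 API as .core):

    import threading

    from kubernetes import watch as kube_watch

    def start_pod_watchers(clients, namespace, handle_event):
        """Start one Watch plus one daemon thread per client; return both lists for shutdown."""
        watches, threads = [], []
        for client in clients:
            watch = kube_watch.Watch()

            def loop(client=client, watch=watch):
                # stream() blocks between events, so the thread only notices
                # watch.stop() after the next event arrives - hence daemon threads
                # plus a bounded join() at shutdown.
                for pod_event in watch.stream(client.core.list_namespaced_pod, namespace):
                    handle_event(pod_event)

            thread = threading.Thread(target=loop, daemon=True)
            thread.start()
            watches.append(watch)
            threads.append(thread)
        return watches, threads
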
@@ -136,7 +149,9 @@ def _initialize_existing_task(self, task_config: KubernetesTaskConfig) -> None:
             ),
         )
 
-    def _pod_event_watch_loop(self) -> None:
+    def _pod_event_watch_loop(
+        self, kube_client: KubeClient, watch: kube_watch.Watch
+    ) -> None:
         logger.debug(f"Starting watching Pod events for namespace={self.namespace}.")
         # TODO(TASKPROC-243): we'll need to correctly handle resourceVersion expiration for the case
         # where the gap between task_proc shutting down and coming back up is long enough for data
@@ -148,8 +163,8 @@ def _pod_event_watch_loop(self) -> None:
         # see: https://github.com/kubernetes/kubernetes/issues/74022
         while not self.stopping:
             try:
-                for pod_event in self.watch.stream(
-                    self.kube_client.core.list_namespaced_pod, self.namespace
+                for pod_event in watch.stream(
+                    kube_client.core.list_namespaced_pod, self.namespace
                 ):
                     # it's possible that we've received an event after we've already set the stop
                     # flag since Watch streams block forever, so re-check if we've stopped before
@@ -161,7 +176,7 @@ def _pod_event_watch_loop(self) -> None:
                         break
             except ApiException as e:
                 if not self.stopping:
-                    if not self.kube_client.maybe_reload_on_exception(exception=e):
+                    if not kube_client.maybe_reload_on_exception(exception=e):
                         logger.exception(
                             "Unhandled API exception while watching Pod events - restarting watch!"
                         )
@@ -576,11 +591,14 @@ def run(self, task_config: KubernetesTaskConfig) -> Optional[str]:
 
     def reconcile(self, task_config: KubernetesTaskConfig) -> None:
         pod_name = task_config.pod_name
-        try:
-            pod = self.kube_client.get_pod(namespace=self.namespace, pod_name=pod_name)
-        except Exception:
-            logger.exception(f"Hit an exception attempting to fetch pod {pod_name}")
-            pod = None
+        for kube_client in [self.kube_client] + self.old_kube_clients:
+            try:
+                pod = kube_client.get_pod(namespace=self.namespace, pod_name=pod_name)
+            except Exception:
+                logger.exception(f"Hit an exception attempting to fetch pod {pod_name}")
+                pod = None
+            else:
+                break
 
         if pod_name not in self.task_metadata:
             self._initialize_existing_task(task_config)
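
reconcile() now looks the pod up on the current cluster first and falls back to each old cluster in turn; the for/try/except/else pattern breaks out of the loop on the first lookup that does not raise (note that a lookup that raises no exception but returns no pod also ends the search). A self-contained sketch of that pattern, with hypothetical names:

    def fetch_pod_from_any(clients, namespace, pod_name):
        """Return the pod from the first client whose lookup does not raise, else None."""
        pod = None
        for client in clients:
            try:
                pod = client.get_pod(namespace=namespace, pod_name=pod_name)
            except Exception:
                # a failed lookup against this cluster falls through to the next one
                pod = None
            else:
                # the else branch of try/except runs only when no exception was raised
                break
        return pod
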
@@ -627,9 +645,15 @@ def kill(self, task_id: str) -> bool:
         This function will request that Kubernetes delete the named Pod and will return
         True if the Pod termination request was successfully emitted or False otherwise.
         """
-        terminated = self.kube_client.terminate_pod(
-            namespace=self.namespace,
-            pod_name=task_id,
+        # NOTE: we're purposely not removing this task from `task_metadata` as we want
+        # to handle that with the Watch that we'll set to monitor each Pod for events.
+        # TODO(TASKPROC-242): actually handle termination events
+        terminated = any(
+            kube_client.terminate_pod(
+                namespace=self.namespace,
+                pod_name=task_id,
+            )
+            for kube_client in [self.kube_client] + self.old_kube_clients
         )
         if terminated:
             logger.info(
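
kill() now fans the termination request out with any() over a generator, which short-circuits: clusters later in the list (the old ones) are only asked to terminate the pod when the earlier clusters report failure. A minimal sketch of the same fan-out, with hypothetical names:

    def terminate_pod_anywhere(clients, namespace, task_id):
        """True if at least one cluster accepted the termination request."""
        # any() over a generator stops calling terminate_pod after the first success,
        # so clusters later in the list are only tried when earlier ones return False.
        return any(
            client.terminate_pod(namespace=namespace, pod_name=task_id)
            for client in clients
        )
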
@@ -665,12 +689,14 @@ def stop(self) -> None:
         logger.debug("Signaling Pod event Watch to stop streaming events...")
         # make sure that we've stopped watching for events before calling join() - otherwise,
         # join() will block until we hit the configured timeout (or forever with no timeout).
-        self.watch.stop()
+        for watch in self.watches:
+            watch.stop()
         # timeout arbitrarily chosen - we mostly just want to make sure that we have a small
         # grace period to flush the current event to the pending_events queue as well as
         # any other clean-up - it's possible that after this join() the thread is still alive
         # but in that case we can be reasonably sure that we're not dropping any data.
-        self.pod_event_watch_thread.join(timeout=POD_WATCH_THREAD_JOIN_TIMEOUT_S)
+        for pod_event_watch_thread in self.pod_event_watch_threads:
+            pod_event_watch_thread.join(timeout=POD_WATCH_THREAD_JOIN_TIMEOUT_S)
 
         logger.debug("Waiting for all pending PodEvents to be processed...")
         # once we've stopped updating the pending events queue, we then wait until we're done
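
stop() is the shutdown half of the per-cluster watch loops: every Watch is stopped before any thread is joined, and each join() is bounded because a blocked stream() only notices the stop flag after its next event. A minimal sketch pairing with the start-up sketch above, with hypothetical names (the timeout value is illustrative):

    def stop_pod_watchers(watches, threads, join_timeout_s=1.0):
        # stop every Watch first so that join() is not waiting on a stream that
        # would otherwise block until its next event arrives
        for watch in watches:
            watch.stop()
        # bounded join: a watcher thread still alive afterwards should only be
        # waiting on an event it will discard once it sees the stop flag
        for thread in threads:
            thread.join(timeout=join_timeout_s)
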