Cannot close cluster after interrupt (ctrl-c) #444

Closed · cagantomer opened this issue Apr 27, 2022 · 7 comments

@cagantomer
Contributor

What happened:

I am using KubeCluster to run an ad-hoc Dask cluster as part of my script.

I am trying to gracefully shut down the cluster in case of an interrupt (SIGINT / ctrl-C, SIGKILL) with the example below.

When the script runs without interruption, it correctly cleans up the cluster it created. When I interrupt it with ctrl-c, the handler catches the exception, but calling client.cluster.close in the finally clause raises a timeout error (see below). The worker and scheduler pods remain active and are not terminated.

What you expected to happen:
The pods of the ad-hoc cluster should terminate and no exception should be raised.

Minimal Complete Verifiable Example:

import time
import signal
import random

from dask_kubernetes import KubeCluster, make_pod_spec
from dask.distributed import Client, wait


def get_client() -> Client:
    """Get a (dask) client for remote"""

    num_workers = 5
    image = "daskdev/dask:2022.3.0-py3.9"

    pod_spec = make_pod_spec(
        image=image,
        memory_request="4G",
        cpu_request=1,
        extra_pod_config={"nodeSelector": {"env": "sw"}},
    )

    name = f"dask-k8s-example-{{uuid}}"        

    cluster = KubeCluster(
        pod_spec, namespace="research", name=name, n_workers=num_workers
    )

    print(
        f"started KubeCluster - dashboard address is {cluster.dashboard_link}, scheduler address: {cluster.scheduler.address}"
    )

    client = Client(cluster)

    # TODO: add timeout (and maybe make this optional)
    client.wait_for_workers(num_workers)

    return client


def double(value: int):
    time.sleep(random.random() * 3)
    return value + value

def square(value: int):
    time.sleep(random.random() * 4)
    return value * value

if __name__ == "__main__":
    client = get_client()
    
    def handle_signal(sig, frame):
        print(f"got a signal {sig} with frame {frame}")
        raise Exception("handled ctrl-c")

    # signal.signal(signal.SIGINT, handle_signal)

    try:
        tasks = []
        for i in range(20):
            ipi = client.submit(double, i)
            isq = client.submit(square, ipi)
            tasks.append(isq)

        print("waiting for tasks to finish...")
        wait(tasks)

        for task in tasks:
            if task.status == "error":
                print("error in task", task)
            else:
                print(f"task {task}. results {task.result()}")
    except KeyboardInterrupt:
        print("ctrl-c (through KeyboardInterrupt)")
    except Exception as ex:
        print(ex)
    finally:
        print("closing...") 
        client.cluster.close()
        client.close()

The output from execution:

python bin/test_dask_kubecluster.py
Creating scheduler pod on cluster. This may take some time.
/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/client.py:1283: VersionMismatchWarning: Mismatched versions found

+---------+--------+-----------+---------+
| Package | client | scheduler | workers |
+---------+--------+-----------+---------+
| blosc   | 1.10.6 | 1.10.2    | None    |
| lz4     | None   | 3.1.10    | None    |
+---------+--------+-----------+---------+
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
started KubeCluster - dashboard address is http://localhost:8787/status, scheduler address: tcp://dask-k8s-example-f47db833-b.research:8786
waiting for tasks to finish...
^Cctrl-c (through KeyboardInterrupt)
closing...
2022-04-27 09:13:57,071 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x110361be0>>, <Task finished name='Task-165' coro=<SpecCluster._correct_state_internal() done, defined at /Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py:314> exception=OSError('Timed out trying to connect to tcp://localhost:56654 after 30 s')>)
Traceback (most recent call last):
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 444, in connect
    stream = await self.client.connect(
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
    addrinfo = await self.resolver.resolve(host, port, af)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 490, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/core.py", line 289, in connect
    comm = await asyncio.wait_for(
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py", line 406, in _close
    await self._correct_state()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py", line 321, in _correct_state_internal
    await self.scheduler_comm.retire_workers(workers=list(to_close))
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/core.py", line 844, in send_recv_from_rpc
    comm = await self.live_comm()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/core.py", line 801, in live_comm
    comm = await connect(
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/core.py", line 315, in connect
    raise OSError(
OSError: Timed out trying to connect to tcp://localhost:56654 after 30 s
Traceback (most recent call last):
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 444, in connect
    stream = await self.client.connect(
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
    addrinfo = await self.resolver.resolve(host, port, af)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 490, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/core.py", line 289, in connect
    comm = await asyncio.wait_for(
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/tomercagan/dev/nextresearch/bin/test_dask_kubecluster.py", line 79, in <module>
    client.cluster.close()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/cluster.py", line 193, in close
    return self.sync(self._close, callback_timeout=timeout)
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/utils.py", line 309, in sync
    return sync(
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/utils.py", line 376, in sync
    raise exc.with_traceback(tb)
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/utils.py", line 349, in f
    result = yield future
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py", line 406, in _close
    await self._correct_state()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py", line 321, in _correct_state_internal
    await self.scheduler_comm.retire_workers(workers=list(to_close))
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/core.py", line 844, in send_recv_from_rpc
    comm = await self.live_comm()
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/core.py", line 801, in live_comm
    comm = await connect(
  File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/core.py", line 315, in connect
    raise OSError(
OSError: Timed out trying to connect to tcp://localhost:56654 after 30 s
2022-04-27 09:13:57,171 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client

Anything else we need to know?:

Press ctrl-c once "waiting for tasks to finish..." appears on the screen.

Also, not related, but the dashboard_link always shows localhost instead of something more useful (I recall reading a discussion about it - but it was a while ago).

Environment:

  • Dask version: 2022.3.0
  • dask-kubernetes: 2022.1.0
  • Python version: 3.9.10
  • Operating System: Linux (based on daskdev/dask image)
  • Install method (conda, pip, source): docker / kubernetes
@cagantomer
Contributor Author

I can try to work on it, though I'm not sure I know enough yet. Is there a "development" guide that I can use to get up and running?

@cagantomer
Contributor Author

BTW, as a workaround, I am using the following to "manually" find and delete the pods:

import pathlib
from kubernetes import client as k8sClient, config

DASK_CLUSTER_NAME_LABEL = "dask.org/cluster-name"
DASK_COMPONENT_LABEL = "dask.org/component"

def load_k8s_config():
    """load config - either from .kube in home directory (for local clients) or in-cluster (for e.g. CI)"""
    if (pathlib.Path.home() / ".kube" / "config").exists():
        config.load_kube_config()
    else:
        config.load_incluster_config()
        
def delete_adhoc_cluster(cluster_name: str, namespace: str):
    """delete the dask cluster created by KubeCluster"""
    load_k8s_config()
    v1 = k8sClient.CoreV1Api()

    pods_to_delete = []
    pods_list = v1.list_namespaced_pod(namespace=namespace)
    for pod in pods_list.items:
        if (
            DASK_CLUSTER_NAME_LABEL in pod.metadata.labels
            and pod.metadata.labels[DASK_CLUSTER_NAME_LABEL] == cluster_name
        ):
            pods_to_delete.append(
                dict(
                    name=pod.metadata.name, 
                    component=pod.metadata.labels.get(DASK_COMPONENT_LABEL, "n/a")
                )
            )

    for pod in pods_to_delete:
        print(f"deleting {pod['name']} ({pod['component']})...")
        v1.delete_namespaced_pod(pod['name'], namespace)
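
A sketch of how this could be wired into the script from the original report (CLUSTER_NAME is a placeholder for the value of the dask.org/cluster-name label on the pods):

try:
    wait(tasks)
finally:
    try:
        client.cluster.close()
    except OSError:
        # graceful close timed out (e.g. after ctrl-c) - fall back to deleting the pods directly
        delete_adhoc_cluster(CLUSTER_NAME, namespace="research")
    client.close()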

@jacobtomlinson
Member

Apologies, it looks like I had typed a response here but forgot to press the button. Nice that GitHub saves these things!

Thanks @cagantomer, I am able to reproduce this locally. My guess from looking at the traceback is that the cluster manager is unable to connect to the scheduler after things start closing out. Perhaps it's worth exploring the port-forwarding code in case it has already been closed.
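
In the meantime, one thing that might be worth experimenting with (just a sketch, not a confirmed fix) is closing the cluster from a SIGINT handler before the KeyboardInterrupt starts unwinding the stack, in case the port-forward is still healthy at that point:

import signal

def install_graceful_shutdown(client, cluster):
    """Close the cluster on the first ctrl-c, then re-raise as KeyboardInterrupt."""
    def handle_sigint(sig, frame):
        print("SIGINT received - closing cluster before unwinding")
        try:
            cluster.close()
            client.close()
        finally:
            # restore default behaviour and propagate the interrupt
            signal.signal(signal.SIGINT, signal.default_int_handler)
            raise KeyboardInterrupt

    signal.signal(signal.SIGINT, handle_sigint)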

You might also be interested in trying the operator we've been working on, which is intended to supersede the current KubeCluster implementation. One of the goals of the operator is to handle lifecycle concerns like this in a more k8s-native way.

@cagantomer
Contributor Author

I saw the operator efforts and I kind of have mixed feelings about it. I am reluctant about anything that requires extra installations :-)

I really liked the KubeCluster experience - just having the right permissions to k8s and everything works relatively smoothly...

What are the advantages of using the operator?

@jacobtomlinson
Member

I am reluctant regarding anything that requires extra installations :-)

That's totally understandable. I'm optimistic that the couple of extra lines you need to run the first time you use it will be worth it.

just having the right permissions to k8s and everything works relatively smoothly...

We often find that this is rare and folks don't have the right permissions, particularly for creating pods, as that can be a large security risk.

What are the advantages of using the operator?

I'll try to enumerate some of the goals:

Switching to Custom Resource Definitions and an Operator hugely simplifies the client-side code and provides much better decoupling. This will help maintainability, as the current KubeCluster has grown unwieldy.

In multi-user clusters, an admin can install the operator once for the cluster and then users can just happily create Dask clusters.

With the current KubeCluster, all state is tied to the instance of the object in Python, which means clean-up or reuse can be unpleasant if the Python process ends unexpectedly. In the new implementation, state lives in k8s, and clusters can be created/scaled/deleted in a k8s-native way using kubectl, in addition to the new experimental KubeCluster (see the sketch after this list).

Many folks use Dask clusters in a multi-stage pipeline, where each stage is a separate Python script or notebook. The current KubeCluster cannot persist between stages, but clusters created with the operator can.

We are also in the process of adding a DaskJob CRD which will behave much like a k8s Job resource but with a Dask cluster attached. This should feel familiar to Kubeflow users who use the various training operators it provides.

The decoupled state means we have been able to implement support for dask-ctl, so that clusters can be managed via the CLI/API it provides, along with the new CLI dashboard (once we get that finished).

We currently struggle with supporting clusters that have Istio installed due to the way our pods talk directly to each other. The operator aims to solve that by handling the complex services required in a way that is transparent to the user.

The autoscaling logic is moved from the cluster manager to the operator, which will also benefit multi-stage pipeline workflows.
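
To make the kubectl point above concrete, here is a rough sketch (the CRD group, version and plural below are assumptions and may differ between operator releases): with the operator installed, a cluster is just a custom resource, so it can be listed or deleted via kubectl or the Kubernetes API without any client-side Dask state.

from kubernetes import client as k8sClient, config

config.load_kube_config()
api = k8sClient.CustomObjectsApi()

# roughly equivalent to `kubectl get daskclusters -n research`
clusters = api.list_namespaced_custom_object(
    group="kubernetes.dask.org",  # assumed CRD group
    version="v1",                 # assumed CRD version
    namespace="research",
    plural="daskclusters",        # assumed plural name
)
for item in clusters.get("items", []):
    print(item["metadata"]["name"])

# roughly equivalent to `kubectl delete daskcluster <name> -n research`
# api.delete_namespaced_custom_object(
#     "kubernetes.dask.org", "v1", "research", "daskclusters", "<cluster-name>"
# )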

@cagantomer
Contributor Author

@jacobtomlinson - thanks for the information. Once we are stable with our current implementation, I'll look into trying the operator (hoping that DevOps will help with it 😄)

@jacobtomlinson
Member

The classic KubeCluster was removed in #890. All users will need to migrate to the Dask Operator. Closing.

@jacobtomlinson closed this as not planned on Apr 30, 2024