KubeCluster Client –> Scheduler: missing port number in address 'kubernetes' #405

Closed · haf opened this issue Feb 10, 2022 · 5 comments

haf commented Feb 10, 2022

What happened:

When spawning a new k8s cluster, I get these errors:

run-mmm-dask-27408369-swmpw run-mmm CRITICAL:our_package.main:Running of program failed: missing port number in address 'kubernetes'
run-mmm-dask-27408369-swmpw run-mmm Traceback (most recent call last):
run-mmm-dask-27408369-swmpw run-mmm   File "/venv/lib/python3.9/site-packages/our_package/main.py", line 84, in main
run-mmm-dask-27408369-swmpw run-mmm     return run(config, sem)
run-mmm-dask-27408369-swmpw run-mmm   File "/venv/lib/python3.9/site-packages/our_package/main.py", line 70, in run
run-mmm-dask-27408369-swmpw run-mmm     return run_dask(config)
run-mmm-dask-27408369-swmpw run-mmm   File "/venv/lib/python3.9/site-packages/our_package/impl_dask.py", line 404, in run_dask
run-mmm-dask-27408369-swmpw run-mmm     client = create_client(config)
run-mmm-dask-27408369-swmpw run-mmm   File "/venv/lib/python3.9/site-packages/our_package/impl_dask.py", line 257, in create_client
run-mmm-dask-27408369-swmpw run-mmm     return create_kube(config)
run-mmm-dask-27408369-swmpw run-mmm   File "/venv/lib/python3.9/site-packages/our_package/impl_dask.py", line 246, in create_kube
run-mmm-dask-27408369-swmpw run-mmm     return Client(k)
run-mmm-dask-27408369-swmpw run-mmm   File "/venv/lib/python3.9/site-packages/distributed/client.py", line 923, in __init__
run-mmm-dask-27408369-swmpw run-mmm     self.start(timeout=timeout)
run-mmm-dask-27408369-swmpw run-mmm   File "/venv/lib/python3.9/site-packages/distributed/client.py", line 1081, in start
run-mmm-dask-27408369-swmpw run-mmm     sync(self.loop, self._start, **kwargs)
run-mmm-dask-27408369-swmpw run-mmm   File "/venv/lib/python3.9/site-packages/distributed/utils.py", line 363, in sync
run-mmm-dask-27408369-swmpw run-mmm     raise exc.with_traceback(tb)
run-mmm-dask-27408369-swmpw run-mmm Traceback (most recent call last):
run-mmm-dask-27408369-swmpw run-mmm   File "/usr/local/lib/python3.9/runpy.py", line 197, in _run_module_as_main
run-mmm-dask-27408369-swmpw run-mmm     return _run_code(code, main_globals, None,
run-mmm-dask-27408369-swmpw run-mmm   File "/usr/local/lib/python3.9/runpy.py", line 87, in _run_code
run-mmm-dask-27408369-swmpw run-mmm     exec(code, run_globals)
run-mmm-dask-27408369-swmpw run-mmm   File "/venv/lib/python3.9/site-packages/our_package/main.py", line 91, in <module>
run-mmm-dask-27408369-swmpw run-mmm   File "/venv/lib/python3.9/site-packages/distributed/utils.py", line 348, in f
run-mmm-dask-27408369-swmpw run-mmm     result[0] = yield future
run-mmm-dask-27408369-swmpw run-mmm   File "/venv/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
run-mmm-dask-27408369-swmpw run-mmm     value = future.result()
run-mmm-dask-27408369-swmpw run-mmm   File "/venv/lib/python3.9/site-packages/distributed/client.py", line 1173, in _start
run-mmm-dask-27408369-swmpw run-mmm     await self._ensure_connected(timeout=timeout)
run-mmm-dask-27408369-swmpw run-mmm     sys.exit(main(sys.argv[1:]))
run-mmm-dask-27408369-swmpw run-mmm   File "/venv/lib/python3.9/site-packages/our_package/main.py", line 87, in main
run-mmm-dask-27408369-swmpw run-mmm   File "/venv/lib/python3.9/site-packages/distributed/client.py", line 1232, in _ensure_connected
run-mmm-dask-27408369-swmpw run-mmm     comm = await connect(
run-mmm-dask-27408369-swmpw run-mmm   File "/venv/lib/python3.9/site-packages/distributed/comm/core.py", line 289, in connect
run-mmm-dask-27408369-swmpw run-mmm     comm = await asyncio.wait_for(
run-mmm-dask-27408369-swmpw run-mmm   File "/usr/local/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
run-mmm-dask-27408369-swmpw run-mmm     return fut.result()
run-mmm-dask-27408369-swmpw run-mmm   File "/venv/lib/python3.9/site-packages/distributed/comm/tcp.py", line 405, in connect
run-mmm-dask-27408369-swmpw run-mmm     ip, port = parse_host_port(address)
run-mmm-dask-27408369-swmpw run-mmm   File "/venv/lib/python3.9/site-packages/distributed/comm/addressing.py", line 95, in parse_host_port
run-mmm-dask-27408369-swmpw run-mmm     port = _default()
run-mmm-dask-27408369-swmpw run-mmm   File "/venv/lib/python3.9/site-packages/distributed/comm/addressing.py", line 73, in _default
run-mmm-dask-27408369-swmpw run-mmm     raise ValueError(f"missing port number in address {address!r}")

With versions:


[[package]]
name = "dask-kubernetes"
version = "2022.1.0"
description = "Native Kubernetes integration for Dask"
category = "main"
optional = false
python-versions = ">=3.7"

What you expected to happen:

I haven't declared the address 'kubernetes' anywhere, so I don't expect the client to crash on it.

I've disabled Istio sidecar injection, so this should not be an Istio problem.

Looking at the pods, both a worker and a scheduler are created in the cluster, and judging by their logs they connect to each other.

The code runs in the same cluster that it deploys to.

Minimal Complete Verifiable Example:

import logging

from os import getenv
from dask import config as dconfig
from dask.distributed import Client, LocalCluster, as_completed
from dask_kubernetes import KubeCluster
from distributed import Future

logger = logging.getLogger(__name__)

def create_kube(config):
    env = [
        {
            "name": "K8S_NODE_NAME",
            "valueFrom": {"fieldRef": {"fieldPath": "spec.nodeName"}},
        },
        {
            "name": "K8S_POD_NAME",
            "valueFrom": {"fieldRef": {"fieldPath": "metadata.name"}},
        },
        {
            "name": "EXAMPLE",
            "valueFrom": {
                "secretKeyRef": {
                    "name": "analytics-pguser-modelruns",
                    "key": "host",
                },
            },
        },
    ]

    # v1/Pod spec
    pod_template = {
        "metadata": {
            "annotations": {
                "sidecar.istio.io/inject": "false",
            },
            "labels": {
                "app": "run-mmm",
                "component": "worker",
                "via": "DaskCluster",
            },
        },
        "spec": {
            "serviceAccountName": "dask-worker",
            "tolerations": [
                {
                    "key": "dedicated",
                    "operator": "Equal",
                    "value": "dask",
                    "effect": "NoSchedule",
                },
            ],
            "affinity": {
                "nodeAffinity": {
                    "requiredDuringSchedulingIgnoredDuringExecution": {
                        "nodeSelectorTerms": [
                            {
                                "matchExpressions": [
                                    {
                                        "key": "dedicated",
                                        "operator": "In",
                                        "values": ["dask"],
                                    },
                                ]
                            },
                        ]
                    }
                }
            },
            "containers": [
                {
                    "name": "worker",
                    "image": config.dask.kube_image,
                    "args": [
                        "dask-worker",
                        "$(DASK_SCHEDULER_ADDRESS)",
                        "--no-dashboard",
                        "--nthreads",
                        "40",
                        "--nprocs",
                        "1",
                        "--death-timeout",
                        "60",
                        "--memory-limit",
                        "64GB",
                    ],
                    "env": env,
                    "resources": {
                        "requests": {
                            "cpu": "15000m",
                            "memory": "60G",
                        },
                        "limits": {
                            "cpu": "16000m",
                            "memory": "64G",
                        },
                    },
                }
            ],
            "restartPolicy": "Never",
        },
    }

    scheduler_pod_template = {
        **pod_template,
        "metadata": {
            "annotations": {
                "sidecar.istio.io/inject": "false",
            },
            "labels": {
                "app": "run-mmm",
                "component": "scheduler",
                "via": "DaskCluster",
            },
        },
        "tolerations": [],
        "affinity": None,
        "spec": {
            **pod_template["spec"],
            "containers": [
                {
                    "name": "scheduler",
                    "image": config.dask.kube_image,
                    "args": [
                        "dask-scheduler",
                        "--port",
                        "8786",
                        "--bokeh-port",
                        "8787",
                    ],
                    "resources": {
                        "requests": {
                            "cpu": "100m",
                            "memory": "256Mi",
                        },
                        "limits": {
                            "cpu": "2",
                            "memory": "2Gi",
                        },
                    },
                }
            ],
        },
    }

    k = KubeCluster(
        pod_template,
        name=getenv("K8S_POD_NAME", default="run-mmm-dask"),
        namespace=config.dask.kube_ns,
        scheduler_service_wait_timeout=60 * 10,
        scheduler_pod_template=scheduler_pod_template,
    )

    # k.adapt(minimum=config.dask.kube_min, maximum=config.dask.kube_max)
    k.scale(3)

    logger.info(f"Using scheduler_address={k.scheduler_address}")

    logger.info("Using Dask config:")
    for k, v in dconfig.config.items():
        logger.info(f"{str(k)}={str(v)}")

    return Client(k)

Anything else we need to know?:

Environment:


[[package]]
name = "dask"
version = "2022.1.1"
description = "Parallel PyData with Task Scheduling"
category = "main"
optional = false
python-versions = ">=3.7"

  • Python version: 3.9 (the image is built from the following Dockerfile):

FROM python:3.9-slim as base

ENV VIRTUAL_ENV=/venv \
  PIP_NO_CACHE_DIR=1 \
  PIP_DISABLE_PIP_VERSION_CHECK=1 \
  PIP_DEFAULT_TIMEOUT=100 \
  BUILD_ESSENTIAL_VERSION=12.9

WORKDIR /app

# We're putting the installation of these packages in the base image because cmdstanpy
# actually compiles the models at runtime rather than at compile time. While this makes
# for more lax security, this container is not expected to listen to user-supplied
# inputs.
RUN apt-get update \
    && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
      tzdata \
      ca-certificates \
      build-essential=${BUILD_ESSENTIAL_VERSION} \
      curl \
      python3-dev \
      libssl-dev \
    && rm -rf /var/lib/apt/lists/*


FROM base as builder

RUN python -m venv ${VIRTUAL_ENV}
ENV PATH="$VIRTUAL_ENV/bin:$PATH" \
  CMDSTANPY_VERSION=1.0.0

# When we install cmdstanpy here, we're also installing e.g. ujson, pandas and numpy
# because cmdstanpy depends on those; but then later, when installing the poetry-built
# wheels, we'll uninstall them and install their locked versions. This is a trade-off
# with running cmdstanpy.install after the wheels which is much worse from a cache-busting
# perspective.
RUN pip install --upgrade "cmdstanpy==${CMDSTANPY_VERSION}" wheel

COPY cmdstanpy-install.py ./
RUN HOME=/app python cmdstanpy-install.py

RUN echo VIRTUAL_ENV=${VIRTUAL_ENV}

# deps
COPY ./deps/*.whl ./
RUN pip install *.whl && rm -rf *.whl

COPY ./*.whl ./
RUN pip install *.whl && rm -rf *.whl


FROM base as final

ARG COMMIT_SHA
ARG COMMIT_REF

ENV HOME=/app

COPY --from=builder /venv /venv
COPY --from=builder ${HOME}/.cmdstan ${HOME}/.cmdstan

ENV COMMIT_SHA=${COMMIT_SHA} \
  COMMIT_REF=${COMMIT_REF} \
  PATH="${VIRTUAL_ENV}/bin:${PATH}"

RUN echo VIRTUAL_ENV=${VIRTUAL_ENV} PATH=${PATH}

# main entrypoint
ENTRYPOINT ["python", "-m", "our_package.main"]
CMD []

@jacobtomlinson (Member) commented

It looks like the DASK_SCHEDULER_ADDRESS variable may not be getting set correctly and is ending up as the string 'kubernetes'. We will take a look!
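
For reference, the failing call from the traceback can be reproduced in isolation. This is just a minimal sketch against the distributed version listed above, showing that any bare hostname without a port produces the same error:

from distributed.comm.addressing import parse_host_port

try:
    # A bare hostname with no port and no default port fails exactly like the traceback above.
    parse_host_port("kubernetes")
except ValueError as e:
    print(e)  # missing port number in address 'kubernetes'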

@Matt711 (Member) commented Feb 10, 2022

Hi @haf, can you tell us what value you used for the config argument?

@haf (Author) commented Feb 11, 2022

@Matt711 Sure: config.dask.kube_image is a private image (built with the Dockerfile above) that you can't access, and config.dask.kube_ns = 'flows'.
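
For anyone trying to run the MCVE without access to the private image, a minimal stand-in for the config argument might look like this (the SimpleNamespace shape and the public daskdev/dask image are illustrative assumptions, not the reporter's actual config):

from types import SimpleNamespace

# Hypothetical stand-in for the `config` argument used by create_kube();
# only the two fields referenced in the MCVE are provided.
config = SimpleNamespace(
    dask=SimpleNamespace(
        kube_image="daskdev/dask:2022.1.1",  # public image as a substitute for the private one
        kube_ns="flows",
    )
)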

@Matt711 (Member) commented Feb 22, 2022

Hi @haf, sorry for the delay on this. I am trying to build your Dockerfile to reproduce the problem, but I am missing several Python files. I'm not sure whether those are files you can provide, or perhaps there is another way I can reproduce what you're seeing?

@jacobtomlinson (Member) commented

The classic KubeCluster was removed in #890. All users will need to migrate to the Dask Operator. Closing.
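
For reference, a minimal sketch of the operator-based replacement (the name, image, worker count, and namespace below are illustrative; see the dask-kubernetes migration guide for the supported options):

from dask.distributed import Client
from dask_kubernetes.operator import KubeCluster

# Requires the Dask Operator to be installed in the target cluster first.
cluster = KubeCluster(
    name="run-mmm-dask",               # illustrative cluster name
    namespace="flows",
    image="ghcr.io/dask/dask:latest",
    n_workers=3,
)
client = Client(cluster)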

@jacobtomlinson closed this as not planned on Apr 30, 2024