Add Autoscaling to Operator (#451)
Matt711 authored Aug 24, 2022
1 parent 4a69e3a commit 8bd5083
Showing 16 changed files with 422 additions and 11 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/helmcluster.yaml
@@ -22,6 +22,9 @@ jobs:
matrix:
python-version: ["3.8", "3.9", "3.10"]

env:
KUBECONFIG: .pytest-kind/pytest-kind/kubeconfig

steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
@@ -30,6 +33,7 @@ jobs:
- name: Install deps
run: ./ci/install-deps.sh
- name: Run tests
env:
KUBECONFIG: .pytest-kind/pytest-kind/kubeconfig
run: pytest dask_kubernetes/common/tests dask_kubernetes/helm/tests
- name: Debug k8s resources
if: success() || failure()
run: kubectl get all -A
8 changes: 6 additions & 2 deletions .github/workflows/kubecluster.yaml
@@ -22,6 +22,9 @@ jobs:
matrix:
python-version: ["3.8", "3.9", "3.10"]

env:
KUBECONFIG: .pytest-kind/pytest-kind/kubeconfig

steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
@@ -30,6 +33,7 @@ jobs:
- name: Install deps
run: ./ci/install-deps.sh
- name: Run tests
env:
KUBECONFIG: .pytest-kind/pytest-kind/kubeconfig
run: pytest dask_kubernetes/common/tests dask_kubernetes/classic/tests
- name: Debug k8s resources
if: success() || failure()
run: kubectl get all -A
7 changes: 6 additions & 1 deletion .github/workflows/operator.yaml
@@ -24,6 +24,9 @@ jobs:
matrix:
python-version: ["3.8", "3.9", "3.10"]

env:
KUBECONFIG: .pytest-kind/pytest-kind/kubeconfig

steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
@@ -33,6 +36,8 @@ jobs:
run: ./ci/install-deps.sh
- name: Run tests
env:
KUBECONFIG: .pytest-kind/pytest-kind/kubeconfig
TEST_ISTIO: "true"
run: pytest dask_kubernetes/common/tests dask_kubernetes/operator/tests dask_kubernetes/experimental/tests
- name: Debug k8s resources
if: success() || failure()
run: kubectl get all -A
1 change: 1 addition & 0 deletions ci/pre-commit-crd.py
@@ -83,6 +83,7 @@ def main(version, *args):
run_action("daskcluster.yaml", temp_dir, crd_path, output_paths)
run_action("daskworkergroup.yaml", temp_dir, crd_path, output_paths)
run_action("daskjob.yaml", temp_dir, crd_path, output_paths)
run_action("daskautoscaler.yaml", temp_dir, crd_path, output_paths)

else:
run_action(changed_file, temp_dir, crd_path, output_paths)
2 changes: 1 addition & 1 deletion dask_kubernetes/conftest.py
@@ -67,7 +67,7 @@ def customresources(k8s_cluster):
temp_dir = tempfile.TemporaryDirectory()
crd_path = os.path.join(DIR, "operator", "customresources")

for crd in ["daskcluster", "daskworkergroup", "daskjob"]:
for crd in ["daskcluster", "daskworkergroup", "daskjob", "daskautoscaler"]:
run_generate(
os.path.join(crd_path, f"{crd}.yaml"),
os.path.join(crd_path, f"{crd}.patch.yaml"),
67 changes: 62 additions & 5 deletions dask_kubernetes/experimental/kubecluster.py
@@ -491,6 +491,15 @@ async def _scale(self, n, worker_group="default"):
custom_objects_api.api_client.set_default_header(
"content-type", "application/merge-patch+json"
)
# Disable adaptivity if enabled
with suppress(kubernetes.client.ApiException):
await custom_objects_api.delete_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskautoscalers",
namespace=self.namespace,
name=self.name,
)
await custom_objects_api.patch_namespaced_custom_object_scale(
group="kubernetes.dask.org",
version="v1",
@@ -500,11 +509,59 @@
body={"spec": {"replicas": n}},
)

def adapt(self, *args, **kwargs):
"""Turn on adaptivity"""
raise NotImplementedError(
"Adaptive mode is not supported yet for this KubeCluster."
)
def adapt(self, minimum=None, maximum=None):
"""Turn on adaptivity
Parameters
----------
minimum : int
Minimum number of workers
maximum : int
Maximum number of workers
Examples
--------
>>> cluster.adapt() # Allow scheduler to add/remove workers within k8s cluster resource limits
>>> cluster.adapt(minimum=1, maximum=10) # Allow scheduler to add/remove workers within 1-10 range
"""
return self.sync(self._adapt, minimum, maximum)

async def _adapt(self, minimum=None, maximum=None):
async with kubernetes.client.api_client.ApiClient() as api_client:
custom_objects_api = kubernetes.client.CustomObjectsApi(api_client)
custom_objects_api.api_client.set_default_header(
"content-type", "application/merge-patch+json"
)
try:
await custom_objects_api.patch_namespaced_custom_object_scale(
group="kubernetes.dask.org",
version="v1",
plural="daskautoscalers",
namespace=self.namespace,
name=self.name,
body={"spec": {"minimum": minimum, "maximum": maximum}},
)
except kubernetes.client.ApiException:
await custom_objects_api.create_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskautoscalers",
namespace=self.namespace,
body={
"apiVersion": "kubernetes.dask.org/v1",
"kind": "DaskAutoscaler",
"metadata": {
"name": self.name,
"dask.org/cluster-name": self.cluster_name,
"dask.org/component": "autoscaler",
},
"spec": {
"cluster": self.cluster_name,
"minimum": minimum,
"maximum": maximum,
},
},
)

def _build_scheduler_spec(self, cluster_name):
# TODO: Take the values provided in the current class constructor
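Taken together, these changes let the experimental KubeCluster delegate scaling decisions to the operator through a DaskAutoscaler resource. A minimal usage sketch, assuming the operator and its CRDs are installed and that KubeCluster is importable from dask_kubernetes.experimental; the cluster name "demo" is illustrative:

from distributed import Client
from dask_kubernetes.experimental import KubeCluster

# Start an operator-managed cluster with no fixed workers.
cluster = KubeCluster(name="demo", n_workers=0)

# adapt() creates (or patches) a DaskAutoscaler named after the cluster;
# the operator then adds/removes workers between these bounds.
cluster.adapt(minimum=1, maximum=10)

with Client(cluster) as client:
    assert client.submit(lambda x: x + 1, 10).result() == 11

# scale() deletes the DaskAutoscaler again and pins the worker count.
cluster.scale(0)
cluster.close()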
16 changes: 16 additions & 0 deletions dask_kubernetes/experimental/tests/test_kubecluster.py
@@ -84,3 +84,19 @@ def test_additional_worker_groups(kopf_runner, docker_image):
def test_cluster_without_operator(docker_image):
with pytest.raises(TimeoutError, match="is the Dask Operator running"):
KubeCluster(name="noop", n_workers=1, image=docker_image, resource_timeout=1)


def test_adapt(kopf_runner, docker_image):
with kopf_runner:
with KubeCluster(
name="adaptive",
image=docker_image,
n_workers=0,
) as cluster:
cluster.adapt(minimum=0, maximum=1)
with Client(cluster) as client:
assert client.submit(lambda x: x + 1, 10).result() == 11

# Need to clean up the DaskAutoscaler object
# See https://github.com/dask/dask-kubernetes/issues/546
cluster.scale(0)
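The cleanup workaround above leans on scale(0), whose implementation deletes the DaskAutoscaler. A hedged sketch of doing that cleanup directly with the kubernetes_asyncio client the project already uses, mirroring the delete call in _scale; "adaptive" is the test cluster's name and the "default" namespace is assumed:

import asyncio

import kubernetes_asyncio as kubernetes


async def delete_autoscaler(name, namespace="default"):
    # Remove a leftover DaskAutoscaler so the operator stops adapting the cluster.
    await kubernetes.config.load_kube_config()
    async with kubernetes.client.ApiClient() as api_client:
        custom_objects_api = kubernetes.client.CustomObjectsApi(api_client)
        await custom_objects_api.delete_namespaced_custom_object(
            group="kubernetes.dask.org",
            version="v1",
            plural="daskautoscalers",
            namespace=namespace,
            name=name,
        )


asyncio.run(delete_autoscaler("adaptive"))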
@@ -0,0 +1 @@
[]
37 changes: 37 additions & 0 deletions dask_kubernetes/operator/customresources/daskautoscaler.yaml
@@ -0,0 +1,37 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: daskautoscalers.kubernetes.dask.org
spec:
scope: Namespaced
group: kubernetes.dask.org
names:
kind: DaskAutoscaler
plural: daskautoscalers
singular: daskautoscaler
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
required:
- cluster
- minimum
- maximum
properties:
cluster:
type: string
description: Name of the cluster to associate this autoscaler with
minimum:
type: integer
description: Minimum number of workers
maximum:
type: integer
description: Maximum number of workers
status:
type: object
@@ -0,0 +1,37 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: daskautoscalers.kubernetes.dask.org
spec:
group: kubernetes.dask.org
names:
kind: DaskAutoscaler
plural: daskautoscalers
singular: daskautoscaler
scope: Namespaced
versions:
- name: v1
schema:
openAPIV3Schema:
properties:
spec:
properties:
cluster:
description: Name of the cluster to associate this autoscaler with
type: string
maximum:
description: Maximum number of workers
type: integer
minimum:
description: Minimum number of workers
type: integer
required:
- cluster
- minimum
- maximum
type: object
status:
type: object
type: object
served: true
storage: true
37 changes: 37 additions & 0 deletions dask_kubernetes/operator/deployment/manifests/daskautoscaler.yaml
@@ -0,0 +1,37 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: daskautoscalers.kubernetes.dask.org
spec:
group: kubernetes.dask.org
names:
kind: DaskAutoscaler
plural: daskautoscalers
singular: daskautoscaler
scope: Namespaced
versions:
- name: v1
schema:
openAPIV3Schema:
properties:
spec:
properties:
cluster:
description: Name of the cluster to associate this autoscaler with
type: string
maximum:
description: Maximum number of workers
type: integer
minimum:
description: Minimum number of workers
type: integer
required:
- cluster
- minimum
- maximum
type: object
status:
type: object
type: object
served: true
storage: true
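An instance of this CRD is what adapt() creates behind the scenes. A sketch of reading one back with the kubernetes_asyncio client, assuming a cluster named "demo" in the "default" namespace:

import asyncio

import kubernetes_asyncio as kubernetes


async def get_autoscaler_spec(name, namespace="default"):
    # Fetch the DaskAutoscaler created by cluster.adapt().
    await kubernetes.config.load_kube_config()
    async with kubernetes.client.ApiClient() as api_client:
        custom_objects_api = kubernetes.client.CustomObjectsApi(api_client)
        autoscaler = await custom_objects_api.get_namespaced_custom_object(
            group="kubernetes.dask.org",
            version="v1",
            plural="daskautoscalers",
            namespace=namespace,
            name=name,
        )
    # The spec carries the required fields from the schema above, e.g.
    # {"cluster": "demo", "minimum": 1, "maximum": 10}.
    return autoscaler["spec"]


print(asyncio.run(get_autoscaler_spec("demo")))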