WIP: using rayjobs #1392

Closed · wants to merge 1 commit
@@ -74,6 +74,8 @@ spec:
httpGet:
path: /liveness
port: http
initialDelaySeconds: 60
periodSeconds: 20
readinessProbe:
httpGet:
path: /readiness
@@ -0,0 +1,35 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: ray-job-code-sample
data:
sample_code.py: |
import ray
import os
import requests

ray.init()

@ray.remote
class Counter:
def __init__(self):
# Used to verify runtimeEnv
self.name = os.getenv("counter_name")
assert self.name == "test_counter"
self.counter = 0

def inc(self):
self.counter += 1

def get_counter(self):
return "{} got {}".format(self.name, self.counter)

counter = Counter.remote()

for _ in range(5):
ray.get(counter.inc.remote())
print(ray.get(counter.get_counter.remote()))

# Verify that the correct runtime env was used for the job.
assert requests.__version__ == "2.26.0"

@@ -13,6 +13,7 @@ rules:
- ray.io
resources:
- rayclusters
- rayjobs
verbs:
- create
- delete
@@ -48,4 +49,4 @@ roleRef:
subjects:
- kind: ServiceAccount
name: ray-cluster-sa
{{- end }}
{{- end }}
@@ -522,3 +522,117 @@ data:
- key: iptables.sh
path: iptables.sh
{{- end }}
rayjobtemplate.yaml: |
apiVersion: ray.io/v1
kind: RayJob
metadata:
name: rayjob-sample
spec:
# submissionMode specifies how RayJob submits the Ray job to the RayCluster.
# The default value is "K8sJobMode", meaning RayJob will submit the Ray job via a submitter Kubernetes Job.
# The alternative value is "HTTPMode", indicating that KubeRay will submit the Ray job by sending an HTTP request to the RayCluster.
# submissionMode: "K8sJobMode"
entrypoint: python /home/ray/samples/sample_code.py
# shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
shutdownAfterJobFinishes: true

# ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
ttlSecondsAfterFinished: 10

# activeDeadlineSeconds is the duration in seconds that the RayJob may be active before
# KubeRay actively tries to terminate the RayJob; value must be positive integer.
# activeDeadlineSeconds: 120

# RuntimeEnvYAML represents the runtime environment configuration provided as a multi-line YAML string.
# See https://docs.ray.io/en/latest/ray-core/handling-dependencies.html for details.
# (New in KubeRay version 1.0.)
runtimeEnvYAML: |
pip:
- requests==2.26.0
- pendulum==2.1.2
env_vars:
counter_name: "test_counter"

# Suspend specifies whether the RayJob controller should create a RayCluster instance.
# If a job is applied with the suspend field set to true, the RayCluster will not be created and we will wait for the transition to false.
      # If the RayCluster is already created, it will be deleted. On a transition back to false, a new RayCluster will be created.
# suspend: false

# rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
rayClusterSpec:
rayVersion: '2.30.0' # should match the Ray version in the image of the containers
# Ray head pod template
headGroupSpec:
# The `rayStartParams` are used to configure the `ray start` command.
# See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
# See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
rayStartParams:
dashboard-host: '0.0.0.0'
    # Pod template
template:
spec:
containers:
- name: ray-head
image: icr.io/quantum-public/qiskit-serverless/ray-node:0.12.0-py310
ports:
- containerPort: 6379
name: gcs-server
- containerPort: 8265 # Ray dashboard
name: dashboard
- containerPort: 10001
name: client
resources:
limits:
cpu: "1"
requests:
cpu: "200m"
volumeMounts:
- mountPath: /home/ray/samples
name: code-sample
volumes:
# You set volumes at the Pod level, then mount them into containers inside that Pod
- name: code-sample
configMap:
# Provide the name of the ConfigMap you want to mount.
name: ray-job-code-sample
# An array of keys from the ConfigMap to create as files
items:
- key: sample_code.py
path: sample_code.py
workerGroupSpecs:
    # The number of worker pod replicas in this group
- replicas: 1
minReplicas: 1
maxReplicas: 5
    # Logical group name; here it is called small-group, but any descriptive name works.
groupName: small-group
# The `rayStartParams` are used to configure the `ray start` command.
# See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
# See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
rayStartParams: {}
    # Pod template
template:
spec:
containers:
          - name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name' or '123-abc')
image: icr.io/quantum-public/qiskit-serverless/ray-node:0.12.0-py310
lifecycle:
preStop:
exec:
command: [ "/bin/sh","-c","ray stop" ]
resources:
limits:
cpu: "1"
requests:
cpu: "200m"
# SubmitterPodTemplate is the template for the pod that will run the `ray job submit` command against the RayCluster.
# If SubmitterPodTemplate is specified, the first container is assumed to be the submitter container.
# submitterPodTemplate:
# spec:
# restartPolicy: Never
# containers:
# - name: my-custom-rayjob-submitter-pod
# image: rayproject/ray:2.9.0
# # If Command is not specified, the correct command will be supplied at runtime using the RayJob spec `entrypoint` field.
# # Specifying Command is not recommended.
# # command: ["sh", "-c", "ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID -- echo hello world"]
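The submissionMode comment near the top of this template distinguishes a submitter Kubernetes Job from a direct HTTP submission to the RayCluster. For quick local verification of the entrypoint and runtime environment, the same job can be submitted straight to the Ray dashboard with Ray's JobSubmissionClient, which is roughly what "HTTPMode" does. A minimal sketch, assuming the dashboard port (8265) of the head pod has been port-forwarded to localhost; the service name in the comment is illustrative:

    from ray.job_submission import JobSubmissionClient

    # e.g. after: kubectl port-forward svc/rayjob-sample-raycluster-head-svc 8265:8265
    client = JobSubmissionClient("http://127.0.0.1:8265")

    # Mirror the entrypoint and the runtimeEnvYAML block from the manifest above.
    submission_id = client.submit_job(
        entrypoint="python /home/ray/samples/sample_code.py",
        runtime_env={
            "pip": ["requests==2.26.0", "pendulum==2.1.2"],
            "env_vars": {"counter_name": "test_counter"},
        },
    )
    print(client.get_job_status(submission_id))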
4 changes: 2 additions & 2 deletions charts/qiskit-serverless/charts/gateway/templates/role.yaml
@@ -7,6 +7,8 @@ rules:
- ray.io
resources:
- rayclusters
- rayjobs
- rayjobs/status
verbs:
- create
- delete
@@ -32,5 +34,3 @@ rules:
- get
- list
{{- end }}


49 changes: 28 additions & 21 deletions gateway/api/management/commands/update_jobs_statuses.py
@@ -6,9 +6,9 @@
from django.core.management.base import BaseCommand

from api.models import Job
from api.ray import get_job_handler
from api.schedule import check_job_timeout, handle_job_status_not_available
from api.utils import ray_job_status_to_model_job_status, check_logs
from kubernetes import client as kubernetes_client, config
from kubernetes.client.exceptions import ApiException


logger = logging.getLogger("commands")

@@ -23,22 +23,31 @@ def handle(self, *args, **options):
updated_jobs_counter = 0
jobs = Job.objects.filter(status__in=Job.RUNNING_STATES)
for job in jobs:
job_status = Job.PENDING
if job.compute_resource:
job_status = Job.PENDING
success = True
job_handler = get_job_handler(job.compute_resource.host)
if job_handler:
ray_job_status = job_handler.status(job.ray_job_id)
if ray_job_status:
job_status = ray_job_status_to_model_job_status(ray_job_status)
else:
success = False
else:
success = False

job_status = check_job_timeout(job, job_status)
if not success:
job_status = handle_job_status_not_available(job, job_status)
# get rayjob status
# TODO make util function
config.load_incluster_config()
k8s_client = kubernetes_client.api_client.ApiClient()
ray_job_name = "rayjob-sample" # TODO don't hardcode

# Get cluster name
api_instance = kubernetes_client.CustomObjectsApi(k8s_client)
group = "ray.io"
version = "v1"
namespace = "default"
plural = "rayjobs"

try:
api_response = api_instance.get_namespaced_custom_object_status(
group, version, namespace, plural, ray_job_name
)
                logger.debug(
                    "new job status is %s", api_response["status"]["jobStatus"]
                )
                job_status = api_response["status"]["jobStatus"]
            except ApiException as e:
                logger.error("Exception when getting RayJob status: %s", e)

if job_status != job.status:
logger.info(
@@ -54,9 +63,7 @@
if job.in_terminal_state():
job.env_vars = "{}"

if job_handler:
logs = job_handler.logs(job.ray_job_id)
job.logs = check_logs(logs, job)
# TODO update logs on errors?

try:
job.save()
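The "TODO make util function" in this hunk could be addressed along these lines; a minimal sketch, assuming in-cluster configuration, with get_rayjob_status as a hypothetical helper name:

    import logging
    from typing import Optional

    from kubernetes import client as kubernetes_client, config
    from kubernetes.client.exceptions import ApiException

    logger = logging.getLogger("commands")


    def get_rayjob_status(name: str, namespace: str = "default") -> Optional[str]:
        """Return the jobStatus of a RayJob custom resource, or None if unavailable."""
        config.load_incluster_config()
        api_instance = kubernetes_client.CustomObjectsApi()
        try:
            api_response = api_instance.get_namespaced_custom_object_status(
                "ray.io", "v1", namespace, "rayjobs", name
            )
        except ApiException as exc:
            logger.error("Exception when getting RayJob status: %s", exc)
            return None
        return api_response.get("status", {}).get("jobStatus")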
57 changes: 57 additions & 0 deletions gateway/api/ray.py
@@ -8,6 +8,7 @@
import time
import uuid
from typing import Optional
from time import sleep

import requests
import yaml
@@ -201,6 +202,62 @@ def submit_job(job: Job) -> Job:
return job


def create_ray_job(
job: Job,
cluster_name: Optional[str] = None,
cluster_data: Optional[str] = None,
) -> Optional[str]:
"""Create ray job.

This is still WIP and only runs a sample job.
"""

namespace = settings.RAY_KUBERAY_NAMESPACE
    jobtemplate = get_template("rayjobtemplate.yaml")
    manifest = jobtemplate.render()
    job_data = yaml.safe_load(manifest)  # rendered template; the cluster_data argument is currently unused

    config.load_incluster_config()
    k8s_client = kubernetes_client.api_client.ApiClient()
    dyn_client = DynamicClient(k8s_client)
    rayjob_client = dyn_client.resources.get(api_version="v1", kind="RayJob")
    response = rayjob_client.create(body=job_data, namespace=namespace)
ray_job_name = response.metadata.name

    logger.debug("Ray Job name is %s", ray_job_name)

# Get cluster name
api_instance = kubernetes_client.CustomObjectsApi(k8s_client)
group = "ray.io"
version = "v1"
namespace = "default"
plural = "rayjobs"

status = False
while not status:
        try:
            logger.debug("getting status of rayjob")
            api_response = api_instance.get_namespaced_custom_object_status(
                group, version, namespace, plural, ray_job_name
            )
            if "status" in api_response:
                status = True
                cluster_name = api_response["status"]["rayClusterName"]
                job.status = api_response["status"]["jobStatus"]
            else:
                sleep(1)
        except ApiException as e:
            logger.error("Exception when getting RayJob status: %s", e)

resource = None
if status and cluster_name:
resource = ComputeResource()
resource.owner = job.author
resource.title = cluster_name
resource.save()
return resource


def create_ray_cluster(
job: Job,
cluster_name: Optional[str] = None,
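The status-polling loop in create_ray_job retries once per second with no upper bound, so a RayJob whose controller never populates .status would block the caller indefinitely. A bounded variant is sketched below; the helper name and timeout value are illustrative, not part of this PR:

    import logging
    import time
    from typing import Optional

    from kubernetes import client as kubernetes_client
    from kubernetes.client.exceptions import ApiException

    logger = logging.getLogger(__name__)


    def wait_for_rayjob_status(
        api_instance: kubernetes_client.CustomObjectsApi,
        name: str,
        namespace: str = "default",
        timeout: float = 60.0,
    ) -> Optional[dict]:
        """Poll a RayJob until its .status block appears; return None on timeout."""
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                api_response = api_instance.get_namespaced_custom_object_status(
                    "ray.io", "v1", namespace, "rayjobs", name
                )
            except ApiException as exc:
                logger.error("Exception when getting RayJob status: %s", exc)
            else:
                if "status" in api_response:
                    return api_response["status"]
            time.sleep(1)
        return None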
51 changes: 9 additions & 42 deletions gateway/api/schedule.py
@@ -13,8 +13,8 @@

from opentelemetry import trace

from api.models import Job, ComputeResource
from api.ray import submit_job, create_ray_cluster, kill_ray_cluster
from api.models import Job
from api.ray import create_ray_job, kill_ray_cluster
from api.utils import generate_cluster_name
from main import settings as config

@@ -41,46 +41,13 @@ def execute_job(job: Job) -> Job:

tracer = trace.get_tracer("scheduler.tracer")
with tracer.start_as_current_span("execute.job") as span:
compute_resource = ComputeResource.objects.filter(
owner=job.author, active=True
).first()

if not compute_resource:
cluster_name = generate_cluster_name(job.author.username)
span.set_attribute("job.clustername", cluster_name)
try:
compute_resource = create_ray_cluster(job, cluster_name=cluster_name)
except Exception: # pylint: disable=broad-exception-caught
# if something went wrong
# try to kill resource if it was allocated
logger.warning(
"Compute resource [%s] was not created properly.\n"
"Setting job [%s] status to [FAILED].",
cluster_name,
job,
)
kill_ray_cluster(cluster_name)
job.status = Job.FAILED
job.logs += "\nCompute resource was not created properly."

if compute_resource:
try:
job.compute_resource = compute_resource
job = submit_job(job)
job.status = Job.PENDING
except Exception: # pylint: disable=broad-exception-caught:
logger.error(
"Exception was caught during scheduling job on user [%s] resource.\n"
"Resource [%s] was in DB records, but address is not reachable.\n"
"Cleaning up db record and setting job [%s] to failed",
job.author,
compute_resource.title,
job.id,
)
kill_ray_cluster(compute_resource.title)
compute_resource.delete()
job.status = Job.FAILED
job.logs += "\nCompute resource was not found."
cluster_name = generate_cluster_name(job.author.username)
span.set_attribute("job.clustername", cluster_name)

# test out running ray job
job_resource = create_ray_job(job, cluster_name)
        logger.info("Ray Job %s created!", job_resource)
job.compute_resource = job_resource

span.set_attribute("job.status", job.status)
return job
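Note that this rewrite drops the old failure path (kill the cluster, mark the job FAILED), while kill_ray_cluster is still imported but never called. A sketch of restoring that guard around the new call inside execute_job, assuming create_ray_job returns None or raises when provisioning fails:

        try:
            job_resource = create_ray_job(job, cluster_name)
        except Exception:  # pylint: disable=broad-exception-caught
            job_resource = None

        if job_resource is None:
            logger.warning(
                "RayJob for cluster [%s] was not created properly.\n"
                "Setting job [%s] status to [FAILED].",
                cluster_name,
                job,
            )
            kill_ray_cluster(cluster_name)
            job.status = Job.FAILED
            job.logs += "\nCompute resource was not created properly."
        else:
            job.compute_resource = job_resource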