From 74416b9f8ea97745703b003333c5f7b17a3637fa Mon Sep 17 00:00:00 2001 From: "Paul S. Schweigert" Date: Wed, 26 Jun 2024 12:10:32 -0400 Subject: [PATCH] WIP: using rayjobs Signed-off-by: Paul S. Schweigert --- .../charts/gateway/templates/deployment.yaml | 2 + .../templates/ray-job-code-sample.yaml | 35 ++++++ .../gateway/templates/rayclustersa.yaml | 3 +- .../gateway/templates/rayclustertemplate.yaml | 114 ++++++++++++++++++ .../charts/gateway/templates/role.yaml | 4 +- .../commands/update_jobs_statuses.py | 49 ++++---- gateway/api/ray.py | 57 +++++++++ gateway/api/schedule.py | 51 ++------ 8 files changed, 249 insertions(+), 66 deletions(-) create mode 100644 charts/qiskit-serverless/charts/gateway/templates/ray-job-code-sample.yaml diff --git a/charts/qiskit-serverless/charts/gateway/templates/deployment.yaml b/charts/qiskit-serverless/charts/gateway/templates/deployment.yaml index 7c03cfbcc..a27359cbb 100644 --- a/charts/qiskit-serverless/charts/gateway/templates/deployment.yaml +++ b/charts/qiskit-serverless/charts/gateway/templates/deployment.yaml @@ -74,6 +74,8 @@ spec: httpGet: path: /liveness port: http + initialDelaySeconds: 60 + periodSeconds: 20 readinessProbe: httpGet: path: /readiness diff --git a/charts/qiskit-serverless/charts/gateway/templates/ray-job-code-sample.yaml b/charts/qiskit-serverless/charts/gateway/templates/ray-job-code-sample.yaml new file mode 100644 index 000000000..a90f8b79d --- /dev/null +++ b/charts/qiskit-serverless/charts/gateway/templates/ray-job-code-sample.yaml @@ -0,0 +1,35 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: ray-job-code-sample +data: + sample_code.py: | + import ray + import os + import requests + + ray.init() + + @ray.remote + class Counter: + def __init__(self): + # Used to verify runtimeEnv + self.name = os.getenv("counter_name") + assert self.name == "test_counter" + self.counter = 0 + + def inc(self): + self.counter += 1 + + def get_counter(self): + return "{} got {}".format(self.name, self.counter) + + counter = Counter.remote() + + for _ in range(5): + ray.get(counter.inc.remote()) + print(ray.get(counter.get_counter.remote())) + + # Verify that the correct runtime env was used for the job. + assert requests.__version__ == "2.26.0" + diff --git a/charts/qiskit-serverless/charts/gateway/templates/rayclustersa.yaml b/charts/qiskit-serverless/charts/gateway/templates/rayclustersa.yaml index 839a04455..6d4f39814 100644 --- a/charts/qiskit-serverless/charts/gateway/templates/rayclustersa.yaml +++ b/charts/qiskit-serverless/charts/gateway/templates/rayclustersa.yaml @@ -13,6 +13,7 @@ rules: - ray.io resources: - rayclusters + - rayjobs verbs: - create - delete @@ -48,4 +49,4 @@ roleRef: subjects: - kind: ServiceAccount name: ray-cluster-sa -{{- end }} \ No newline at end of file +{{- end }} diff --git a/charts/qiskit-serverless/charts/gateway/templates/rayclustertemplate.yaml b/charts/qiskit-serverless/charts/gateway/templates/rayclustertemplate.yaml index 51e3ba921..eef2e36ac 100644 --- a/charts/qiskit-serverless/charts/gateway/templates/rayclustertemplate.yaml +++ b/charts/qiskit-serverless/charts/gateway/templates/rayclustertemplate.yaml @@ -522,3 +522,117 @@ data: - key: iptables.sh path: iptables.sh {{- end }} + rayjobtemplate.yaml: | + apiVersion: ray.io/v1 + kind: RayJob + metadata: + name: rayjob-sample + spec: + # submissionMode specifies how RayJob submits the Ray job to the RayCluster. + # The default value is "K8sJobMode", meaning RayJob will submit the Ray job via a submitter Kubernetes Job. 
+ # The alternative value is "HTTPMode", indicating that KubeRay will submit the Ray job by sending an HTTP request to the RayCluster. + # submissionMode: "K8sJobMode" + entrypoint: python /home/ray/samples/sample_code.py + # shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false. + shutdownAfterJobFinishes: true + + # ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes. + ttlSecondsAfterFinished: 10 + + # activeDeadlineSeconds is the duration in seconds that the RayJob may be active before + # KubeRay actively tries to terminate the RayJob; value must be positive integer. + # activeDeadlineSeconds: 120 + + # RuntimeEnvYAML represents the runtime environment configuration provided as a multi-line YAML string. + # See https://docs.ray.io/en/latest/ray-core/handling-dependencies.html for details. + # (New in KubeRay version 1.0.) + runtimeEnvYAML: | + pip: + - requests==2.26.0 + - pendulum==2.1.2 + env_vars: + counter_name: "test_counter" + + # Suspend specifies whether the RayJob controller should create a RayCluster instance. + # If a job is applied with the suspend field set to true, the RayCluster will not be created and we will wait for the transition to false. + # If the RayCluster is already created, it will be deleted. In the case of transition to false, a new RayCluste rwill be created. + # suspend: false + + # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller. + rayClusterSpec: + rayVersion: '2.30.0' # should match the Ray version in the image of the containers + # Ray head pod template + headGroupSpec: + # The `rayStartParams` are used to configure the `ray start` command. + # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay. + # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`. + rayStartParams: + dashboard-host: '0.0.0.0' + #pod template + template: + spec: + containers: + - name: ray-head + image: icr.io/quantum-public/qiskit-serverless/ray-node:0.12.0-py310 + ports: + - containerPort: 6379 + name: gcs-server + - containerPort: 8265 # Ray dashboard + name: dashboard + - containerPort: 10001 + name: client + resources: + limits: + cpu: "1" + requests: + cpu: "200m" + volumeMounts: + - mountPath: /home/ray/samples + name: code-sample + volumes: + # You set volumes at the Pod level, then mount them into containers inside that Pod + - name: code-sample + configMap: + # Provide the name of the ConfigMap you want to mount. + name: ray-job-code-sample + # An array of keys from the ConfigMap to create as files + items: + - key: sample_code.py + path: sample_code.py + workerGroupSpecs: + # the pod replicas in this group typed worker + - replicas: 1 + minReplicas: 1 + maxReplicas: 5 + # logical group name, for this called small-group, also can be functional + groupName: small-group + # The `rayStartParams` are used to configure the `ray start` command. + # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay. + # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`. 
+            rayStartParams: {}
+            # Pod template
+            template:
+              spec:
+                containers:
+                  - name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name', or '123-abc')
+                    image: icr.io/quantum-public/qiskit-serverless/ray-node:0.12.0-py310
+                    lifecycle:
+                      preStop:
+                        exec:
+                          command: [ "/bin/sh","-c","ray stop" ]
+                    resources:
+                      limits:
+                        cpu: "1"
+                      requests:
+                        cpu: "200m"
+      # SubmitterPodTemplate is the template for the pod that will run the `ray job submit` command against the RayCluster.
+      # If SubmitterPodTemplate is specified, the first container is assumed to be the submitter container.
+      # submitterPodTemplate:
+      #   spec:
+      #     restartPolicy: Never
+      #     containers:
+      #       - name: my-custom-rayjob-submitter-pod
+      #         image: rayproject/ray:2.9.0
+      #         # If Command is not specified, the correct command will be supplied at runtime using the RayJob spec `entrypoint` field.
+      #         # Specifying Command is not recommended.
+      #         # command: ["sh", "-c", "ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID -- echo hello world"]
diff --git a/charts/qiskit-serverless/charts/gateway/templates/role.yaml b/charts/qiskit-serverless/charts/gateway/templates/role.yaml
index 30c279dc8..f09cdfdeb 100644
--- a/charts/qiskit-serverless/charts/gateway/templates/role.yaml
+++ b/charts/qiskit-serverless/charts/gateway/templates/role.yaml
@@ -7,6 +7,8 @@ rules:
   - ray.io
   resources:
   - rayclusters
+  - rayjobs
+  - rayjobs/status
   verbs:
   - create
   - delete
@@ -32,5 +34,3 @@ rules:
   - get
   - list
 {{- end }}
-
-
diff --git a/gateway/api/management/commands/update_jobs_statuses.py b/gateway/api/management/commands/update_jobs_statuses.py
index 33bc10dfb..f5dfe34d1 100644
--- a/gateway/api/management/commands/update_jobs_statuses.py
+++ b/gateway/api/management/commands/update_jobs_statuses.py
@@ -6,9 +6,9 @@
 from django.core.management.base import BaseCommand
 
 from api.models import Job
-from api.ray import get_job_handler
-from api.schedule import check_job_timeout, handle_job_status_not_available
-from api.utils import ray_job_status_to_model_job_status, check_logs
+from kubernetes import client as kubernetes_client, config
+from kubernetes.client.exceptions import ApiException
+
 
 logger = logging.getLogger("commands")
 
@@ -23,22 +23,31 @@ def handle(self, *args, **options):
         updated_jobs_counter = 0
         jobs = Job.objects.filter(status__in=Job.RUNNING_STATES)
         for job in jobs:
+            job_status = Job.PENDING
             if job.compute_resource:
-                job_status = Job.PENDING
-                success = True
-                job_handler = get_job_handler(job.compute_resource.host)
-                if job_handler:
-                    ray_job_status = job_handler.status(job.ray_job_id)
-                    if ray_job_status:
-                        job_status = ray_job_status_to_model_job_status(ray_job_status)
-                    else:
-                        success = False
-                else:
-                    success = False
-
-                job_status = check_job_timeout(job, job_status)
-                if not success:
-                    job_status = handle_job_status_not_available(job, job_status)
+                # get the RayJob status
+                # TODO make util function
+                config.load_incluster_config()
+                k8s_client = kubernetes_client.api_client.ApiClient()
+                ray_job_name = "rayjob-sample"  # TODO don't hardcode
+
+                # Query the RayJob custom resource for its current status
+                api_instance = kubernetes_client.CustomObjectsApi(k8s_client)
+                group = "ray.io"
+                version = "v1"
+                namespace = "default"
+                plural = "rayjobs"
+
+                try:
+                    api_response = api_instance.get_namespaced_custom_object_status(
+                        group, version, namespace, plural, ray_job_name
+                    )
+                    logger.debug(
+                        "new job status is %s", api_response["status"]["jobStatus"]
+                    )
+                    job_status = api_response["status"]["jobStatus"]
+                except ApiException as e:
+                    logger.error("Exception when getting RayJob status: %s", e)
 
             if job_status != job.status:
                 logger.info(
@@ -54,9 +63,7 @@ def handle(self, *args, **options):
 
             if job.in_terminal_state():
                 job.env_vars = "{}"
-                if job_handler:
-                    logs = job_handler.logs(job.ray_job_id)
-                    job.logs = check_logs(logs, job)
+                # TODO update logs on errors?
 
             try:
                 job.save()
diff --git a/gateway/api/ray.py b/gateway/api/ray.py
index 257802cb1..6c6ba69c2 100644
--- a/gateway/api/ray.py
+++ b/gateway/api/ray.py
@@ -8,6 +8,7 @@
 import time
 import uuid
 from typing import Optional
+from time import sleep
 
 import requests
 import yaml
@@ -201,6 +202,62 @@ def submit_job(job: Job) -> Job:
     return job
 
 
+def create_ray_job(
+    job: Job,
+    cluster_name: Optional[str] = None,
+    cluster_data: Optional[str] = None,
+) -> Optional[ComputeResource]:
+    """Create a RayJob custom resource.
+
+    This is still WIP and only runs a sample job.
+    """
+
+    namespace = settings.RAY_KUBERAY_NAMESPACE
+    jobtemplate = get_template("rayjobtemplate.yaml")
+    manifest = jobtemplate.render()
+    cluster_data = yaml.safe_load(manifest)
+
+    config.load_incluster_config()
+    k8s_client = kubernetes_client.api_client.ApiClient()
+    dyn_client = DynamicClient(k8s_client)
+    rayjob_client = dyn_client.resources.get(api_version="v1", kind="RayJob")
+    response = rayjob_client.create(body=cluster_data, namespace=namespace)
+    ray_job_name = response.metadata.name
+
+    logger.debug("Ray Job name is %s", ray_job_name)
+
+    # Poll the RayJob custom resource to get the generated cluster name
+    api_instance = kubernetes_client.CustomObjectsApi(k8s_client)
+    group = "ray.io"
+    version = "v1"
+    # namespace is already set from settings.RAY_KUBERAY_NAMESPACE above
+    plural = "rayjobs"
+
+    status = False
+    while not status:
+        try:
+            logger.debug("getting status of RayJob %s", ray_job_name)
+            api_response = api_instance.get_namespaced_custom_object_status(
+                group, version, namespace, plural, ray_job_name
+            )
+            if "status" in api_response:
+                status = True
+                cluster_name = api_response["status"]["rayClusterName"]
+                job.status = api_response["status"]["jobStatus"]
+            else:
+                sleep(1)
+        except ApiException as e:
+            logger.error("Exception when getting RayJob status: %s", e)
+
+    resource = None
+    if status and cluster_name:
+        resource = ComputeResource()
+        resource.owner = job.author
+        resource.title = cluster_name
+        resource.save()
+    return resource
+
+
 def create_ray_cluster(
     job: Job,
     cluster_name: Optional[str] = None,
diff --git a/gateway/api/schedule.py b/gateway/api/schedule.py
index 1cc3facce..45ad4a5b2 100644
--- a/gateway/api/schedule.py
+++ b/gateway/api/schedule.py
@@ -13,8 +13,8 @@
 
 from opentelemetry import trace
 
-from api.models import Job, ComputeResource
-from api.ray import submit_job, create_ray_cluster, kill_ray_cluster
+from api.models import Job
+from api.ray import create_ray_job, kill_ray_cluster
 from api.utils import generate_cluster_name
 from main import settings as config
 
@@ -41,46 +41,13 @@ def execute_job(job: Job) -> Job:
 
     tracer = trace.get_tracer("scheduler.tracer")
     with tracer.start_as_current_span("execute.job") as span:
-        compute_resource = ComputeResource.objects.filter(
-            owner=job.author, active=True
-        ).first()
-
-        if not compute_resource:
-            cluster_name = generate_cluster_name(job.author.username)
-            span.set_attribute("job.clustername", cluster_name)
-            try:
-                compute_resource = create_ray_cluster(job, cluster_name=cluster_name)
-            except Exception:  # pylint: disable=broad-exception-caught
-                # if something went wrong
-                # try to kill resource if it was allocated
-                logger.warning(
-                    "Compute resource [%s] was not created properly.\n"
properly.\n" - "Setting job [%s] status to [FAILED].", - cluster_name, - job, - ) - kill_ray_cluster(cluster_name) - job.status = Job.FAILED - job.logs += "\nCompute resource was not created properly." - - if compute_resource: - try: - job.compute_resource = compute_resource - job = submit_job(job) - job.status = Job.PENDING - except Exception: # pylint: disable=broad-exception-caught: - logger.error( - "Exception was caught during scheduling job on user [%s] resource.\n" - "Resource [%s] was in DB records, but address is not reachable.\n" - "Cleaning up db record and setting job [%s] to failed", - job.author, - compute_resource.title, - job.id, - ) - kill_ray_cluster(compute_resource.title) - compute_resource.delete() - job.status = Job.FAILED - job.logs += "\nCompute resource was not found." + cluster_name = generate_cluster_name(job.author.username) + span.set_attribute("job.clustername", cluster_name) + + # test out running ray job + job_resource = create_ray_job(job, cluster_name) + logger.info(f"Ray Job {job_resource} created!") + job.compute_resource = job_resource span.set_attribute("job.status", job.status) return job