diff --git a/.github/workflows/docker-build.yaml b/.github/workflows/docker-build.yaml index a008fd8ae..7ff654c94 100644 --- a/.github/workflows/docker-build.yaml +++ b/.github/workflows/docker-build.yaml @@ -10,8 +10,6 @@ jobs: timeout-minutes: 45 steps: - uses: actions/checkout@v4 - - name: Build the function - run: docker build -t test_function:latest --build-arg TARGETARCH="amd64" -f ./tests/basic/function/Sample-Docker ./tests/basic - name: Build the containers run: docker compose -f docker-compose-dev.yaml build - name: Run the jupyter profile @@ -29,6 +27,7 @@ jobs: shell: bash run: | cd tests/basic + rm ./06_function.py for f in *.py; do echo "$f" && python "$f" &>> basic.log; done done=$(cat basic.log | grep -c "DONE") if [[ $done == 4 ]] diff --git a/.github/workflows/kubernetes-deploy.yaml b/.github/workflows/kubernetes-deploy.yaml index d39e53f32..1d57b82fe 100644 --- a/.github/workflows/kubernetes-deploy.yaml +++ b/.github/workflows/kubernetes-deploy.yaml @@ -24,6 +24,8 @@ jobs: with: k8s-version: 1.29.x kind-worker-count: 0 + - name: Label nodes + run: kubectl label node kind-control-plane has-gpu=gpu has-cpu=cpu - name: Build and load gateway run: | docker build -t gateway:test -f ./gateway/Dockerfile . diff --git a/.github/workflows/notebook-local-verify.yaml b/.github/workflows/notebook-local-verify.yaml index d66ae10fe..8a280c3c0 100644 --- a/.github/workflows/notebook-local-verify.yaml +++ b/.github/workflows/notebook-local-verify.yaml @@ -22,6 +22,8 @@ jobs: for f in tests/basic/*.py; do sed -i "s/import ServerlessClient/import LocalClient/;s/= ServerlessClient(/= LocalClient(/;/token=os\.environ\.get/d;/host=os\.environ\.get/d" "$f"; done for f in tests/experimental/*.py; do sed -i "s/import ServerlessClient/import LocalClient/;s/= ServerlessClient(/= LocalClient(/;/token=os\.environ\.get/d;/host=os\.environ\.get/d" "$f"; done rm tests/basic/06_function.py + rm tests/experimental/file_download.py + rm tests/experimental/manage_data_directory.py - name: install dependencies shell: bash run: pip install client/ diff --git a/.github/workflows/update-component-versions.yaml b/.github/workflows/update-component-versions.yaml index 54ead635f..786811a00 100644 --- a/.github/workflows/update-component-versions.yaml +++ b/.github/workflows/update-component-versions.yaml @@ -44,6 +44,8 @@ jobs: sed -i "s/ray-node:${OLDNUM}/ray-node:${NEWNUM}/" charts/qiskit-serverless/values.yaml sed -i "s/version: ${OLDNUM}/version: ${NEWNUM}/" charts/qiskit-serverless/values.yaml sed -i "s/qiskit-serverless\/ray-node:${OLDNUM}/qiskit-serverless\/ray-node:${NEWNUM}/" docs/deployment/custom_function/Sample-Dockerfile + sed -i "s/FROM icr.io\/quantum-public\/qiskit-serverless\/ray-node${OLDNUM}/FROM icr.io\/quantum-public\/qiskit-serverless\/ray-node${NEWNUM}/" docs/deployment/deploying_custom_image_function.rst + sed -i "s/FROM icr.io\/quantum-public\/qiskit-serverless\/ray-node${OLDNUM}/FROM icr.io\/quantum-public\/qiskit-serverless\/ray-node${NEWNUM}/" docs/deployment/example_custom_image_function.rst sed -i "s/qiskit-serverless\/ray-node:${OLDNUM}/qiskit-serverless\/ray-node:${NEWNUM}/" gateway/main/settings.py cd charts/qiskit-serverless helm dependency update diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index df30b217c..110d95a9c 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -146,7 +146,7 @@ instead of forking it. ### Development environment This repository contains several projects with different technologies. Depending on the project that you selected (eg. gateway), from the project directory you will run: -- `pip install -r requirements.txt requirements-dev.txt` for python projects (strongly consider using a [virtual environment](https://docs.python.org/3/library/venv.html)!). +- `pip install -r requirements.txt -r requirements-dev.txt` for python projects (strongly consider using a [virtual environment](https://docs.python.org/3/library/venv.html)!). - `helm dependency build` for helm (Before running this command, make sure to check for helm configuration instructions specific to your selected project charts). - `terraform init` for terraform. diff --git a/charts/qiskit-serverless/Chart.lock b/charts/qiskit-serverless/Chart.lock index 35f1ffef8..cabdf99a0 100644 --- a/charts/qiskit-serverless/Chart.lock +++ b/charts/qiskit-serverless/Chart.lock @@ -1,7 +1,7 @@ dependencies: - name: gateway repository: "" - version: 0.17.0 + version: 0.17.1 - name: nginx-ingress-controller repository: https://charts.bitnami.com/bitnami version: 9.11.0 @@ -11,5 +11,5 @@ dependencies: - name: kuberay-operator repository: https://ray-project.github.io/kuberay-helm version: 1.1.1 -digest: sha256:0eaad7053682fb99f4c801a5f224dcf756968ebe9e5d3580f75eb94ceb7f8f21 -generated: "2024-09-26T14:09:39.57006234Z" +digest: sha256:e550274e4089d7b3e2926e0a1049b088a407875053a9ad071b288ef50e993076 +generated: "2024-10-09T14:47:34.054577191Z" diff --git a/charts/qiskit-serverless/Chart.yaml b/charts/qiskit-serverless/Chart.yaml index 6e28800d0..c52acdf93 100644 --- a/charts/qiskit-serverless/Chart.yaml +++ b/charts/qiskit-serverless/Chart.yaml @@ -4,13 +4,13 @@ description: Qiskit-Serverless helm chart that contains different dependencies. type: application -version: 0.17.0 -appVersion: "0.17.0" +version: 0.17.1 +appVersion: "0.17.1" dependencies: - name: gateway condition: gatewayEnable - version: 0.17.0 + version: 0.17.1 - name: nginx-ingress-controller condition: nginxIngressControllerEnable version: 9.11.0 diff --git a/charts/qiskit-serverless/charts/gateway/Chart.yaml b/charts/qiskit-serverless/charts/gateway/Chart.yaml index 422a8a24a..411d80c4e 100644 --- a/charts/qiskit-serverless/charts/gateway/Chart.yaml +++ b/charts/qiskit-serverless/charts/gateway/Chart.yaml @@ -15,10 +15,10 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.17.0 +version: 0.17.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: "0.17.0" +appVersion: "0.17.1" diff --git a/charts/qiskit-serverless/charts/gateway/templates/deployment.yaml b/charts/qiskit-serverless/charts/gateway/templates/deployment.yaml index e1aaa3dc0..e312cb95b 100644 --- a/charts/qiskit-serverless/charts/gateway/templates/deployment.yaml +++ b/charts/qiskit-serverless/charts/gateway/templates/deployment.yaml @@ -119,8 +119,14 @@ spec: value: {{ .Values.application.ray.minReplicas | quote }} - name: RAY_CLUSTER_WORKER_MAX_REPLICAS value: {{ .Values.application.ray.maxReplicas | quote }} + - name: RAY_CLUSTER_CPU_NODE_SELECTOR_LABEL + value: {{ .Values.application.nodeSelector.cpu | quote }} + - name: RAY_CLUSTER_GPU_NODE_SELECTOR_LABEL + value: {{ .Values.application.nodeSelector.gpu | quote }} - name: LIMITS_CPU_PER_TASK value: {{ .Values.application.ray.cpu | quote }} + - name: LIMITS_GPU_PER_TASK + value: {{ .Values.application.ray.gpu | quote }} - name: LIMITS_MEMORY_PER_TASK value: {{ .Values.application.ray.memory | quote }} {{- if .Values.application.superuser.enable }} @@ -233,6 +239,9 @@ spec: - name: ray-cluster-template configMap: name: rayclustertemplate + - name: gpu-jobs + configMap: + name: gpujobs serviceAccountName: {{ include "gateway.serviceAccountName" . }} securityContext: {{- toYaml .Values.podSecurityContext | nindent 8 }} @@ -271,6 +280,8 @@ spec: name: gateway-pv-storage - mountPath: "/tmp/templates/" name: ray-cluster-template + - mountPath: "/tmp/gpujobs/" + name: gpu-jobs resources: {{- toYaml .Values.scheduler.resources | nindent 12 }} env: @@ -310,10 +321,18 @@ spec: value: {{ .Release.Namespace }} - name: RAY_NODE_IMAGE value: {{ .Values.application.ray.nodeImage | quote }} + - name: RAY_CLUSTER_CPU_NODE_SELECTOR_LABEL + value: {{ .Values.application.nodeSelector.cpu | quote }} + - name: RAY_CLUSTER_GPU_NODE_SELECTOR_LABEL + value: {{ .Values.application.nodeSelector.gpu | quote }} - name: LIMITS_JOBS_PER_USER value: {{ .Values.application.limits.maxJobsPerUser | quote }} - name: LIMITS_MAX_CLUSTERS value: {{ .Values.application.limits.maxComputeResources | quote }} + - name: LIMITS_GPU_CLUSTERS + value: {{ .Values.application.limits.maxGpuResources | quote }} + - name: GATEWAY_GPU_JOBS_CONFIG + value: {{ .Values.application.ray.gpuJobsConfig | quote }} {{- if .Values.application.limits.keepClusterOnComplete }} - name: RAY_CLUSTER_NO_DELETE_ON_COMPLETE value: "True" diff --git a/charts/qiskit-serverless/charts/gateway/templates/gpu-jobs.yaml b/charts/qiskit-serverless/charts/gateway/templates/gpu-jobs.yaml new file mode 100644 index 000000000..f0c3682f7 --- /dev/null +++ b/charts/qiskit-serverless/charts/gateway/templates/gpu-jobs.yaml @@ -0,0 +1,9 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: gpujobs +data: + gpu-jobs.json: | + { + "gpu-functions": {} + } diff --git a/charts/qiskit-serverless/charts/gateway/templates/rayclustertemplate.yaml b/charts/qiskit-serverless/charts/gateway/templates/rayclustertemplate.yaml index a22cf0867..cc1f36771 100644 --- a/charts/qiskit-serverless/charts/gateway/templates/rayclustertemplate.yaml +++ b/charts/qiskit-serverless/charts/gateway/templates/rayclustertemplate.yaml @@ -98,9 +98,11 @@ data: protocol: TCP resources: limits: + nvidia.com/gpu: {{`{{gpu_request}}`}} cpu: {{ .Values.application.ray.cpu }} memory: {{ .Values.application.ray.memory }}Gi requests: + nvidia.com/gpu: {{`{{gpu_request}}`}} cpu: {{ .Values.application.ray.cpu }} memory: {{ .Values.application.ray.memory }}Gi securityContext: @@ -230,6 +232,7 @@ data: serviceAccount: ray-cluster-sa {{- end }} nodeSelector: + {{`{{node_selector_label}}`}} tolerations: [] securityContext: fsGroup: 1000 diff --git a/charts/qiskit-serverless/charts/gateway/values.yaml b/charts/qiskit-serverless/charts/gateway/values.yaml index 9b55cdc12..4ed00b210 100644 --- a/charts/qiskit-serverless/charts/gateway/values.yaml +++ b/charts/qiskit-serverless/charts/gateway/values.yaml @@ -18,15 +18,16 @@ application: superuser: enable: true ray: - nodeImage: "icr.io/quantum-public/qiskit-serverless/ray-node:0.17.0" + nodeImage: "icr.io/quantum-public/qiskit-serverless/ray-node:0.17.1" cpu: 2 memory: 2 + gpu: 1 replicas: 1 minReplicas: 1 maxReplicas: 4 opensslImage: registry.access.redhat.com/ubi8/openssl:8.8-9 kubectlImage: alpine/k8s:1.29.2@sha256:a51aa37f0a34ff827c7f2f9cb7f6fbb8f0e290fa625341be14c2fcc4b1880f60 - proxyImage: "icr.io/quantum-public/qiskit-serverless/proxy:0.17.0" + proxyImage: "icr.io/quantum-public/qiskit-serverless/proxy:0.17.1" scrapeWithPrometheus: true openTelemetry: false openTelemetryCollector: @@ -36,6 +37,7 @@ application: port: 4317 insecure: 0 useTLS: true + gpuJobsConfig: "api/v1/gpu-jobs.json" proxy: enabled: true cpu: 1 @@ -44,6 +46,7 @@ application: limits: maxJobsPerUser: 2 maxComputeResources: 4 + maxGpuResources: 1 keepClusterOnComplete: False programTimeoutDays: 14 qiskitRuntime: diff --git a/charts/qiskit-serverless/values.yaml b/charts/qiskit-serverless/values.yaml index c88e7ff18..81f938fdb 100644 --- a/charts/qiskit-serverless/values.yaml +++ b/charts/qiskit-serverless/values.yaml @@ -2,7 +2,7 @@ # Qiskit Serverless Info # =================== global: - version: 0.17.0 + version: 0.17.1 # =================== # Qiskit Serverless configs @@ -47,7 +47,7 @@ gateway: image: repository: "icr.io/quantum-public/qiskit-serverless/gateway" pullPolicy: IfNotPresent - tag: "0.17.0" + tag: "0.17.1" application: siteHost: "http://gateway:8000" rayHost: "http://kuberay-head-svc:8265" @@ -59,12 +59,16 @@ gateway: type: ClusterIP port: 8000 ray: - nodeImage: "icr.io/quantum-public/qiskit-serverless/ray-node:0.17.0" + nodeImage: "icr.io/quantum-public/qiskit-serverless/ray-node:0.17.1" opensslImage: registry.access.redhat.com/ubi8/openssl:8.8-9 kubectlImage: alpine/k8s:1.29.2@sha256:a51aa37f0a34ff827c7f2f9cb7f6fbb8f0e290fa625341be14c2fcc4b1880f60 + gpuJobsConfig: "/tmp/gpujobs/gpu-jobs.json" limits: maxJobsPerUser: 2 maxComputeResources: 4 + nodeSelector: + cpu: "has-cpu: cpu" + gpu: "has-gpu: gpu" cos: claimName: gateway-claim diff --git a/client/qiskit_serverless/VERSION.txt b/client/qiskit_serverless/VERSION.txt index 07feb8234..14a8c2457 100644 --- a/client/qiskit_serverless/VERSION.txt +++ b/client/qiskit_serverless/VERSION.txt @@ -1 +1 @@ -0.17.0 \ No newline at end of file +0.17.1 \ No newline at end of file diff --git a/client/qiskit_serverless/__init__.py b/client/qiskit_serverless/__init__.py index df7d1bb88..5daefbeec 100644 --- a/client/qiskit_serverless/__init__.py +++ b/client/qiskit_serverless/__init__.py @@ -22,20 +22,15 @@ from importlib_metadata import version as metadata_version, PackageNotFoundError from .core import ( - BaseProvider, BaseClient, distribute_task, distribute_qiskit_function, get, put, get_refs_by_status, - ServerlessProvider, ServerlessClient, - IBMServerlessProvider, IBMServerlessClient, - RayProvider, RayClient, - LocalProvider, LocalClient, save_result, Configuration, diff --git a/client/qiskit_serverless/core/__init__.py b/client/qiskit_serverless/core/__init__.py index 84668942a..2852c01d7 100644 --- a/client/qiskit_serverless/core/__init__.py +++ b/client/qiskit_serverless/core/__init__.py @@ -31,11 +31,7 @@ BaseClient RayClient LocalClient - ComputeResource Job - GatewayJobClient - BaseJobClient - RayJobClient save_result QiskitPattern QiskitFunction @@ -51,25 +47,13 @@ """ -from .client import ( - BaseProvider, - BaseClient, - ComputeResource, - ServerlessProvider, - ServerlessClient, - IBMServerlessProvider, - IBMServerlessClient, - LocalProvider, - LocalClient, - RayProvider, - RayClient, -) +from .client import BaseClient + +from .clients.local_client import LocalClient +from .clients.ray_client import RayClient +from .clients.serverless_client import ServerlessClient, IBMServerlessClient from .job import ( - BaseJobClient, - RayJobClient, - GatewayJobClient, - LocalJobClient, Job, save_result, Configuration, diff --git a/client/qiskit_serverless/core/client.py b/client/qiskit_serverless/core/client.py index a8a9f1165..5f04dec32 100644 --- a/client/qiskit_serverless/core/client.py +++ b/client/qiskit_serverless/core/client.py @@ -24,134 +24,23 @@ :toctree: ../stubs/ ComputeResource - ServerlessClient + BaseClient """ -# pylint: disable=duplicate-code -import logging import warnings -import os.path -import os -from dataclasses import dataclass -from typing import Optional, List, Dict, Any, Union - -import ray -import requests -from ray.dashboard.modules.job.sdk import JobSubmissionClient -from opentelemetry import trace -from qiskit_ibm_runtime import QiskitRuntimeService - -from qiskit_serverless.core.constants import ( - REQUESTS_TIMEOUT, - ENV_GATEWAY_PROVIDER_HOST, - ENV_GATEWAY_PROVIDER_VERSION, - ENV_GATEWAY_PROVIDER_TOKEN, - GATEWAY_PROVIDER_VERSION_DEFAULT, - IBM_SERVERLESS_HOST_URL, +from abc import ABC, abstractmethod +from typing import Optional, List + +from qiskit_serverless.core.job import Job, JobService +from qiskit_serverless.core.function import ( + QiskitFunction, + RunnableQiskitFunction, + RunService, ) -from qiskit_serverless.core.files import GatewayFilesClient -from qiskit_serverless.core.job import ( - Job, - RayJobClient, - GatewayJobClient, - LocalJobClient, - BaseJobClient, - Configuration, -) -from qiskit_serverless.core.function import QiskitFunction -from qiskit_serverless.core.tracing import _trace_env_vars -from qiskit_serverless.exception import QiskitServerlessException from qiskit_serverless.utils import JsonSerializable -from qiskit_serverless.utils.json import safe_json_request from qiskit_serverless.visualizaiton import Widget -TIMEOUT = 30 - - -@dataclass -class ComputeResource: - """ComputeResource class. - Args: - name: name of compute_resource - host: host address of compute_resource - namespace: k8s namespace of compute_resource - port_interactive: port of compute_resource for interactive mode - port_job_server: port of compute resource for job server - resources: list of resources - """ - - name: str - host: Optional[str] = None - port_interactive: int = 10001 - port_job_server: int = 8265 - resources: Optional[Dict[str, float]] = None - - def job_client(self) -> Optional[BaseJobClient]: - """Return job client for given compute resource. - - Returns: - job client - """ - if self.host is not None: - connection_url = f"http://{self.host}:{self.port_job_server}" - client = None - try: - client = RayJobClient(JobSubmissionClient(connection_url)) - except ConnectionError: - logging.warning( - "Failed to establish connection with jobs server at %s. " - "You will not be able to run jobs on this provider.", - connection_url, - ) - - return client - return None - - def context(self, **kwargs): - """Return context allocated for this compute_resource.""" - _trace_env_vars({}, location="on context allocation") - - init_args = { - **kwargs, - **{ - "address": kwargs.get( - "address", - self.connection_string_interactive_mode(), - ), - "ignore_reinit_error": kwargs.get("ignore_reinit_error", True), - "logging_level": kwargs.get("logging_level", "warning"), - "resources": kwargs.get("resources", self.resources), - }, - } - - return ray.init(**init_args) - - def connection_string_interactive_mode(self) -> Optional[str]: - """Return connection string to compute_resource.""" - if self.host is not None: - return f"ray://{self.host}:{self.port_interactive}" - return None - - @classmethod - def from_dict(cls, data: dict): - """Create compute_resource object form dict.""" - return ComputeResource( - name=data.get("name"), - host=data.get("host"), - port_interactive=data.get("port_interactive"), - port_job_server=data.get("port_job_server"), - ) - - def __eq__(self, other: object): - if isinstance(other, ComputeResource): - return self.name == other.name and self.host == other.host - return False - - def __repr__(self): - return f"" - - -class BaseClient(JsonSerializable): +class BaseClient(JobService, RunService, JsonSerializable, ABC): """ A client class for specifying custom compute resources. @@ -168,12 +57,7 @@ class BaseClient(JsonSerializable): """ def __init__( # pylint: disable=too-many-positional-arguments - self, - name: str, - host: Optional[str] = None, - token: Optional[str] = None, - compute_resource: Optional[ComputeResource] = None, - available_compute_resources: Optional[List[ComputeResource]] = None, + self, name: str, host: Optional[str] = None, token: Optional[str] = None ): """ Initialize a BaseClient instance. @@ -182,42 +66,18 @@ def __init__( # pylint: disable=too-many-positional-arguments name: name of client host: host of client a.k.a managers host token: authentication token for manager - compute_resource: selected compute_resource from provider - available_compute_resources: available clusters in provider """ self.name = name self.host = host self.token = token - self.compute_resource = compute_resource - if available_compute_resources is None: - if compute_resource is not None: - available_compute_resources = [compute_resource] - else: - available_compute_resources = [] - self.available_compute_resources = available_compute_resources @classmethod + @abstractmethod def from_dict(cls, dictionary: dict): - return BaseProvider(**dictionary) - - def job_client(self): - """Return job client for configured compute resource of provider. - - Returns: - job client - """ - return self.compute_resource.job_client() - - def context(self, **kwargs): - """Allocated context for selected compute_resource for provider.""" - if self.compute_resource is None: - raise QiskitServerlessException( - f"ComputeResource was not selected for provider {self.name}" - ) - return self.compute_resource.context(**kwargs) + """Converts dict to object.""" def __eq__(self, other): - if isinstance(other, BaseProvider): + if isinstance(other, BaseClient): return self.name == other.name return False @@ -225,27 +85,19 @@ def __eq__(self, other): def __repr__(self): return f"<{self.name}>" - def get_compute_resources(self) -> List[ComputeResource]: - """Return compute resources for provider.""" - raise NotImplementedError - - def create_compute_resource(self, resource) -> int: - """Create compute resource for provider.""" - raise NotImplementedError - - def delete_compute_resource(self, resource) -> int: - """Delete compute resource for provider.""" - raise NotImplementedError - - def get_jobs(self, **kwargs) -> List[Job]: + #################### + ####### JOBS ####### + #################### + @abstractmethod + def jobs(self, **kwargs) -> List[Job]: """Return list of jobs. Returns: list of jobs. """ - raise NotImplementedError - def get_job_by_id(self, job_id: str) -> Optional[Job]: + @abstractmethod + def job(self, job_id: str) -> Optional[Job]: """Returns job by job id. Args: @@ -254,506 +106,83 @@ def get_job_by_id(self, job_id: str) -> Optional[Job]: Returns: Job instance """ - job_client = self.job_client() - - if job_client is None: - logging.warning( # pylint: disable=logging-fstring-interpolation - "Job has not been found as no provider " - "with remote host has been configured. " - ) - return None - return Job(job_id=job_id, job_client=job_client) - - def run( - self, - program: Union[QiskitFunction, str], - arguments: Optional[Dict[str, Any]] = None, - config: Optional[Configuration] = None, - ) -> Job: - """Execute a program as a async job. - - Example: - >>> serverless = QiskitServerless() - >>> program = QiskitFunction( - >>> "job.py", - >>> arguments={"arg1": "val1"}, - >>> dependencies=["requests"] - >>> ) - >>> job = serverless.run(program) - >>> # + + def get_job_by_id(self, job_id: str) -> Optional[Job]: + """Returns job by job id. Args: - arguments: arguments to run program with - program: Program object + job_id: job id Returns: - Job + Job instance """ - job_client = self.job_client() - - if job_client is None: - logging.warning( # pylint: disable=logging-fstring-interpolation - f"Job has not been submitted as no provider " - f"with remote host has been configured. " - f"Selected provider: {self}" - ) - return None - - return job_client.run(program, None, arguments, config) - - def upload(self, program: QiskitFunction): - """Uploads program.""" - raise NotImplementedError - - def files(self) -> List[str]: - """Returns list of available files produced by programs to download.""" - raise NotImplementedError - - def download( - self, - file: str, - download_location: str = "./", - ): - """Download file.""" warnings.warn( - "`download` method has been deprecated. " + "`get_job_by_id` method has been deprecated. " "And will be removed in future releases. " - "Please, use `file_download` instead.", + "Please, use `get_job` instead.", DeprecationWarning, ) - return self.file_download(file, download_location) + return self.job(job_id) - def file_download( - self, - file: str, - target_name: Optional[str] = None, - download_location: str = "./", - ): - """Download file.""" - raise NotImplementedError - - def file_delete(self, file: str): - """Deletes file uploaded or produced by the programs,""" - raise NotImplementedError - - def file_upload(self, file: str): - """Upload file.""" - raise NotImplementedError - - def widget(self): - """Widget for information about provider and jobs.""" - return Widget(self).show() + def get_jobs(self, **kwargs) -> List[Job]: + # pylint: disable=duplicate-code + """Return list of jobs. - def get_programs(self, **kwargs): - """[Deprecated] Returns list of available programs.""" + Returns: + list of jobs. + """ warnings.warn( - "`get_programs` method has been deprecated. " + "`get_jobs` method has been deprecated. " "And will be removed in future releases. " - "Please, use `list` instead.", + "Please, use `jobs` instead.", DeprecationWarning, ) - return self.list(**kwargs) - - def list(self, **kwargs) -> List[QiskitFunction]: - """Returns list of available programs.""" - raise NotImplementedError - - def get( - self, title: str, provider: Optional[str] = None - ) -> Optional[QiskitFunction]: - """Returns qiskit function based on title provided.""" - raise NotImplementedError - - -class BaseProvider(BaseClient): - """ - [Deprecated since version 0.10.0] Use :class:`.BaseClient` instead. - - A provider for connecting to a specified host. This class has been - renamed to :class:`.BaseClient`. - """ - - -class ServerlessClient(BaseClient): - """ - A client for connecting to a specified host. - - Example: - >>> client = ServerlessClient( - >>> name="", - >>> host="", - >>> token="", - >>> ) - """ - - def __init__( # pylint: disable=too-many-positional-arguments - self, - name: Optional[str] = None, - host: Optional[str] = None, - version: Optional[str] = None, - token: Optional[str] = None, - verbose: bool = False, - ): - """ - Initializes the ServerlessClient instance. - - Args: - name: name of client - host: host of gateway - version: version of gateway - token: authorization token - """ - name = name or "gateway-client" - host = host or os.environ.get(ENV_GATEWAY_PROVIDER_HOST) - if host is None: - raise QiskitServerlessException("Please provide `host` of gateway.") - - version = version or os.environ.get(ENV_GATEWAY_PROVIDER_VERSION) - if version is None: - version = GATEWAY_PROVIDER_VERSION_DEFAULT - - token = token or os.environ.get(ENV_GATEWAY_PROVIDER_TOKEN) - if token is None: - raise QiskitServerlessException( - "Authentication credentials must be provided in form of `token`." - ) - - super().__init__(name) - self.verbose = verbose - self.host = host - self.version = version - self._verify_token(token) - self._token = token - - self._job_client = GatewayJobClient(self.host, self._token, self.version) - self._files_client = GatewayFilesClient(self.host, self._token, self.version) - - def get_compute_resources(self) -> List[ComputeResource]: - raise NotImplementedError( - "ServerlessClient does not support resources api yet." - ) - - def create_compute_resource(self, resource) -> int: - raise NotImplementedError( - "ServerlessClient does not support resources api yet." - ) - - def delete_compute_resource(self, resource) -> int: - raise NotImplementedError( - "ServerlessClient does not support resources api yet." - ) - - def get_job_by_id(self, job_id: str) -> Optional[Job]: - return self._job_client.get(job_id) - - def run( - self, - program: Union[QiskitFunction, str], - arguments: Optional[Dict[str, Any]] = None, - config: Optional[Configuration] = None, - ) -> Job: - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("Provider.run"): - warnings.warn( - "`run` method has been deprecated. " - "And will be removed in future releases. " - "Please, use `function.run` instead.", - DeprecationWarning, - ) - if isinstance(program, QiskitFunction) and program.entrypoint is not None: - job = self._job_client.run(program.title, None, arguments, config) - else: - job = self._job_client.run(program, None, arguments, config) - return job - - def upload(self, program: QiskitFunction): - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("Provider.upload"): - response = self._job_client.upload(program) - return response - - def get_jobs(self, **kwargs) -> List[Job]: - return self._job_client.list(**kwargs) - - def files(self, provider: Optional[str] = None) -> List[str]: - return self._files_client.list(provider) - - def file_download( - self, - file: str, - target_name: Optional[str] = None, - download_location: str = "./", - provider: Optional[str] = None, - ): - return self._files_client.download( - file, download_location, target_name, provider - ) + return self.jobs(**kwargs) - def file_delete(self, file: str, provider: Optional[str] = None): - return self._files_client.delete(file, provider) + ######################### + ####### Functions ####### + ######################### - def file_upload(self, file: str, provider: Optional[str] = None): - return self._files_client.upload(file, provider) + @abstractmethod + def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]: + """Uploads program.""" - def list(self, **kwargs) -> List[QiskitFunction]: + @abstractmethod + def functions(self, **kwargs) -> List[RunnableQiskitFunction]: """Returns list of available programs.""" - return self._job_client.get_programs(**kwargs) - def get( + @abstractmethod + def function( self, title: str, provider: Optional[str] = None - ) -> Optional[QiskitFunction]: - return self._job_client.get_program(title=title, provider=provider) - - def _verify_token(self, token: str): - """Verify token.""" - try: - safe_json_request( - request=lambda: requests.get( - url=f"{self.host}/api/v1/programs/", - headers={"Authorization": f"Bearer {token}"}, - timeout=REQUESTS_TIMEOUT, - ), - verbose=self.verbose, - ) - except QiskitServerlessException as reason: - raise QiskitServerlessException("Cannot verify token.") from reason - - -class ServerlessProvider(ServerlessClient): - """ - [Deprecated since version 0.10.0] Use :class:`.ServerlessClient` instead. - - A provider for connecting to a specified host. This class has been - renamed to :class:`.ServerlessClient`. - """ - - -class IBMServerlessClient(ServerlessClient): - """ - A client for connecting to the IBM serverless host. - - Credentials can be saved to disk by calling the `save_account()` method:: - - from qiskit_serverless import IBMServerlessClient - IBMServerlessClient.save_account(token=) - - Once the credentials are saved, you can simply instantiate the client with no - constructor args, as shown below. - - from qiskit_serverless import IBMServerlessClient - client = IBMServerlessClient() - - Instead of saving credentials to disk, you can also set the environment variable - ENV_GATEWAY_PROVIDER_TOKEN and then instantiate the client as below:: - - from qiskit_serverless import IBMServerlessClient - client = IBMServerlessClient() - - You can also enable an account just for the current session by instantiating the - provider with the API token:: - - from qiskit_serverless import IBMServerlessClient - client = IBMServerlessClient(token=) - """ + ) -> Optional[RunnableQiskitFunction]: + """Returns program based on parameters.""" - def __init__(self, token: Optional[str] = None, name: Optional[str] = None): - """ - Initialize a client with access to an IBMQ-provided remote cluster. - - If a ``token`` is used to initialize an instance, the ``name`` argument - will be ignored. - - If only a ``name`` is provided, the token for the named account will - be retrieved from the user's local IBM Quantum account config file. - - If neither argument is provided, the token will be searched for in the - environment variables and also in the local IBM Quantum account config - file using the default account name. - - Args: - token: IBM quantum token - name: Name of the account to load - """ - token = token or QiskitRuntimeService(name=name).active_account().get("token") - super().__init__(token=token, host=IBM_SERVERLESS_HOST_URL) - - @staticmethod - def save_account( - token: Optional[str] = None, - name: Optional[str] = None, - overwrite: Optional[bool] = False, - ) -> None: - """ - Save the account to disk for future use. - - Args: - token: IBM Quantum API token - name: Name of the account to save - overwrite: ``True`` if the existing account is to be overwritten - """ - QiskitRuntimeService.save_account(token=token, name=name, overwrite=overwrite) - - def get_compute_resources(self) -> List[ComputeResource]: - raise NotImplementedError( - "IBMServerlessClient does not support resources api yet." - ) - - def create_compute_resource(self, resource) -> int: - raise NotImplementedError( - "IBMServerlessClient does not support resources api yet." - ) - - def delete_compute_resource(self, resource) -> int: - raise NotImplementedError( - "IBMServerlessClient does not support resources api yet." + def get( + self, title: str, provider: Optional[str] = None + ) -> Optional[RunnableQiskitFunction]: + """Returns program based on parameters.""" + warnings.warn( + "`get` method has been deprecated. " + "And will be removed in future releases. " + "Please, use `get_function` instead.", + DeprecationWarning, ) + return self.function(title, provider=provider) - -class IBMServerlessProvider(IBMServerlessClient): - """ - [Deprecated since version 0.10.0] Use :class:`.IBMServerlessClient` instead. - - A provider for connecting to IBM Serverless instance. This class has been - renamed to :class:`.IBMServerlessClient`. - """ - - -class RayClient(BaseClient): - """RayClient.""" - - def __init__(self, host: str): - """Ray client - - Args: - host: ray head node host - - Example: - >>> ray_provider = RayClient("http://localhost:8265") - """ - super().__init__("ray-client", host) - self.client = RayJobClient(JobSubmissionClient(host)) - - def run( - self, - program: Union[QiskitFunction, str], - arguments: Optional[Dict[str, Any]] = None, - config: Optional[Configuration] = None, - ) -> Job: - if isinstance(program, str): - raise NotImplementedError("Ray client only supports full Programs.") - - return self.client.run(program, None, arguments, config) - - def get_job_by_id(self, job_id: str) -> Optional[Job]: - return self.client.get(job_id) - - def get_jobs(self, **kwargs) -> List[Job]: - return self.client.list() - - -class RayProvider(RayClient): - """ - [Deprecated since version 0.10.0] Use :class:`.RayClient` instead. - - A provider for connecting to a ray head node. This class has been - renamed to :class:`.RayClient`. - """ - - -class LocalClient(BaseClient): - """LocalClient.""" - - def __init__(self): - """Local client - - Args: - - Example: - >>> local = LocalClient()) - """ - super().__init__("local-client") - self.client = LocalJobClient() - self.in_test = os.getenv("IN_TEST") - - def run( - self, - program: Union[QiskitFunction, str], - arguments: Optional[Dict[str, Any]] = None, - config: Optional[Configuration] = None, - ) -> Job: + def list(self, **kwargs) -> List[RunnableQiskitFunction]: + """Returns list of available programs.""" warnings.warn( - "`client.run` method has been deprecated. " + "`list` method has been deprecated. " "And will be removed in future releases. " - "Please, use `function.run` instead.", + "Please, use `get_functions` instead.", DeprecationWarning, ) - if isinstance(program, QiskitFunction) and program.entrypoint is not None: - job = self.client.run(program.title, None, arguments, config) - else: - job = self.client.run(program, None, arguments, config) - return job + return self.functions(**kwargs) - def get_job_by_id(self, job_id: str) -> Optional[Job]: - return self.client.get(job_id) - - def get_jobs(self, **kwargs) -> List[Job]: - return self.client.list() - - def upload(self, program: QiskitFunction): - return self.client.upload(program) + ###################### + ####### Widget ####### + ###################### def widget(self): """Widget for information about provider and jobs.""" return Widget(self).show() - - def get_programs(self, **kwargs) -> List[QiskitFunction]: - return self.client.get_programs(**kwargs) - - def files(self) -> List[str]: - if self.in_test: - logging.warning("files method is not implemented in LocalProvider.") - return [] - raise NotImplementedError("files method is not implemented in LocalProvider.") - - def file_upload(self, file: str): - if self.in_test: - logging.warning("file_upload method is not implemented in LocalProvider.") - return - raise NotImplementedError("files method is not implemented in LocalProvider.") - - def file_download( - self, - file: str, - target_name: Optional[str] = None, - download_location: str = "./", - ): - if self.in_test: - logging.warning("file_download method is not implemented in LocalProvider.") - return None - raise NotImplementedError("files method is not implemented in LocalProvider.") - - def file_delete(self, file: str): - if self.in_test: - logging.warning("file_delete method is not implemented in LocalProvider.") - return None - raise NotImplementedError("files method is not implemented in LocalProvider.") - - def list(self, **kwargs): - return self.client.get_programs(**kwargs) - - def get( - self, title: str, provider: Optional[str] = None - ) -> Optional[QiskitFunction]: - functions = { - function.title: function for function in self.client.get_programs() - } - return functions.get(title) - - -class LocalProvider(LocalClient): - """ - [Deprecated since version 0.10.0] Use :class:`.LocalClient` instead. - - A provider for connecting to local job execution instance. This class has been - renamed to :class:`.LocalClient`. - """ diff --git a/client/qiskit_serverless/core/clients/__init__.py b/client/qiskit_serverless/core/clients/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/client/qiskit_serverless/core/clients/local_client.py b/client/qiskit_serverless/core/clients/local_client.py new file mode 100644 index 000000000..c0c28a64e --- /dev/null +++ b/client/qiskit_serverless/core/clients/local_client.py @@ -0,0 +1,195 @@ +# This code is a Qiskit project. +# +# (C) Copyright IBM 2022. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +""" +================================================ +Provider (:mod:`qiskit_serverless.core.client`) +================================================ + +.. currentmodule:: qiskit_serverless.core.client + +Qiskit Serverless provider +=========================== + +.. autosummary:: + :toctree: ../stubs/ + + LocalClient +""" +# pylint: disable=duplicate-code +import json +import os.path +import os +import re +import sys +from typing import Optional, List, Dict, Any, Union +from uuid import uuid4 + +import subprocess +from subprocess import Popen + +from qiskit_ibm_runtime import QiskitRuntimeService + +from qiskit_serverless.core.constants import ( + OT_PROGRAM_NAME, + ENV_JOB_ARGUMENTS, +) +from qiskit_serverless.core.client import BaseClient +from qiskit_serverless.core.job import ( + Job, + Configuration, +) +from qiskit_serverless.core.function import QiskitFunction, RunnableQiskitFunction +from qiskit_serverless.exception import QiskitServerlessException +from qiskit_serverless.serializers.program_serializers import ( + QiskitObjectsEncoder, +) + + +class LocalClient(BaseClient): + """LocalClient.""" + + def __init__(self): + """Local client + + Args: + + Example: + >>> local = LocalClient() + """ + super().__init__("local-client") + self.in_test = os.getenv("IN_TEST") + self._jobs = {} + self._patterns = [] + + @classmethod + def from_dict(cls, dictionary: dict): + return LocalClient(**dictionary) + + #################### + ####### JOBS ####### + #################### + + def job(self, job_id: str) -> Optional[Job]: + return self._jobs[job_id]["job"] + + def jobs(self, **kwargs) -> List[Job]: + return [job["job"] for job in list(self._jobs.values())] + + def run( + self, + program: Union[QiskitFunction, str], + arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, + ) -> Job: + # pylint: disable=too-many-locals + title = "" + if isinstance(program, QiskitFunction): + title = program.title + else: + title = str(program) + + for pattern in self._patterns: + if pattern["title"] == title: + saved_program = pattern + if saved_program[ # pylint: disable=possibly-used-before-assignment + "dependencies" + ]: + dept = json.loads(saved_program["dependencies"]) + for dependency in dept: + subprocess.check_call( + [sys.executable, "-m", "pip", "install", dependency] + ) + arguments = arguments or {} + env_vars = { + **(saved_program["env_vars"] or {}), + **{OT_PROGRAM_NAME: saved_program["title"]}, + **{"PATH": os.environ["PATH"]}, + **{ENV_JOB_ARGUMENTS: json.dumps(arguments, cls=QiskitObjectsEncoder)}, + } + + with Popen( + ["python", saved_program["working_dir"] + saved_program["entrypoint"]], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + env=env_vars, + ) as pipe: + status = "SUCCEEDED" + if pipe.wait(): + status = "FAILED" + output, _ = pipe.communicate() + results = re.search("\nSaved Result:(.+?):End Saved Result\n", output) + result = "" + if results: + result = results.group(1) + + job = Job(job_id=str(uuid4()), job_service=self) + self._jobs[job.job_id] = { + "status": status, + "logs": output, + "result": result, + "job": job, + } + return job + + def status(self, job_id: str): + return self._jobs[job_id]["status"] + + def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None): + """Stops job/program.""" + return f"job:{job_id} has already stopped" + + def result(self, job_id: str): + return self._jobs[job_id]["result"] + + def logs(self, job_id: str): + return self._jobs[job_id]["logs"] + + def filtered_logs(self, job_id: str, **kwargs): + """Return filtered logs.""" + raise NotImplementedError + + ######################### + ####### Functions ####### + ######################### + + def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]: + # check if entrypoint exists + if not os.path.exists(os.path.join(program.working_dir, program.entrypoint)): + raise QiskitServerlessException( + f"Entrypoint file [{program.entrypoint}] does not exist " + f"in [{program.working_dir}] working directory." + ) + + pattern = { + "title": program.title, + "provider": program.provider, + "entrypoint": program.entrypoint, + "working_dir": program.working_dir, + "env_vars": program.env_vars, + "arguments": json.dumps({}), + "dependencies": json.dumps(program.dependencies or []), + "client": self, + } + self._patterns.append(pattern) + return RunnableQiskitFunction.from_json(pattern) + + def functions(self, **kwargs) -> List[RunnableQiskitFunction]: + """Returns list of programs.""" + return [RunnableQiskitFunction.from_json(program) for program in self._patterns] + + def function( + self, title: str, provider: Optional[str] = None + ) -> Optional[RunnableQiskitFunction]: + functions = {function.title: function for function in self.functions()} + return functions.get(title) diff --git a/client/qiskit_serverless/core/clients/ray_client.py b/client/qiskit_serverless/core/clients/ray_client.py new file mode 100644 index 000000000..cd2eaaa90 --- /dev/null +++ b/client/qiskit_serverless/core/clients/ray_client.py @@ -0,0 +1,173 @@ +# This code is a Qiskit project. +# +# (C) Copyright IBM 2022. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +""" +================================================ +Provider (:mod:`qiskit_serverless.core.client`) +================================================ + +.. currentmodule:: qiskit_serverless.core.client + +Qiskit Serverless provider +=========================== + +.. autosummary:: + :toctree: ../stubs/ + + RayClient +""" +# pylint: disable=duplicate-code +import json +import warnings +from typing import Optional, List, Dict, Any, Union +from uuid import uuid4 + +from ray.dashboard.modules.job.sdk import JobSubmissionClient +from qiskit_ibm_runtime import QiskitRuntimeService + +from qiskit_serverless.core.constants import ( + OT_PROGRAM_NAME, + ENV_JOB_ARGUMENTS, +) +from qiskit_serverless.core.job import ( + Configuration, + Job, +) +from qiskit_serverless.core.function import QiskitFunction, RunnableQiskitFunction +from qiskit_serverless.serializers.program_serializers import ( + QiskitObjectsEncoder, +) + +from qiskit_serverless.core.client import BaseClient + + +class RayClient(BaseClient): + """RayClient.""" + + def __init__(self, host: str): + """Ray client + + Args: + host: ray head node host + + Example: + >>> ray_provider = RayClient("http://localhost:8265") + """ + super().__init__("ray-client", host) + self.job_submission_client = JobSubmissionClient(host) + + @classmethod + def from_dict(cls, dictionary: dict): + return RayClient(**dictionary) + + #################### + ####### JOBS ####### + #################### + + def jobs(self, **kwargs) -> List[Job]: + """Return list of jobs. + + Returns: + list of jobs. + """ + return [ + Job(job.job_id, job_service=self) + for job in self.job_submission_client.list_jobs() + ] + + def job(self, job_id: str) -> Optional[Job]: + """Returns job by job id. + + Args: + job_id: job id + + Returns: + Job instance + """ + return Job( + self.job_submission_client.get_job_info(job_id).submission_id, + job_service=self, + ) + + def run( + self, + program: Union[QiskitFunction, str], + arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, + ) -> Job: + if not isinstance(program, QiskitFunction): + warnings.warn( + "`run` doesn't support program str yet. " + "Send a QiskitFunction instead. " + ) + raise NotImplementedError + + arguments = arguments or {} + entrypoint = f"python {program.entrypoint}" + + # set program name so OT can use it as parent span name + env_vars = { + **(program.env_vars or {}), + **{OT_PROGRAM_NAME: program.title}, + **{ENV_JOB_ARGUMENTS: json.dumps(arguments, cls=QiskitObjectsEncoder)}, + } + + job_id = self.job_submission_client.submit_job( + entrypoint=entrypoint, + submission_id=f"qs_{uuid4()}", + runtime_env={ + "working_dir": program.working_dir, + "pip": program.dependencies, + "env_vars": env_vars, + }, + ) + return Job(job_id=job_id, job_service=self) + + def status(self, job_id: str) -> str: + """Check status.""" + return self.job_submission_client.get_job_status(job_id).value + + def stop( + self, job_id: str, service: Optional[QiskitRuntimeService] = None + ) -> Union[str, bool]: + """Stops job/program.""" + return self.job_submission_client.stop_job(job_id) + + def result(self, job_id: str) -> Any: + """Return results.""" + return self.logs(job_id) + + def logs(self, job_id: str) -> str: + """Return logs.""" + return self.job_submission_client.get_job_logs(job_id) + + def filtered_logs(self, job_id: str, **kwargs) -> str: + """Return filtered logs.""" + raise NotImplementedError + + ######################### + ####### Functions ####### + ######################### + + def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]: + """Uploads program.""" + raise NotImplementedError("Upload is not available for RayClient.") + + def functions(self, **kwargs) -> List[RunnableQiskitFunction]: + """Returns list of available programs.""" + raise NotImplementedError("get_programs is not available for RayClient.") + + def function( + self, title: str, provider: Optional[str] = None + ) -> Optional[RunnableQiskitFunction]: + """Returns program based on parameters.""" + raise NotImplementedError("get_program is not available for RayClient.") diff --git a/client/qiskit_serverless/core/clients/serverless_client.py b/client/qiskit_serverless/core/clients/serverless_client.py new file mode 100644 index 000000000..3478b9843 --- /dev/null +++ b/client/qiskit_serverless/core/clients/serverless_client.py @@ -0,0 +1,601 @@ +# This code is a Qiskit project. +# +# (C) Copyright IBM 2022. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +""" +================================================ +Provider (:mod:`qiskit_serverless.core.client`) +================================================ + +.. currentmodule:: qiskit_serverless.core.client + +Qiskit Serverless provider +=========================== + +.. autosummary:: + :toctree: ../stubs/ + + ServerlessClient +""" +# pylint: disable=duplicate-code +import json +import os.path +import os +import re +import tarfile +from pathlib import Path +from dataclasses import asdict +from typing import Optional, List, Dict, Any, Union + +import requests +from opentelemetry import trace +from qiskit_ibm_runtime import QiskitRuntimeService + +from qiskit_serverless.core.constants import ( + REQUESTS_TIMEOUT, + ENV_GATEWAY_PROVIDER_HOST, + ENV_GATEWAY_PROVIDER_VERSION, + ENV_GATEWAY_PROVIDER_TOKEN, + GATEWAY_PROVIDER_VERSION_DEFAULT, + IBM_SERVERLESS_HOST_URL, + MAX_ARTIFACT_FILE_SIZE_MB, +) +from qiskit_serverless.core.client import BaseClient +from qiskit_serverless.core.files import GatewayFilesClient +from qiskit_serverless.core.job import ( + Job, + Configuration, +) +from qiskit_serverless.core.function import ( + QiskitFunction, + RunService, + RunnableQiskitFunction, +) + +from qiskit_serverless.exception import QiskitServerlessException +from qiskit_serverless.utils.json import ( + safe_json_request_as_dict, + safe_json_request_as_list, + safe_json_request, +) +from qiskit_serverless.utils.formatting import format_provider_name_and_title +from qiskit_serverless.serializers.program_serializers import ( + QiskitObjectsEncoder, + QiskitObjectsDecoder, +) + + +class ServerlessClient(BaseClient): + """ + A client for connecting to a specified host. + + Example: + >>> client = ServerlessClient( + >>> name="", + >>> host="", + >>> token="", + >>> ) + """ + + def __init__( # pylint: disable=too-many-positional-arguments + self, + name: Optional[str] = None, + host: Optional[str] = None, + version: Optional[str] = None, + token: Optional[str] = None, + verbose: bool = False, + ): + """ + Initializes the ServerlessClient instance. + + Args: + name: name of client + host: host of gateway + version: version of gateway + token: authorization token + """ + name = name or "gateway-client" + host = host or os.environ.get(ENV_GATEWAY_PROVIDER_HOST) + if host is None: + raise QiskitServerlessException("Please provide `host` of gateway.") + + version = version or os.environ.get(ENV_GATEWAY_PROVIDER_VERSION) + if version is None: + version = GATEWAY_PROVIDER_VERSION_DEFAULT + + token = token or os.environ.get(ENV_GATEWAY_PROVIDER_TOKEN) + if token is None: + raise QiskitServerlessException( + "Authentication credentials must be provided in form of `token`." + ) + + super().__init__(name, host, token) + self.verbose = verbose + self.version = version + self._verify_token(token) + + self._files_client = GatewayFilesClient(self.host, self.token, self.version) + + @classmethod + def from_dict(cls, dictionary: dict): + return ServerlessClient(**dictionary) + + def _verify_token(self, token: str): + """Verify token.""" + try: + safe_json_request( + request=lambda: requests.get( + url=f"{self.host}/api/v1/programs/", + headers={"Authorization": f"Bearer {token}"}, + timeout=REQUESTS_TIMEOUT, + ), + verbose=self.verbose, + ) + except QiskitServerlessException as reason: + raise QiskitServerlessException("Cannot verify token.") from reason + + #################### + ####### JOBS ####### + #################### + + def jobs(self, **kwargs) -> List[Job]: + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("job.list"): + limit = kwargs.get("limit", 10) + kwargs["limit"] = limit + offset = kwargs.get("offset", 0) + kwargs["offset"] = offset + + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + f"{self.host}/api/{self.version}/jobs", + params=kwargs, + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, + ) + ) + + return [ + Job(job.get("id"), job_service=self, raw_data=job) + for job in response_data.get("results", []) + ] + + def job(self, job_id: str) -> Optional[Job]: + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("job.get"): + url = f"{self.host}/api/{self.version}/jobs/{job_id}/" + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + url, + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, + ) + ) + + job = None + job_id = response_data.get("id") + if job_id is not None: + job = Job( + job_id=job_id, + job_service=self, + ) + + return job + + def run( + self, + program: Union[QiskitFunction, str], + arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, + provider: Optional[str] = None, + ) -> Job: + if isinstance(program, QiskitFunction): + title = program.title + provider = program.provider + else: + title = str(program) + + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("job.run") as span: + span.set_attribute("program", title) + span.set_attribute("provider", provider) + span.set_attribute("arguments", str(arguments)) + + url = f"{self.host}/api/{self.version}/programs/run/" + + data = { + "title": title, + "provider": provider, + "arguments": json.dumps(arguments or {}, cls=QiskitObjectsEncoder), + } # type: Dict[str, Any] + if config: + data["config"] = asdict(config) + else: + data["config"] = asdict(Configuration()) + + response_data = safe_json_request_as_dict( + request=lambda: requests.post( + url=url, + json=data, + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, + ) + ) + job_id = response_data.get("id") + span.set_attribute("job.id", job_id) + + return Job(job_id, job_service=self) + + def status(self, job_id: str): + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("job.status"): + default_status = "Unknown" + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + f"{self.host}/api/{self.version}/jobs/{job_id}/", + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, + ) + ) + + return response_data.get("status", default_status) + + def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None): + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("job.stop"): + if service: + data = { + "service": json.dumps(service, cls=QiskitObjectsEncoder), + } + else: + data = { + "service": None, + } + response_data = safe_json_request_as_dict( + request=lambda: requests.post( + f"{self.host}/api/{self.version}/jobs/{job_id}/stop/", + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, + json=data, + ) + ) + + return response_data.get("message") + + def result(self, job_id: str): + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("job.result"): + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + f"{self.host}/api/{self.version}/jobs/{job_id}/", + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, + ) + ) + return json.loads( + response_data.get("result", "{}") or "{}", cls=QiskitObjectsDecoder + ) + + def logs(self, job_id: str): + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("job.logs"): + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + f"{self.host}/api/{self.version}/jobs/{job_id}/logs/", + headers={"Authorization": f"Bearer {self.token}"}, + timeout=REQUESTS_TIMEOUT, + ) + ) + return response_data.get("logs") + + def filtered_logs(self, job_id: str, **kwargs): + all_logs = self.logs(job_id=job_id) + included = "" + include = kwargs.get("include") + if include is not None: + for line in all_logs.split("\n"): + if re.search(include, line) is not None: + included = included + line + "\n" + else: + included = all_logs + + excluded = "" + exclude = kwargs.get("exclude") + if exclude is not None: + for line in included.split("\n"): + if line != "" and re.search(exclude, line) is None: + excluded = excluded + line + "\n" + else: + excluded = included + return excluded + + ######################### + ####### Functions ####### + ######################### + + def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]: + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("job.run") as span: + span.set_attribute("program", program.title) + url = f"{self.host}/api/{self.version}/programs/upload/" + + if program.image is not None: + # upload function with custom image + function_uploaded = _upload_with_docker_image( + program=program, url=url, token=self.token, span=span, client=self + ) + elif program.entrypoint is not None: + # upload funciton with artifact + function_uploaded = _upload_with_artifact( + program=program, url=url, token=self.token, span=span, client=self + ) + else: + raise QiskitServerlessException( + "Function must either have `entrypoint` or `image` specified." + ) + + return function_uploaded + + def functions(self, **kwargs) -> List[RunnableQiskitFunction]: + """Returns list of available programs.""" + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("program.list"): + response_data = safe_json_request_as_list( + request=lambda: requests.get( + f"{self.host}/api/{self.version}/programs", + headers={"Authorization": f"Bearer {self.token}"}, + params=kwargs, + timeout=REQUESTS_TIMEOUT, + ) + ) + + return [ + RunnableQiskitFunction( + client=self, + title=program.get("title"), + provider=program.get("provider", None), + raw_data=program, + description=program.get("description"), + ) + for program in response_data + ] + + def function( + self, title: str, provider: Optional[str] = None + ) -> Optional[RunnableQiskitFunction]: + """Returns program based on parameters.""" + provider, title = format_provider_name_and_title( + request_provider=provider, title=title + ) + + tracer = trace.get_tracer("client.tracer") + with tracer.start_as_current_span("program.get_by_title"): + response_data = safe_json_request_as_dict( + request=lambda: requests.get( + f"{self.host}/api/{self.version}/programs/get_by_title/{title}", + headers={"Authorization": f"Bearer {self.token}"}, + params={"provider": provider}, + timeout=REQUESTS_TIMEOUT, + ) + ) + return RunnableQiskitFunction( + client=self, + title=response_data.get("title"), + provider=response_data.get("provider", None), + raw_data=response_data, + ) + + ##################### + ####### FILES ####### + ##################### + + def files(self, provider: Optional[str] = None) -> List[str]: + """Returns list of available files produced by programs to download.""" + return self._files_client.list(provider) + + def file_download( + self, + file: str, + target_name: Optional[str] = None, + download_location: str = "./", + provider: Optional[str] = None, + ): + """Download file.""" + return self._files_client.download( + file, download_location, target_name, provider + ) + + def file_delete(self, file: str, provider: Optional[str] = None): + """Deletes file uploaded or produced by the programs,""" + return self._files_client.delete(file, provider) + + def file_upload(self, file: str, provider: Optional[str] = None): + """Upload file.""" + return self._files_client.upload(file, provider) + + +class IBMServerlessClient(ServerlessClient): + """ + A client for connecting to the IBM serverless host. + + Credentials can be saved to disk by calling the `save_account()` method:: + + from qiskit_serverless import IBMServerlessClient + IBMServerlessClient.save_account(token=) + + Once the credentials are saved, you can simply instantiate the client with no + constructor args, as shown below. + + from qiskit_serverless import IBMServerlessClient + client = IBMServerlessClient() + + Instead of saving credentials to disk, you can also set the environment variable + ENV_GATEWAY_PROVIDER_TOKEN and then instantiate the client as below:: + + from qiskit_serverless import IBMServerlessClient + client = IBMServerlessClient() + + You can also enable an account just for the current session by instantiating the + provider with the API token:: + + from qiskit_serverless import IBMServerlessClient + client = IBMServerlessClient(token=) + """ + + def __init__(self, token: Optional[str] = None, name: Optional[str] = None): + """ + Initialize a client with access to an IBMQ-provided remote cluster. + + If a ``token`` is used to initialize an instance, the ``name`` argument + will be ignored. + + If only a ``name`` is provided, the token for the named account will + be retrieved from the user's local IBM Quantum account config file. + + If neither argument is provided, the token will be searched for in the + environment variables and also in the local IBM Quantum account config + file using the default account name. + + Args: + token: IBM quantum token + name: Name of the account to load + """ + token = token or QiskitRuntimeService(name=name).active_account().get("token") + super().__init__(token=token, host=IBM_SERVERLESS_HOST_URL) + + @staticmethod + def save_account( + token: Optional[str] = None, + name: Optional[str] = None, + overwrite: Optional[bool] = False, + ) -> None: + """ + Save the account to disk for future use. + + Args: + token: IBM Quantum API token + name: Name of the account to save + overwrite: ``True`` if the existing account is to be overwritten + """ + QiskitRuntimeService.save_account(token=token, name=name, overwrite=overwrite) + + +def _upload_with_docker_image( + program: QiskitFunction, url: str, token: str, span: Any, client: RunService +) -> RunnableQiskitFunction: + """Uploads function with custom docker image. + + Args: + program (QiskitFunction): function instance + url (str): upload gateway url + token (str): auth token + span (Any): tracing span + + Returns: + str: uploaded function name + """ + response_data = safe_json_request_as_dict( + request=lambda: requests.post( + url=url, + data={ + "title": program.title, + "provider": program.provider, + "image": program.image, + "arguments": json.dumps({}), + "dependencies": json.dumps(program.dependencies or []), + "env_vars": json.dumps(program.env_vars or {}), + "description": program.description, + }, + headers={"Authorization": f"Bearer {token}"}, + timeout=REQUESTS_TIMEOUT, + ) + ) + program_title = response_data.get("title", "na") + program_provider = response_data.get("provider", "na") + span.set_attribute("program.title", program_title) + span.set_attribute("program.provider", program_provider) + response_data["client"] = client + return RunnableQiskitFunction.from_json(response_data) + + +def _upload_with_artifact( + program: QiskitFunction, url: str, token: str, span: Any, client: RunService +) -> RunnableQiskitFunction: + """Uploads function with artifact. + + Args: + program (QiskitFunction): function instance + url (str): endpoint for gateway upload + token (str): auth token + span (Any): tracing span + + Raises: + QiskitServerlessException: if no entrypoint or size of artifact is too large. + + Returns: + str: uploaded function name + """ + artifact_file_path = os.path.join(program.working_dir, "artifact.tar") + + # check if entrypoint exists + if ( + not os.path.exists(os.path.join(program.working_dir, program.entrypoint)) + or program.entrypoint[0] == "/" + ): + raise QiskitServerlessException( + f"Entrypoint file [{program.entrypoint}] does not exist " + f"in [{program.working_dir}] working directory." + ) + + try: + with tarfile.open(artifact_file_path, "w", dereference=True) as tar: + for filename in os.listdir(program.working_dir): + fpath = os.path.join(program.working_dir, filename) + tar.add(fpath, arcname=filename) + + # check file size + size_in_mb = Path(artifact_file_path).stat().st_size / 1024**2 + if size_in_mb > MAX_ARTIFACT_FILE_SIZE_MB: + raise QiskitServerlessException( + f"{artifact_file_path} is {int(size_in_mb)} Mb, " + f"which is greater than {MAX_ARTIFACT_FILE_SIZE_MB} allowed. " + f"Try to reduce size of `working_dir`." + ) + + with open(artifact_file_path, "rb") as file: + response_data = safe_json_request_as_dict( + request=lambda: requests.post( + url=url, + data={ + "title": program.title, + "provider": program.provider, + "entrypoint": program.entrypoint, + "arguments": json.dumps({}), + "dependencies": json.dumps(program.dependencies or []), + "env_vars": json.dumps(program.env_vars or {}), + "description": program.description, + }, + files={"artifact": file}, + headers={"Authorization": f"Bearer {token}"}, + timeout=REQUESTS_TIMEOUT, + ) + ) + span.set_attribute("program.title", response_data.get("title", "na")) + span.set_attribute("program.provider", response_data.get("provider", "na")) + response_data["client"] = client + response_function = RunnableQiskitFunction.from_json(response_data) + except Exception as error: # pylint: disable=broad-exception-caught + raise QiskitServerlessException from error + finally: + if os.path.exists(artifact_file_path): + os.remove(artifact_file_path) + + return response_function diff --git a/client/qiskit_serverless/core/constants.py b/client/qiskit_serverless/core/constants.py index 2931873a1..73e2ac51e 100644 --- a/client/qiskit_serverless/core/constants.py +++ b/client/qiskit_serverless/core/constants.py @@ -23,6 +23,7 @@ # request timeout REQUESTS_TIMEOUT: int = 30 +REQUESTS_STREAMING_TIMEOUT: int = 60 REQUESTS_TIMEOUT_OVERRIDE = "REQUESTS_TIMEOUT_OVERRIDE" # gateway diff --git a/client/qiskit_serverless/core/decorators.py b/client/qiskit_serverless/core/decorators.py index 7fb4aa924..ffccc9305 100644 --- a/client/qiskit_serverless/core/decorators.py +++ b/client/qiskit_serverless/core/decorators.py @@ -358,13 +358,13 @@ def distribute_qiskit_function( # pylint: disable=import-outside-toplevel,cyclic-import from qiskit_serverless import QiskitServerlessException from qiskit_serverless.core.function import QiskitFunction - from qiskit_serverless.core.client import ServerlessProvider + from qiskit_serverless import ServerlessClient # create provider if provider is None: # try to create from env vars try: - provider = ServerlessProvider() + provider = ServerlessClient() except QiskitServerlessException as qs_error: raise QiskitServerlessException( "Set provider in arguments for `distribute_program` " diff --git a/client/qiskit_serverless/core/files.py b/client/qiskit_serverless/core/files.py index 2d7939a36..fbb799bcc 100644 --- a/client/qiskit_serverless/core/files.py +++ b/client/qiskit_serverless/core/files.py @@ -33,8 +33,11 @@ from opentelemetry import trace from tqdm import tqdm -from qiskit_serverless.core.constants import REQUESTS_TIMEOUT -from qiskit_serverless.utils.json import safe_json_request +from qiskit_serverless.core.constants import ( + REQUESTS_STREAMING_TIMEOUT, + REQUESTS_TIMEOUT, +) +from qiskit_serverless.utils.json import safe_json_request_as_dict class GatewayFilesClient: @@ -67,7 +70,7 @@ def download( params={"file": file, "provider": provider}, stream=True, headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, + timeout=REQUESTS_STREAMING_TIMEOUT, ) as req: req.raise_for_status() @@ -95,7 +98,7 @@ def upload(self, file: str, provider: Optional[str] = None) -> Optional[str]: data={"provider": provider}, stream=True, headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, + timeout=REQUESTS_STREAMING_TIMEOUT, ) as req: if req.ok: return req.text @@ -106,7 +109,7 @@ def list(self, provider: Optional[str] = None) -> List[str]: """Returns list of available files to download produced by programs,""" tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("files.list"): - response_data = safe_json_request( + response_data = safe_json_request_as_dict( request=lambda: requests.get( f"{self.host}/api/{self.version}/files/", params={"provider": provider}, @@ -120,7 +123,7 @@ def delete(self, file: str, provider: Optional[str] = None) -> Optional[str]: """Deletes file uploaded or produced by the programs,""" tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("files.delete"): - response_data = safe_json_request( + response_data = safe_json_request_as_dict( request=lambda: requests.delete( f"{self.host}/api/{self.version}/files/delete/", data={"file": file, "provider": provider}, diff --git a/client/qiskit_serverless/core/function.py b/client/qiskit_serverless/core/function.py index 6ee3cfa90..7adaac72f 100644 --- a/client/qiskit_serverless/core/function.py +++ b/client/qiskit_serverless/core/function.py @@ -26,10 +26,16 @@ QiskitFunction """ +from abc import ABC, abstractmethod import dataclasses import warnings from dataclasses import dataclass -from typing import Optional, Dict, List, Any, Tuple +from typing import Optional, Dict, List, Any, Tuple, Union + +from qiskit_serverless.core.job import ( + Job, + Configuration, +) @dataclass @@ -58,7 +64,6 @@ class QiskitFunction: # pylint: disable=too-many-instance-attributes version: Optional[str] = None tags: Optional[List[str]] = None raw_data: Optional[Dict[str, Any]] = None - job_client: Optional[Any] = None image: Optional[str] = None validate: bool = True schema: Optional[str] = None @@ -91,6 +96,70 @@ def __str__(self): def __repr__(self): return self.__str__() + def _validate_function(self) -> Tuple[bool, List[str]]: + """Validate function arguments using schema provided. + + Returns: + Tuple[bool, List[str]]: + boolean specifiying if function arguments are valid + list of validation errors, if any + """ + return True, [] + + +class RunService(ABC): + """Provide access to run a function and retrieve the jobs associated to that function""" + + @abstractmethod + def jobs(self, **kwargs) -> List[Job]: + """Return list of jobs. + + Returns: + list of jobs. + """ + + @abstractmethod + def run( + self, + program: Union[QiskitFunction, str], + arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, + ) -> Job: + """Run a function and return its job.""" + + +class RunnableQiskitFunction(QiskitFunction): + """Serverless QiskitPattern. + + Args: + title: program name + provider: Qiskit Function provider reference + entrypoint: is a script that will be executed as a job + ex: job.py + env_vars: env vars + dependencies: list of python dependencies to execute a program + working_dir: directory where entrypoint file is located (max size 50MB) + description: description of a program + version: version of a program + """ + + _run_service: RunService = None + + def __init__( # pylint: disable=too-many-positional-arguments + self, client: RunService, **kwargs + ): + self._run_service = client + super().__init__(**kwargs) + + @classmethod + def from_json(cls, data: Dict[str, Any]): + """Reconstructs QiskitPattern from dictionary.""" + field_names = set(f.name for f in dataclasses.fields(QiskitFunction)) + client = data["client"] + return RunnableQiskitFunction( + client, **{k: v for k, v in data.items() if k in field_names} + ) + def run(self, **kwargs): """Run function @@ -100,7 +169,7 @@ def run(self, **kwargs): Returns: Job: job handler for function execution """ - if self.job_client is None: + if self._run_service is None: raise ValueError("No clients specified for a function.") if self.validate: @@ -112,14 +181,14 @@ def run(self, **kwargs): ) config = kwargs.pop("config", None) - return self.job_client.run( - program=self.title, - provider=self.provider, + return self._run_service.run( + program=self, arguments=kwargs, config=config, ) def get_jobs(self): + # pylint: disable=duplicate-code """List of jobs created in this function. Raises: @@ -145,11 +214,8 @@ def jobs(self): Returns: [Job] : list of jobs """ - from qiskit_serverless.core.job import ( # pylint: disable=import-outside-toplevel - Job, - ) - if self.job_client is None: + if self._run_service is None: raise ValueError("No clients specified for a function.") if self.validate: @@ -160,28 +226,15 @@ def jobs(self): f"Function validation failed. Validation errors:\n {error_string}", ) - response = self.job_client.get_jobs( + jobs = self._run_service.jobs( title=self.title, provider=self.provider, ) - jobs = [ - Job(job_id=job.get("id"), job_client=self.job_client, raw_data=job) - for job in response - ] return jobs - def _validate_function(self) -> Tuple[bool, List[str]]: - """Validate function arguments using schema provided. - - Returns: - Tuple[bool, List[str]]: - boolean specifiying if function arguments are valid - list of validation errors, if any - """ - return True, [] - # pylint: disable=abstract-method +# pylint: disable=too-few-public-methods class QiskitPattern(QiskitFunction): """ [Deprecated since version 0.10.0] Use :class:`.QiskitFunction` instead. diff --git a/client/qiskit_serverless/core/job.py b/client/qiskit_serverless/core/job.py index 3f3735a08..7b113e364 100644 --- a/client/qiskit_serverless/core/job.py +++ b/client/qiskit_serverless/core/job.py @@ -28,49 +28,34 @@ Job """ # pylint: disable=duplicate-code +from abc import ABC, abstractmethod import json import logging import os -import re -import tarfile import time -import sys import warnings -from pathlib import Path -from typing import Dict, Any, Optional, List, Union -from uuid import uuid4 -from dataclasses import asdict, dataclass - -import subprocess -from subprocess import Popen +from typing import Dict, Any, Optional, Union +from dataclasses import dataclass import ray.runtime_env import requests -from ray.dashboard.modules.job.sdk import JobSubmissionClient -from opentelemetry import trace from qiskit_ibm_runtime import QiskitRuntimeService from qiskit_serverless.core.constants import ( - OT_PROGRAM_NAME, REQUESTS_TIMEOUT, ENV_JOB_GATEWAY_TOKEN, ENV_JOB_GATEWAY_HOST, ENV_JOB_ID_GATEWAY, ENV_GATEWAY_PROVIDER_VERSION, GATEWAY_PROVIDER_VERSION_DEFAULT, - MAX_ARTIFACT_FILE_SIZE_MB, - ENV_JOB_ARGUMENTS, ) -from qiskit_serverless.core.function import QiskitFunction -from qiskit_serverless.exception import QiskitServerlessException from qiskit_serverless.serializers.program_serializers import ( QiskitObjectsEncoder, QiskitObjectsDecoder, ) -from qiskit_serverless.utils.json import is_jsonable, safe_json_request -from qiskit_serverless.utils.formatting import format_provider_name_and_title +from qiskit_serverless.utils.json import is_jsonable RuntimeEnv = ray.runtime_env.RuntimeEnv @@ -92,554 +77,35 @@ class Configuration: # pylint: disable=too-many-instance-attributes auto_scaling: Optional[bool] = False -class BaseJobClient: - """Base class for Job clients.""" +class JobService(ABC): + """Provide access to job information""" - def run( - self, - program: Union[str, QiskitFunction], - provider: Optional[str] = None, - arguments: Optional[Dict[str, Any]] = None, - config: Optional[Configuration] = None, - ) -> "Job": - """Runs program.""" - raise NotImplementedError - - def upload(self, program: QiskitFunction): - """Uploads program.""" - raise NotImplementedError - - def get(self, job_id) -> Optional["Job"]: - """Returns job by job id""" - raise NotImplementedError - - def list(self, **kwargs) -> List["Job"]: - """Returns list of jobs.""" - raise NotImplementedError - - def status(self, job_id: str): + @abstractmethod + def status(self, job_id: str) -> str: """Check status.""" - raise NotImplementedError - def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None): + @abstractmethod + def stop( + self, job_id: str, service: Optional[QiskitRuntimeService] = None + ) -> Union[str, bool]: """Stops job/program.""" - raise NotImplementedError - - def logs(self, job_id: str): - """Return logs.""" - raise NotImplementedError - - def filtered_logs(self, job_id: str, **kwargs): - """Return filtered logs.""" - raise NotImplementedError - def result(self, job_id: str): + @abstractmethod + def result(self, job_id: str) -> Any: """Return results.""" - raise NotImplementedError - - def get_programs(self, **kwargs): - """Returns list of programs.""" - raise NotImplementedError - - def get_program( - self, title: str, provider: Optional[str] = None - ) -> Optional[QiskitFunction]: - """Returns program based on parameters.""" - raise NotImplementedError - - def get_jobs(self, title: str, provider: Optional[str] = None): - """Returns job ids of executed program based on parameters.""" - raise NotImplementedError - - -class RayJobClient(BaseJobClient): - """RayJobClient.""" - - def __init__(self, client: JobSubmissionClient): - """Ray job client. - Wrapper around JobSubmissionClient - - Args: - client: JobSubmissionClient - """ - self._job_client = client - - def status(self, job_id: str): - return self._job_client.get_job_status(job_id).value - - def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None): - return self._job_client.stop_job(job_id) - - def logs(self, job_id: str): - return self._job_client.get_job_logs(job_id) - - def filtered_logs(self, job_id: str, **kwargs): - raise NotImplementedError - - def result(self, job_id: str): - return self.logs(job_id) - - def get(self, job_id) -> Optional["Job"]: - return Job(self._job_client.get_job_info(job_id).job_id, job_client=self) - - def list(self, **kwargs) -> List["Job"]: - return [ - Job(job.job_id, job_client=self) for job in self._job_client.list_jobs() - ] - - def run( - self, - program: Union[str, QiskitFunction], - provider: Optional[str] = None, - arguments: Optional[Dict[str, Any]] = None, - config: Optional[Configuration] = None, - ): - if not isinstance(program, QiskitFunction): - warnings.warn( - "`run` doesn't support program str yet. " - "Send a QiskitFunction instead. " - ) - return NotImplementedError - - arguments = arguments or {} - entrypoint = f"python {program.entrypoint}" - - # set program name so OT can use it as parent span name - env_vars = { - **(program.env_vars or {}), - **{OT_PROGRAM_NAME: program.title}, - **{ENV_JOB_ARGUMENTS: json.dumps(arguments, cls=QiskitObjectsEncoder)}, - } - - job_id = self._job_client.submit_job( - entrypoint=entrypoint, - submission_id=f"qs_{uuid4()}", - runtime_env={ - "working_dir": program.working_dir, - "pip": program.dependencies, - "env_vars": env_vars, - }, - ) - return Job(job_id=job_id, job_client=self) - - def upload(self, program: QiskitFunction): - raise NotImplementedError("Upload is not available for RayJobClient.") - - -class LocalJobClient(BaseJobClient): - """LocalJobClient.""" - - def __init__(self): - """Local job client. - - Args: - """ - self._jobs = {} - self._patterns = [] - - def status(self, job_id: str): - return self._jobs[job_id]["status"] - - def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None): - """Stops job/program.""" - return f"job:{job_id} has already stopped" - - def logs(self, job_id: str): - return self._jobs[job_id]["logs"] - - def result(self, job_id: str): - return self._jobs[job_id]["result"] - - def get(self, job_id) -> Optional["Job"]: - return self._jobs[job_id]["job"] - def list(self, **kwargs) -> List["Job"]: - return [job["job"] for job in list(self._jobs.values())] - - def filtered_logs(self, job_id: str, **kwargs): - """Return filtered logs.""" - raise NotImplementedError - - def run( # pylint: disable=too-many-locals - self, - program: Union[str, QiskitFunction], - provider: Optional[str] = None, - arguments: Optional[Dict[str, Any]] = None, - config: Optional[Configuration] = None, - ): - if isinstance(program, QiskitFunction): - title = program.title - else: - title = str(program) - - for pattern in self._patterns: - if pattern["title"] == title: - saved_program = pattern - if saved_program[ # pylint: disable=possibly-used-before-assignment - "dependencies" - ]: - dept = json.loads(saved_program["dependencies"]) - for dependency in dept: - subprocess.check_call( - [sys.executable, "-m", "pip", "install", dependency] - ) - arguments = arguments or {} - env_vars = { - **(saved_program["env_vars"] or {}), - **{OT_PROGRAM_NAME: saved_program["title"]}, - **{"PATH": os.environ["PATH"]}, - **{ENV_JOB_ARGUMENTS: json.dumps(arguments, cls=QiskitObjectsEncoder)}, - } - - with Popen( - ["python", saved_program["working_dir"] + saved_program["entrypoint"]], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - env=env_vars, - ) as pipe: - status = "SUCCEEDED" - if pipe.wait(): - status = "FAILED" - output, _ = pipe.communicate() - results = re.search("\nSaved Result:(.+?):End Saved Result\n", output) - result = "" - if results: - result = results.group(1) - - job = Job(job_id=str(uuid4()), job_client=self) - entry = {"status": status, "logs": output, "result": result, "job": job} - self._jobs[job.job_id] = entry - return job - - def upload(self, program: QiskitFunction): - # check if entrypoint exists - if not os.path.exists(os.path.join(program.working_dir, program.entrypoint)): - raise QiskitServerlessException( - f"Entrypoint file [{program.entrypoint}] does not exist " - f"in [{program.working_dir}] working directory." - ) - self._patterns.append( - { - "title": program.title, - "provider": program.provider, - "entrypoint": program.entrypoint, - "working_dir": program.working_dir, - "env_vars": program.env_vars, - "arguments": json.dumps({}), - "dependencies": json.dumps(program.dependencies or []), - } - ) - return program.title - - def get_programs(self, **kwargs): - """Returns list of programs.""" - return [ - QiskitFunction( - program.get("title"), - provider=program.get("provider", None), - raw_data=program, - job_client=self, - ) - for program in self._patterns - ] - - def get_program( - self, title: str, provider: Optional[str] = None - ) -> Optional[QiskitFunction]: - """Returns program based on parameters.""" - all_programs = { - program.get("title"): QiskitFunction( - program.get("title"), - provider=program.get("provider", None), - raw_data=program, - job_client=self, - ) - for program in self._patterns - } - return all_programs.get("title") - - -class GatewayJobClient(BaseJobClient): - """GatewayJobClient.""" - - def __init__(self, host: str, token: str, version: str): - """Job client for Gateway service. + @abstractmethod + def logs(self, job_id: str) -> str: + """Return logs.""" + @abstractmethod + def filtered_logs(self, job_id: str, **kwargs) -> str: + """Returns logs of the job. Args: - host: gateway host - version: gateway version - token: authorization token + job_id: The job's logs + include: rex expression finds match in the log line to be included + exclude: rex expression finds match in the log line to be excluded """ - self.host = host - self.version = version - self._token = token - - def run( # pylint: disable=too-many-locals - self, - program: Union[str, QiskitFunction], - provider: Optional[str] = None, - arguments: Optional[Dict[str, Any]] = None, - config: Optional[Configuration] = None, - ) -> "Job": - if isinstance(program, QiskitFunction): - title = program.title - else: - title = str(program) - - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.run") as span: - span.set_attribute("program", title) - span.set_attribute("provider", provider) - span.set_attribute("arguments", str(arguments)) - - url = f"{self.host}/api/{self.version}/programs/run/" - - data = { - "title": title, - "provider": provider, - "arguments": json.dumps(arguments or {}, cls=QiskitObjectsEncoder), - } # type: Dict[str, Any] - if config: - data["config"] = asdict(config) - else: - data["config"] = asdict(Configuration()) - - response_data = safe_json_request( - request=lambda: requests.post( - url=url, - json=data, - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, - ) - ) - job_id = response_data.get("id") - span.set_attribute("job.id", job_id) - - return Job(job_id, job_client=self) - - def upload(self, program: QiskitFunction): - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.run") as span: - span.set_attribute("program", program.title) - url = f"{self.host}/api/{self.version}/programs/upload/" - - if program.image is not None: - # upload function with custom image - program_title = _upload_with_docker_image( - program=program, url=url, token=self._token, span=span - ) - elif program.entrypoint is not None: - # upload funciton with artifact - program_title = _upload_with_artifact( - program=program, url=url, token=self._token, span=span - ) - else: - raise QiskitServerlessException( - "Function must either have `entrypoint` or `image` specified." - ) - - return program_title - - def status(self, job_id: str): - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.status"): - default_status = "Unknown" - response_data = safe_json_request( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/jobs/{job_id}/", - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, - ) - ) - - return response_data.get("status", default_status) - - def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None): - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.stop"): - if service: - data = { - "service": json.dumps(service, cls=QiskitObjectsEncoder), - } - else: - data = { - "service": None, - } - response_data = safe_json_request( - request=lambda: requests.post( - f"{self.host}/api/{self.version}/jobs/{job_id}/stop/", - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, - json=data, - ) - ) - - return response_data.get("message") - - def logs(self, job_id: str): - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.logs"): - response_data = safe_json_request( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/jobs/{job_id}/logs/", - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, - ) - ) - return response_data.get("logs") - - def filtered_logs(self, job_id: str, **kwargs): - all_logs = self.logs(job_id=job_id) - included = "" - include = kwargs.get("include") - if include is not None: - for line in all_logs.split("\n"): - if re.search(include, line) is not None: - included = included + line + "\n" - else: - included = all_logs - - excluded = "" - exclude = kwargs.get("exclude") - if exclude is not None: - for line in included.split("\n"): - if line != "" and re.search(exclude, line) is None: - excluded = excluded + line + "\n" - else: - excluded = included - return excluded - - def result(self, job_id: str): - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.result"): - response_data = safe_json_request( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/jobs/{job_id}/", - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, - ) - ) - return json.loads( - response_data.get("result", "{}") or "{}", cls=QiskitObjectsDecoder - ) - - def get(self, job_id) -> Optional["Job"]: - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.get"): - url = f"{self.host}/api/{self.version}/jobs/{job_id}/" - response_data = safe_json_request( - request=lambda: requests.get( - url, - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, - ) - ) - - job = None - job_id = response_data.get("id") - if job_id is not None: - job = Job( - job_id=response_data.get("id"), - job_client=self, - ) - - return job - - def list(self, **kwargs) -> List["Job"]: - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("job.list"): - limit = kwargs.get("limit", 10) - kwargs["limit"] = limit - offset = kwargs.get("offset", 0) - kwargs["offset"] = offset - response_data = safe_json_request( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/jobs", - params=kwargs, - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, - ) - ) - return [ - Job(job.get("id"), job_client=self, raw_data=job) - for job in response_data.get("results", []) - ] - - def get_programs(self, **kwargs): - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("program.list"): - response_data = safe_json_request( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/programs", - headers={"Authorization": f"Bearer {self._token}"}, - params=kwargs, - timeout=REQUESTS_TIMEOUT, - ) - ) - return [ - QiskitFunction( - program.get("title"), - provider=program.get("provider", None), - raw_data=program, - job_client=self, - description=program.get("description"), - ) - for program in response_data - ] - - def get_program( - self, title: str, provider: Optional[str] = None - ) -> Optional[QiskitFunction]: - """Returns program based on parameters.""" - provider, title = format_provider_name_and_title( - request_provider=provider, title=title - ) - - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("program.get_by_title"): - response_data = safe_json_request( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/programs/get_by_title/{title}", - headers={"Authorization": f"Bearer {self._token}"}, - params={"provider": provider}, - timeout=REQUESTS_TIMEOUT, - ) - ) - return QiskitFunction( - response_data.get("title"), - provider=response_data.get("provider", None), - raw_data=response_data, - job_client=self, - ) - - def get_jobs(self, title: str, provider: Optional[str] = None): - """Returns job ids executed the program based on parameters.""" - provider, title = format_provider_name_and_title( - request_provider=provider, title=title - ) - - tracer = trace.get_tracer("client.tracer") - with tracer.start_as_current_span("program.get_by_title"): - response_data = safe_json_request( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/programs/get_by_title/{title}", - headers={"Authorization": f"Bearer {self._token}"}, - params={"provider": provider}, - timeout=REQUESTS_TIMEOUT, - ) - ) - program_id = response_data.get("id", None) - if not program_id: - return None - response_data = safe_json_request( - request=lambda: requests.get( - f"{self.host}/api/{self.version}/programs/{program_id}/get_jobs/", - headers={"Authorization": f"Bearer {self._token}"}, - timeout=REQUESTS_TIMEOUT, - ) - ) - return response_data class Job: @@ -648,22 +114,22 @@ class Job: def __init__( self, job_id: str, - job_client: BaseJobClient, + job_service: JobService, raw_data: Optional[Dict[str, Any]] = None, ): """Job class for async script execution. Args: job_id: if of the job - job_client: job client + client: client """ self.job_id = job_id - self._job_client = job_client + self._job_service = job_service self.raw_data = raw_data or {} def status(self): """Returns status of the job.""" - return _map_status_to_serverless(self._job_client.status(self.job_id)) + return _map_status_to_serverless(self._job_service.status(self.job_id)) def stop(self, service: Optional[QiskitRuntimeService] = None): """Stops the job from running.""" @@ -677,11 +143,11 @@ def stop(self, service: Optional[QiskitRuntimeService] = None): def cancel(self, service: Optional[QiskitRuntimeService] = None): """Cancels the job.""" - return self._job_client.stop(self.job_id, service=service) + return self._job_service.stop(self.job_id, service=service) def logs(self) -> str: """Returns logs of the job.""" - return self._job_client.logs(self.job_id) + return self._job_service.logs(self.job_id) def filtered_logs(self, **kwargs) -> str: """Returns logs of the job. @@ -689,7 +155,7 @@ def filtered_logs(self, **kwargs) -> str: include: rex expression finds match in the log line to be included exclude: rex expression finds match in the log line to be excluded """ - return self._job_client.filtered_logs(job_id=self.job_id, **kwargs) + return self._job_service.filtered_logs(job_id=self.job_id, **kwargs) def result(self, wait=True, cadence=5, verbose=False, maxwait=0): """Return results of the job. @@ -712,7 +178,7 @@ def result(self, wait=True, cadence=5, verbose=False, maxwait=0): logging.info(count) # Retrieve the results. If they're string format, try to decode to a dictionary. - results = self._job_client.result(self.job_id) + results = self._job_service.result(self.job_id) if isinstance(results, str): try: results = json.loads(results, cls=QiskitObjectsDecoder) @@ -813,115 +279,3 @@ def _map_status_to_serverless(status: str) -> str: return status_map[status] except KeyError: return status - - -def _upload_with_docker_image( - program: QiskitFunction, url: str, token: str, span: Any -) -> str: - """Uploads function with custom docker image. - - Args: - program (QiskitFunction): function instance - url (str): upload gateway url - token (str): auth token - span (Any): tracing span - - Returns: - str: uploaded function name - """ - response_data = safe_json_request( - request=lambda: requests.post( - url=url, - data={ - "title": program.title, - "provider": program.provider, - "image": program.image, - "arguments": json.dumps({}), - "dependencies": json.dumps(program.dependencies or []), - "env_vars": json.dumps(program.env_vars or {}), - "description": program.description, - }, - headers={"Authorization": f"Bearer {token}"}, - timeout=REQUESTS_TIMEOUT, - ) - ) - program_title = response_data.get("title", "na") - program_provider = response_data.get("provider", "na") - span.set_attribute("program.title", program_title) - span.set_attribute("program.provider", program_provider) - return program_title - - -def _upload_with_artifact( - program: QiskitFunction, url: str, token: str, span: Any -) -> str: - """Uploads function with artifact. - - Args: - program (QiskitFunction): function instance - url (str): endpoint for gateway upload - token (str): auth token - span (Any): tracing span - - Raises: - QiskitServerlessException: if no entrypoint or size of artifact is too large. - - Returns: - str: uploaded function name - """ - artifact_file_path = os.path.join(program.working_dir, "artifact.tar") - - # check if entrypoint exists - if ( - not os.path.exists(os.path.join(program.working_dir, program.entrypoint)) - or program.entrypoint[0] == "/" - ): - raise QiskitServerlessException( - f"Entrypoint file [{program.entrypoint}] does not exist " - f"in [{program.working_dir}] working directory." - ) - - try: - with tarfile.open(artifact_file_path, "w") as tar: - for filename in os.listdir(program.working_dir): - fpath = os.path.join(program.working_dir, filename) - tar.add(fpath, arcname=filename) - - # check file size - size_in_mb = Path(artifact_file_path).stat().st_size / 1024**2 - if size_in_mb > MAX_ARTIFACT_FILE_SIZE_MB: - raise QiskitServerlessException( - f"{artifact_file_path} is {int(size_in_mb)} Mb, " - f"which is greater than {MAX_ARTIFACT_FILE_SIZE_MB} allowed. " - f"Try to reduce size of `working_dir`." - ) - - with open(artifact_file_path, "rb") as file: - response_data = safe_json_request( - request=lambda: requests.post( - url=url, - data={ - "title": program.title, - "provider": program.provider, - "entrypoint": program.entrypoint, - "arguments": json.dumps({}), - "dependencies": json.dumps(program.dependencies or []), - "env_vars": json.dumps(program.env_vars or {}), - "description": program.description, - }, - files={"artifact": file}, - headers={"Authorization": f"Bearer {token}"}, - timeout=REQUESTS_TIMEOUT, - ) - ) - program_title = response_data.get("title", "na") - program_provider = response_data.get("provider", "na") - span.set_attribute("program.title", program_title) - span.set_attribute("program.provider", program_provider) - except Exception as error: # pylint: disable=broad-exception-caught - raise QiskitServerlessException from error - finally: - if os.path.exists(artifact_file_path): - os.remove(artifact_file_path) - - return program_title diff --git a/client/qiskit_serverless/utils/json.py b/client/qiskit_serverless/utils/json.py index f611b9f6a..61ab5baeb 100644 --- a/client/qiskit_serverless/utils/json.py +++ b/client/qiskit_serverless/utils/json.py @@ -26,9 +26,9 @@ JsonSerializable """ import json -from abc import ABC +from abc import ABC, abstractmethod from json import JSONEncoder -from typing import Optional, Type, Callable, Dict, Any +from typing import List, Optional, Type, Callable, Dict, Any, Union import requests @@ -40,9 +40,9 @@ class JsonSerializable(ABC): """Classes that can be serialized as json.""" @classmethod + @abstractmethod def from_dict(cls, dictionary: dict): """Converts dict to object.""" - raise NotImplementedError def to_dict(self) -> dict: """Converts class to dict.""" @@ -74,7 +74,49 @@ def is_jsonable(data, cls: Optional[Type[JSONEncoder]] = None): return False -def safe_json_request(request: Callable, verbose: bool = False) -> Dict[str, Any]: +def safe_json_request_as_list(request: Callable, verbose: bool = False) -> List[Any]: + """Returns parsed json data from request. + + Args: + request: callable for request. + verbose: post reason in error message + + Example: + >>> safe_json_request(request=lambda: requests.get("https://ibm.com")) + + Returns: + parsed json response as list structure + """ + response = safe_json_request(request, verbose) + if isinstance(response, List): + return response + raise TypeError("JSON is not a List") + + +def safe_json_request_as_dict( + request: Callable, verbose: bool = False +) -> Dict[str, Any]: + """Returns parsed json data from request. + + Args: + request: callable for request. + verbose: post reason in error message + + Example: + >>> safe_json_request(request=lambda: requests.get("https://ibm.com")) + + Returns: + parsed json response as dict structure + """ + response = safe_json_request(request, verbose) + if isinstance(response, Dict): + return response + raise TypeError("JSON is not a Dict") + + +def safe_json_request( + request: Callable, verbose: bool = False +) -> Union[Dict[str, Any], List[Any]]: """Returns parsed json data from request. Args: diff --git a/client/tests/core/test_job.py b/client/tests/core/test_job.py index 08485f226..f0edcce0d 100644 --- a/client/tests/core/test_job.py +++ b/client/tests/core/test_job.py @@ -1,19 +1,29 @@ """Tests job.""" + +# pylint: disable=too-few-public-methods import os from unittest import TestCase -from unittest.mock import MagicMock +from unittest.mock import MagicMock, Mock, patch import numpy as np import requests_mock from qiskit.circuit.random import random_circuit +from qiskit_serverless import ServerlessClient from qiskit_serverless.core.constants import ( ENV_JOB_GATEWAY_HOST, ENV_JOB_ID_GATEWAY, ENV_JOB_GATEWAY_TOKEN, ) -from qiskit_serverless.core.job import save_result, GatewayJobClient +from qiskit_serverless.core.job import save_result + + +class ResponseMock: + """Utility class to mock request.get response with a json""" + + ok = True + text = "{}" class TestJob(TestCase): @@ -40,9 +50,10 @@ def test_save_result(self): ) self.assertTrue(result) + @patch("requests.get", Mock(return_value=ResponseMock())) def test_filtered_logs(self): """Tests job filtered log.""" - client = GatewayJobClient("host", "token", "version") + client = ServerlessClient(host="host", token="token", version="version") client.logs = MagicMock( return_value="This is the line 1\nThis is the second line\nOK. This is the last line.\n", # pylint: disable=line-too-long ) diff --git a/client/tests/core/test_pattern.py b/client/tests/core/test_pattern.py index 79909448f..99cbd2dd0 100644 --- a/client/tests/core/test_pattern.py +++ b/client/tests/core/test_pattern.py @@ -3,11 +3,9 @@ from testcontainers.compose import DockerCompose -from qiskit_serverless import BaseClient -from qiskit_serverless.core import ComputeResource +from qiskit_serverless import RayClient, QiskitFunction from qiskit_serverless.core.job import Job -from qiskit_serverless.core.function import QiskitFunction -from tests.utils import wait_for_job_client, wait_for_job_completion +from tests.utils import wait_for_ray_ready, wait_for_job_completion resources_path = os.path.join( os.path.dirname(os.path.abspath(__file__)), "../resources" @@ -22,14 +20,11 @@ def test_program(): ) as compose: host = compose.get_service_host("testrayhead", 8265) port = compose.get_service_port("testrayhead", 8265) + connection_url = f"http://{host}:{port}" - serverless = BaseClient( - name="docker", - compute_resource=ComputeResource( - name="docker", host=host, port_job_server=port - ), - ) - wait_for_job_client(serverless) + wait_for_ray_ready(connection_url) + + serverless = RayClient(host=connection_url) program = QiskitFunction( title="simple_job", @@ -49,7 +44,7 @@ def test_program(): assert job.in_terminal_state() assert job.status() == "DONE" - recovered_job = serverless.get_job_by_id(job.job_id) + recovered_job = serverless.job(job.job_id) assert recovered_job.job_id == job.job_id assert "42" in recovered_job.logs() assert recovered_job.in_terminal_state() diff --git a/client/tests/utils.py b/client/tests/utils.py index 90ae4b16a..1fe95cc89 100644 --- a/client/tests/utils.py +++ b/client/tests/utils.py @@ -1,17 +1,20 @@ """Test utils.""" import time -from qiskit_serverless import BaseClient +from ray.dashboard.modules.job.sdk import JobSubmissionClient + from qiskit_serverless.core.job import Job -def wait_for_job_client(serverless: BaseClient, timeout: int = 60): - """Utility function that wait for job client to awake.""" +def wait_for_ray_ready(connection_url: str, timeout: int = 60): + """Utility function that waits for ray to be up.""" + client = None must_finish = time.time() + timeout - while time.time() < must_finish: - if serverless.job_client() is not None: - break - time.sleep(1) + while time.time() < must_finish and not client: + try: + client = JobSubmissionClient(connection_url) + except ConnectionError: + time.sleep(1) def wait_for_job_completion(job: Job, timeout: int = 60): diff --git a/docker-compose.yaml b/docker-compose.yaml index ae4213500..30ccd331e 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -3,7 +3,7 @@ services: ray-head: user: "0" container_name: ray-head - image: icr.io/quantum-public/qiskit-serverless/ray-node:${VERSION:-0.17.0} + image: icr.io/quantum-public/qiskit-serverless/ray-node:${VERSION:-0.17.1} entrypoint: [ "ray", "start", "--head", "--port=6379", "--dashboard-host=0.0.0.0", "--block" @@ -28,7 +28,7 @@ services: always gateway: container_name: gateway - image: icr.io/quantum-public/qiskit-serverless/gateway:${VERSION:-0.17.0} + image: icr.io/quantum-public/qiskit-serverless/gateway:${VERSION:-0.17.1} command: gunicorn main.wsgi:application --bind 0.0.0.0:8000 --workers=4 ports: - 8000:8000 @@ -53,7 +53,7 @@ services: - postgres scheduler: container_name: scheduler - image: icr.io/quantum-public/qiskit-serverless/gateway:${VERSION:-0.17.0} + image: icr.io/quantum-public/qiskit-serverless/gateway:${VERSION:-0.17.1} entrypoint: "./scripts/scheduler.sh" environment: - DEBUG=0 diff --git a/docs/deployment/cloud.rst b/docs/deployment/cloud.rst index 079743515..845292a88 100644 --- a/docs/deployment/cloud.rst +++ b/docs/deployment/cloud.rst @@ -84,7 +84,7 @@ Once your cluster is ready, the installation is relatively straightforward with and run the next commands: .. code-block:: - :caption: run this commands with the release version like 0.17.0 in x.y.z (2 places) + :caption: run this commands with the release version like 0.17.1 in x.y.z (2 places) $ helm -n install qiskit-serverless --create-namespace https://github.com/Qiskit/qiskit-serverless/releases/download/vx.y.z/qiskit-serverless-x.y.z.tgz @@ -119,6 +119,6 @@ with the configuration of your domain and provider. Optionally, you can install an observability package to handle logging and monitoring on your cluster by running the following command: .. code-block:: - :caption: run this commands with the release version like 0.17.0 in x.y.z (2 places) using the same namespace as in the previous helm command + :caption: run this commands with the release version like 0.17.1 in x.y.z (2 places) using the same namespace as in the previous helm command $ helm -n install qs-observability https://github.com/Qiskit/qiskit-serverless/releases/download/vx.y.z/qs-observability-x.y.z.tgz diff --git a/docs/deployment/custom_function/Sample-Dockerfile b/docs/deployment/custom_function/Sample-Dockerfile index 87e2419ac..5f8bd4a57 100644 --- a/docs/deployment/custom_function/Sample-Dockerfile +++ b/docs/deployment/custom_function/Sample-Dockerfile @@ -1,4 +1,4 @@ -FROM icr.io/quantum-public/qiskit-serverless/ray-node:0.17.0 +FROM icr.io/quantum-public/qiskit-serverless/ray-node:0.17.1 # install all necessary dependencies for your custom image diff --git a/docs/deployment/deploying_custom_image_function.rst b/docs/deployment/deploying_custom_image_function.rst index f86ab7752..29ba0a51f 100644 --- a/docs/deployment/deploying_custom_image_function.rst +++ b/docs/deployment/deploying_custom_image_function.rst @@ -50,7 +50,7 @@ In our simple case it will look something like this: .. code-block:: :caption: Dockerfile for custom image function. - FROM icr.io/quantum-public/qiskit-serverless/ray-node:0.17.0 + FROM icr.io/quantum-public/qiskit-serverless/ray-node:0.17.1 # install all necessary dependencies for your custom image diff --git a/docs/deployment/example_custom_image_function.rst b/docs/deployment/example_custom_image_function.rst index 5ccba3a60..81b03ba88 100644 --- a/docs/deployment/example_custom_image_function.rst +++ b/docs/deployment/example_custom_image_function.rst @@ -49,7 +49,7 @@ Dockerfile .. code-block:: :caption: Dockerfile - FROM icr.io/quantum-public/qiskit-serverless/ray-node:0.17.0 + FROM icr.io/quantum-public/qiskit-serverless/ray-node:0.17.1 # install all necessary dependencies for your custom image diff --git a/gateway/Dockerfile b/gateway/Dockerfile index ac92454e8..e95df3aa3 100644 --- a/gateway/Dockerfile +++ b/gateway/Dockerfile @@ -1,37 +1,61 @@ -FROM registry.access.redhat.com/ubi9-minimal:9.4@sha256:104cf11d890aeb7dd5728b7d7732e175a0e4018f1bb00d2faebcc8f6bf29bd52 -RUN microdnf install -y python3.11-3.11.7 python3.11-pip-22.3.1 python3.11-devel-3.11.7 vim-enhanced-8.2.2637 &&\ - microdnf clean all -RUN ln -s /usr/bin/python3.11 /usr/local/bin/python3 && \ - ln -s /usr/bin/python3.11 /usr/local/bin/python &&\ - ln -s /usr/bin/pip3.11 /usr/local/bin/pip3 &&\ - ln -s /usr/bin/pip3.11 /usr/local/bin/pip +ARG MICRO_IMAGE_DIR=/ubi-micro-img +# BASE image using UBI 9 micro where the +# application and requirements will be installed +FROM registry.access.redhat.com/ubi9-micro:9.4-15 AS BASE + +# BUILD image using UBI 9 where the dependencies that +# require installing with a package manager will be installed +FROM registry.access.redhat.com/ubi9:9.4-1214.1726694543 AS BUILD +ARG MICRO_IMAGE_DIR + +# Copy the BASE image into the BUILD image +RUN mkdir ${MICRO_IMAGE_DIR} +COPY --from=BASE / ${MICRO_IMAGE_DIR} + +# Install Python inside the BASE image +# hadolint ignore=DL3041 +RUN dnf install --installroot ${MICRO_IMAGE_DIR} --nodocs -y \ + python3.11-3.11.7 \ + python3.11-devel-3.11.7 \ + libstdc++ &&\ + dnf upgrade --installroot ${MICRO_IMAGE_DIR} --nodocs -y && \ + dnf clean all --installroot ${MICRO_IMAGE_DIR} + +# APP image from `scratch` which will be the final image +# and remaining application requirements will be installed +FROM scratch AS APP +ARG MICRO_IMAGE_DIR +# hadolint ignore=DL3045 +COPY --from=BUILD ${MICRO_IMAGE_DIR}/ . + +# create symlinks for python +RUN ln -s /usr/bin/python3.11 /usr/bin/python + +# Create project dir WORKDIR /usr/src/app # set environment variables -ENV PYTHONDONTWRITEBYTECODE 1 -ENV PYTHONUNBUFFERED 1 +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 -USER 0 COPY gateway/requirements.txt . -RUN pip install -r requirements.txt --no-cache-dir &&\ +# Install pip +RUN python3.11 -m ensurepip --upgrade +# Install dependencies and update then uninstall pip (not needed in final image) +RUN python3.11 -m pip install -r requirements.txt --no-cache-dir --upgrade && \ cp -r -n /usr/local/lib64/python3.11/site-packages/symengine /usr/local/lib/python3.11/site-packages &&\ - cp -r -n /usr/local/lib/python3.11/site-packages/symengine /usr/local/lib64/python3.11/site-packages + cp -r -n /usr/local/lib/python3.11/site-packages/symengine /usr/local/lib64/python3.11/site-packages &&\ + python3.11 -m pip uninstall -y pip + COPY gateway . RUN chown -R 1000:100 /usr/src/app &&\ mkdir /usr/src/app/media && chown 1000:100 /usr/src/app/media -# Need versions of pip/setuptools more recent than provided by UBI image -RUN python3.11 -m ensurepip --upgrade - -# hadolint ignore=DL3013 -RUN pip install --upgrade --no-cache-dir pip>=24.2 &&\ - pip install --upgrade --no-cache-dir setuptools>=72.1.0 - -USER 1000:100 RUN sed -i 's/\r$//g' /usr/src/app/entrypoint.sh &&\ chmod +x /usr/src/app/entrypoint.sh EXPOSE 8000 +USER 1000:100 # run entrypoint.sh ENTRYPOINT ["/usr/src/app/entrypoint.sh"] diff --git a/gateway/api/admin.py b/gateway/api/admin.py index f52962e93..32dc19c36 100644 --- a/gateway/api/admin.py +++ b/gateway/api/admin.py @@ -23,6 +23,7 @@ class ProgramAdmin(admin.ModelAdmin): search_fields = ["title", "author__username"] list_filter = ["provider", "type"] exclude = ["env_vars"] + filter_horizontal = ["instances"] @admin.register(ComputeResource) diff --git a/gateway/api/management/commands/schedule_queued_jobs.py b/gateway/api/management/commands/schedule_queued_jobs.py index d90f5708e..b9ff95cb5 100644 --- a/gateway/api/management/commands/schedule_queued_jobs.py +++ b/gateway/api/management/commands/schedule_queued_jobs.py @@ -30,9 +30,30 @@ class Command(BaseCommand): def handle(self, *args, **options): max_ray_clusters_possible = settings.LIMITS_MAX_CLUSTERS - number_of_clusters_running = ComputeResource.objects.filter(active=True).count() + max_gpu_clusters_possible = settings.LIMITS_GPU_CLUSTERS + number_of_clusters_running = ComputeResource.objects.filter( + active=True, gpu=False + ).count() + number_of_gpu_clusters_running = ComputeResource.objects.filter( + active=True, gpu=True + ).count() + + self.schedule_jobs_if_slots_available( + max_ray_clusters_possible, number_of_clusters_running, False + ) + self.schedule_jobs_if_slots_available( + max_gpu_clusters_possible, number_of_gpu_clusters_running, True + ) + + def schedule_jobs_if_slots_available( + self, max_ray_clusters_possible, number_of_clusters_running, gpu_job + ): + """Schedule jobs depending on free cluster slots.""" free_clusters_slots = max_ray_clusters_possible - number_of_clusters_running - logger.info("%s free cluster slots.", free_clusters_slots) + if gpu_job: + logger.info("%s free GPU cluster slots.", free_clusters_slots) + else: + logger.info("%s free CPU cluster slots.", free_clusters_slots) if free_clusters_slots < 1: # no available resources @@ -45,6 +66,9 @@ def handle(self, *args, **options): # we have available resources jobs = get_jobs_to_schedule_fair_share(slots=free_clusters_slots) + # only process jobs of the appropriate compute type + jobs = [job for job in jobs if job.gpu is gpu_job] + for job in jobs: # only for local mode if settings.RAY_CLUSTER_MODE.get( diff --git a/gateway/api/migrations/0032_computeresource_gpu_job_gpu.py b/gateway/api/migrations/0032_computeresource_gpu_job_gpu.py new file mode 100644 index 000000000..59ba67a16 --- /dev/null +++ b/gateway/api/migrations/0032_computeresource_gpu_job_gpu.py @@ -0,0 +1,23 @@ +# Generated by Django 4.2.15 on 2024-10-09 20:15 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("api", "0031_program_readable_title_provider_readable_name"), + ] + + operations = [ + migrations.AddField( + model_name="computeresource", + name="gpu", + field=models.BooleanField(default=False), + ), + migrations.AddField( + model_name="job", + name="gpu", + field=models.BooleanField(default=False), + ), + ] diff --git a/gateway/api/models.py b/gateway/api/models.py index a14f7d1b2..92dc7e878 100644 --- a/gateway/api/models.py +++ b/gateway/api/models.py @@ -143,6 +143,8 @@ class ComputeResource(models.Model): blank=True, ) + gpu = models.BooleanField(default=False, null=False) + def __str__(self): return self.title @@ -201,6 +203,8 @@ class Job(models.Model): blank=True, ) + gpu = models.BooleanField(default=False, null=False) + def __str__(self): return f"" diff --git a/gateway/api/ray.py b/gateway/api/ray.py index 0dd03e3af..b9d5e0164 100644 --- a/gateway/api/ray.py +++ b/gateway/api/ray.py @@ -249,6 +249,14 @@ def create_ray_cluster( # pylint: disable=too-many-branches job_config.auto_scaling = settings.RAY_CLUSTER_WORKER_AUTO_SCALING node_image = settings.RAY_NODE_IMAGE + # cpu job settings + node_selector_label = settings.RAY_CLUSTER_CPU_NODE_SELECTOR_LABEL + gpu_request = 0 + # if gpu job, use gpu nodes and resources + if job.gpu: + node_selector_label = settings.RAY_CLUSTER_GPU_NODE_SELECTOR_LABEL + gpu_request = settings.LIMITS_GPU_PER_TASK + # if user specified image use specified image function_data = user.username if job.program.image is not None: @@ -268,6 +276,8 @@ def create_ray_cluster( # pylint: disable=too-many-branches "max_workers": job_config.max_workers, "auto_scaling": job_config.auto_scaling, "user": user.username, + "node_selector_label": node_selector_label, + "gpu_request": gpu_request, } ) cluster_data = yaml.safe_load(manifest) @@ -292,6 +302,8 @@ def create_ray_cluster( # pylint: disable=too-many-branches resource.owner = user resource.title = cluster_name resource.host = host + if job.gpu: + resource.gpu = True resource.save() else: raise RuntimeError("Something went wrong during cluster creation") diff --git a/gateway/api/schedule.py b/gateway/api/schedule.py index ec1e19eb6..2dd3897f2 100644 --- a/gateway/api/schedule.py +++ b/gateway/api/schedule.py @@ -15,7 +15,7 @@ from api.models import Job, ComputeResource from api.ray import submit_job, create_ray_cluster, kill_ray_cluster -from api.utils import generate_cluster_name +from api.utils import generate_cluster_name, create_gpujob_allowlist from main import settings as config @@ -26,6 +26,7 @@ def execute_job(job: Job) -> Job: """Executes program. + 0. configure compute resource type 1. check if cluster exists 1.1 if not: create cluster 2. connect to cluster @@ -41,6 +42,16 @@ def execute_job(job: Job) -> Job: tracer = trace.get_tracer("scheduler.tracer") with tracer.start_as_current_span("execute.job") as span: + # configure functions to use gpus + gpujobs = create_gpujob_allowlist() + if ( + job.program.provider + and job.program.provider.name in gpujobs["gpu-functions"].keys() + ): + logger.debug("Job %s will be run on GPU nodes", job.id) + job.gpu = True + job.save() + compute_resource = ComputeResource.objects.filter( owner=job.author, active=True ).first() diff --git a/gateway/api/serializers.py b/gateway/api/serializers.py index 1ac2b0a4c..f2e3a9795 100644 --- a/gateway/api/serializers.py +++ b/gateway/api/serializers.py @@ -117,7 +117,11 @@ def update(self, instance, validated_data): instance.artifact = validated_data.get("artifact") instance.author = validated_data.get("author") instance.image = validated_data.get("image") - instance.description = validated_data.get("description") + + description = validated_data.get("description") + if description is not None: + instance.description = description + instance.save() return instance diff --git a/gateway/api/utils.py b/gateway/api/utils.py index 8c71ddf8c..9565033cc 100644 --- a/gateway/api/utils.py +++ b/gateway/api/utils.py @@ -428,3 +428,23 @@ def sanitize_name(name: str): sanitized_name += c return sanitized_name return name + + +def create_gpujob_allowlist(): + """ + Create dictionary of jobs allowed to run on gpu nodes. + + Sample format of json: + { "gpu-functions": { "mockprovider": [ "my-first-pattern" ] } } + """ + try: + with open(settings.GATEWAY_GPU_JOBS_CONFIG, encoding="utf-8", mode="r") as f: + gpujobs = json.load(f) + except IOError as e: + logger.error("Unable to open gpu job config file: %s", e) + raise ValueError("Unable to open gpu job config file") from e + except ValueError as e: + logger.error("Unable to decode gpu job allowlist: %s", e) + raise ValueError("Unable to decode gpujob allowlist") from e + + return gpujobs diff --git a/gateway/api/v1/gpu-jobs.json b/gateway/api/v1/gpu-jobs.json new file mode 100644 index 000000000..497427f7f --- /dev/null +++ b/gateway/api/v1/gpu-jobs.json @@ -0,0 +1,3 @@ +{ + "gpu-functions": {} +} diff --git a/gateway/api/v1/views/catalog.py b/gateway/api/v1/views/catalog.py index cab4c690f..437a73962 100644 --- a/gateway/api/v1/views/catalog.py +++ b/gateway/api/v1/views/catalog.py @@ -3,6 +3,7 @@ """ # pylint: disable=duplicate-code +from drf_yasg import openapi from drf_yasg.utils import swagger_auto_schema from rest_framework import permissions, status from rest_framework.decorators import action @@ -42,6 +43,15 @@ def retrieve(self, request, pk=None): @swagger_auto_schema( operation_description="Get a specific public function in the catalog by title", + manual_parameters=[ + openapi.Parameter( + "title", + openapi.IN_QUERY, + description="A title of the function", + type=openapi.TYPE_STRING, + required=True, + ) + ], responses={ status.HTTP_200_OK: v1_serializers.RetrieveCatalogSerializer(many=False) }, diff --git a/gateway/api/v1/views/files.py b/gateway/api/v1/views/files.py index 45f3e7e15..faa5a3150 100644 --- a/gateway/api/v1/views/files.py +++ b/gateway/api/v1/views/files.py @@ -2,7 +2,10 @@ Files view api for V1. """ +from drf_yasg import openapi +from drf_yasg.utils import swagger_auto_schema from rest_framework import permissions +from rest_framework.decorators import action from api.permissions import IsOwner from api import views @@ -14,3 +17,77 @@ class FilesViewSet(views.FilesViewSet): """ permission_classes = [permissions.IsAuthenticated, IsOwner] + + @swagger_auto_schema( + operation_description="List of available for user files", + manual_parameters=[ + openapi.Parameter( + "provider", + openapi.IN_QUERY, + description="provider name", + type=openapi.TYPE_STRING, + required=False, + ), + ], + ) + def list(self, request): + return super().list(request) + + @swagger_auto_schema( + operation_description="Download a specific file", + manual_parameters=[ + openapi.Parameter( + "file", + openapi.IN_QUERY, + description="file name", + type=openapi.TYPE_STRING, + required=True, + ), + openapi.Parameter( + "provider", + openapi.IN_QUERY, + description="provider name", + type=openapi.TYPE_STRING, + required=False, + ), + ], + ) + @action(methods=["GET"], detail=False) + def download(self, request): + return super().download(request) + + @swagger_auto_schema( + operation_description="Deletes file uploaded or produced by the programs", + request_body=openapi.Schema( + type=openapi.TYPE_OBJECT, + properties={ + "file": openapi.Schema( + type=openapi.TYPE_STRING, description="file name" + ), + "provider": openapi.Schema( + type=openapi.TYPE_STRING, description="provider name" + ), + }, + required=["file"], + ), + ) + @action(methods=["DELETE"], detail=False) + def delete(self, request): + return super().delete(request) + + @swagger_auto_schema( + operation_description="Upload selected file", + request_body=openapi.Schema( + type=openapi.TYPE_OBJECT, + properties={ + "file": openapi.Schema(type=openapi.TYPE_FILE, description="file name"), + "provider": openapi.Schema( + type=openapi.TYPE_STRING, description="provider name" + ), + }, + required=["file"], + ), + ) + @action(methods=["POST"], detail=False) + def upload(self, request): + return super().upload(request) diff --git a/gateway/api/v1/views/jobs.py b/gateway/api/v1/views/jobs.py index 858d3a8a9..236da91d3 100644 --- a/gateway/api/v1/views/jobs.py +++ b/gateway/api/v1/views/jobs.py @@ -46,4 +46,11 @@ def list(self, request): def result(self, request, pk=None): return super().result(request, pk) + @swagger_auto_schema( + operation_description="Stop a job", + ) + @action(methods=["POST"], detail=True) + def stop(self, request, pk=None): + return super().stop(request, pk) + ### We are not returning serializers in the rest of the end-points diff --git a/gateway/api/views/programs.py b/gateway/api/views/programs.py index ac3bb847d..21cb5e6c1 100644 --- a/gateway/api/views/programs.py +++ b/gateway/api/views/programs.py @@ -319,27 +319,6 @@ def _get_program_queryset_for_title_and_provider( codename=VIEW_PROGRAM_PERMISSION ) - # Groups logic - if type_filter: - if type_filter == "catalog": - view_permission_criteria = Q(permissions=view_program_permission) - groups_with_view_permissions = Group.objects.filter( - view_permission_criteria - ) - groups_with_view_permissions_criteria = Q( - instances__in=groups_with_view_permissions - ) - provider_exists_criteria = ~Q(provider=None) - result_queryset = Program.objects.filter( - groups_with_view_permissions_criteria & provider_exists_criteria - ) - return result_queryset - if type_filter == "serverless": - result_queryset = Program.objects.filter( - Q(author=author) & Q(provider=None) - ) - return result_queryset - user_criteria = Q(user=author) view_permission_criteria = Q(permissions=view_program_permission) author_groups_with_view_permissions = Group.objects.filter( @@ -354,11 +333,30 @@ def _get_program_queryset_for_title_and_provider( author_groups_with_view_permissions_count, ) - # Programs logic author_criteria = Q(author=author) author_groups_with_view_permissions_criteria = Q( instances__in=author_groups_with_view_permissions ) + + # Serverless filter only returns functions created by the author with the next criterias: + # user is the author of the function and there is no provider + if type_filter == "serverless": + provider_criteria = Q(provider=None) + result_queryset = Program.objects.filter( + author_criteria & provider_criteria + ) + return result_queryset + + # Catalog filter only returns providers functions that user has access: + # author has view permissions and the function has a provider assigned + if type_filter == "catalog": + provider_exists_criteria = ~Q(provider=None) + result_queryset = Program.objects.filter( + author_groups_with_view_permissions_criteria & provider_exists_criteria + ) + return result_queryset + + # If filter is not applied we return author and providers functions together title = sanitize_name(title) provider_name = sanitize_name(provider_name) if title: diff --git a/gateway/main/settings.py b/gateway/main/settings.py index bde0f5129..5229d2646 100644 --- a/gateway/main/settings.py +++ b/gateway/main/settings.py @@ -324,7 +324,9 @@ # resources limitations LIMITS_JOBS_PER_USER = int(os.environ.get("LIMITS_JOBS_PER_USER", "2")) LIMITS_MAX_CLUSTERS = int(os.environ.get("LIMITS_MAX_CLUSTERS", "6")) +LIMITS_GPU_CLUSTERS = int(os.environ.get("LIMITS_MAX_GPU_CLUSTERS", "1")) LIMITS_CPU_PER_TASK = int(os.environ.get("LIMITS_CPU_PER_TASK", "4")) +LIMITS_GPU_PER_TASK = int(os.environ.get("LIMITS_GPU_PER_TASK", "1")) LIMITS_MEMORY_PER_TASK = int(os.environ.get("LIMITS_MEMORY_PER_TASK", "8")) # ray cluster management @@ -336,7 +338,7 @@ ), } RAY_NODE_IMAGE = os.environ.get( - "RAY_NODE_IMAGE", "icr.io/quantum-public/qiskit-serverless/ray-node:0.17.0" + "RAY_NODE_IMAGE", "icr.io/quantum-public/qiskit-serverless/ray-node:0.17.1" ) RAY_CLUSTER_WORKER_REPLICAS = int(os.environ.get("RAY_CLUSTER_WORKER_REPLICAS", "1")) RAY_CLUSTER_WORKER_REPLICAS_MAX = int( @@ -367,12 +369,26 @@ os.environ.get("RAY_CLUSTER_NO_DELETE_ON_COMPLETE", False) ) +RAY_CLUSTER_CPU_NODE_SELECTOR_LABEL = os.environ.get( + "RAY_CLUSTER_CPU_NODE_SELECTOR_LABEL", + "ibm-cloud.kubernetes.io/worker-pool-name: default", +) + +RAY_CLUSTER_GPU_NODE_SELECTOR_LABEL = os.environ.get( + "RAY_CLUSTER_GPU_NODE_SELECTOR_LABEL", + "ibm-cloud.kubernetes.io/worker-pool-name: gpu-workers", +) + PROGRAM_TIMEOUT = int(os.environ.get("PROGRAM_TIMEOUT", "14")) GATEWAY_ALLOWLIST_CONFIG = str( os.environ.get("GATEWAY_ALLOWLIST_CONFIG", "api/v1/allowlist.json") ) +GATEWAY_GPU_JOBS_CONFIG = str( + os.environ.get("GATEWAY_GPU_JOBS_CONFIG", "api/v1/gpu-jobs.json") +) + # qiskit runtime QISKIT_IBM_CHANNEL = os.environ.get("QISKIT_IBM_CHANNEL", "ibm_quantum") QISKIT_IBM_URL = os.environ.get( diff --git a/gateway/main/urls.py b/gateway/main/urls.py index f066c3e6f..8abcae3da 100644 --- a/gateway/main/urls.py +++ b/gateway/main/urls.py @@ -50,20 +50,18 @@ re_path(r"^api/v1/", include(("api.v1.urls", "api"), namespace="v1")), ] +urlpatterns += [ + re_path( + r"^swagger(?P\.json|\.yaml)$", + schema.without_ui(cache_timeout=0), + name="schema-json", + ), + re_path( + r"^swagger/$", + schema.with_ui("swagger", cache_timeout=0), + name="schema-swagger-ui", + ), + re_path(r"^redoc/$", schema.with_ui("redoc", cache_timeout=0), name="schema-redoc"), +] if settings.DEBUG: - urlpatterns += [ - re_path( - r"^swagger(?P\.json|\.yaml)$", - schema.without_ui(cache_timeout=0), - name="schema-json", - ), - re_path( - r"^swagger/$", - schema.with_ui("swagger", cache_timeout=0), - name="schema-swagger-ui", - ), - re_path( - r"^redoc/$", schema.with_ui("redoc", cache_timeout=0), name="schema-redoc" - ), - ] urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT) diff --git a/gateway/requirements.txt b/gateway/requirements.txt index ccf9e62cd..853b65909 100644 --- a/gateway/requirements.txt +++ b/gateway/requirements.txt @@ -1,4 +1,4 @@ -cryptography>=42.0.5, <43 +cryptography>=43.0.1, <44 djangorestframework>=3.15.2, <4 django-allauth[socialaccount]>=0.61.1, <1 django-allow-cidr>=0.7.1, <1 @@ -6,7 +6,7 @@ dj-rest-auth>=5.0.2, <6 django-csp>=3.8, <4 djangorestframework-simplejwt>=5.3.1, <6 django_prometheus>=2.3.1, <3 -ray[default]>=2.30.0, <3 +ray[default]>=2.30.0, <2.35.0 Django>=4.2.11, <5 gunicorn>=22.0.0, <23 requests>=2.32.2, <3 diff --git a/gateway/tests/api/test_v1_program.py b/gateway/tests/api/test_v1_program.py index a5391fc02..4676cd9f2 100644 --- a/gateway/tests/api/test_v1_program.py +++ b/gateway/tests/api/test_v1_program.py @@ -75,7 +75,7 @@ def test_provider_programs_catalog_list(self): ) self.assertEqual( programs_response.data[0].get("title"), - "Docker-Image-Program-2", + "Docker-Image-Program-3", ) def test_provider_programs_serverless_list(self): @@ -485,3 +485,49 @@ def test_get_jobs(self): ) self.assertEqual(len(response.data), 1) self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_upload_private_function_update_without_description(self): + """Tests upload end-point authorized.""" + + fake_file = ContentFile(b"print('Hello World')") + fake_file.name = "test_run.tar" + + user = models.User.objects.get(username="test_user") + self.client.force_authenticate(user=user) + programs_response = self.client.post( + "/api/v1/programs/upload/", + data={ + "title": "Program", + "entrypoint": "test_user_2_program.py", + "dependencies": "[]", + "artifact": fake_file, + }, + ) + + self.assertEqual(programs_response.status_code, status.HTTP_200_OK) + self.assertEqual( + programs_response.data.get("description"), "Program description test" + ) + + def test_upload_private_function_update_description(self): + """Tests upload end-point authorized.""" + + fake_file = ContentFile(b"print('Hello World')") + fake_file.name = "test_run.tar" + + user = models.User.objects.get(username="test_user") + self.client.force_authenticate(user=user) + description = "New program description test" + programs_response = self.client.post( + "/api/v1/programs/upload/", + data={ + "title": "Program", + "entrypoint": "test_user_2_program.py", + "description": description, + "dependencies": "[]", + "artifact": fake_file, + }, + ) + + self.assertEqual(programs_response.status_code, status.HTTP_200_OK) + self.assertEqual(programs_response.data.get("description"), description) diff --git a/gateway/tests/api/test_v1_serializers.py b/gateway/tests/api/test_v1_serializers.py index 76fafaeaa..23ba62cbd 100644 --- a/gateway/tests/api/test_v1_serializers.py +++ b/gateway/tests/api/test_v1_serializers.py @@ -323,3 +323,48 @@ def test_upload_program_serializer_blocked_dependency(self): serializer = UploadProgramSerializer(data=data) self.assertFalse(serializer.is_valid()) + + def test_upload_program_serializer_updates_program_without_description(self): + path_to_resource_artifact = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "..", + "resources", + "artifact.tar", + ) + file_data = File(open(path_to_resource_artifact, "rb")) + upload_file = SimpleUploadedFile( + "artifact.tar", file_data.read(), content_type="multipart/form-data" + ) + + user = models.User.objects.get(username="test_user") + + title = "Hello world" + entrypoint = "pattern.py" + arguments = "{}" + dependencies = "[]" + description = "This is my old description" + + data = {} + data["title"] = title + data["entrypoint"] = entrypoint + data["arguments"] = arguments + data["dependencies"] = dependencies + data["description"] = description + data["artifact"] = upload_file + + serializer = UploadProgramSerializer(data=data) + serializer.is_valid() + program: Program = serializer.save(author=user) + self.assertEqual(description, program.description) + + data_without_description = {} + data_without_description["title"] = title + data_without_description["entrypoint"] = entrypoint + data_without_description["arguments"] = arguments + data_without_description["dependencies"] = dependencies + data_without_description["artifact"] = upload_file + + serializer_2 = UploadProgramSerializer(program, data=data_without_description) + serializer_2.is_valid() + program_2: Program = serializer_2.save(author=user) + self.assertEqual(description, program_2.description) diff --git a/gateway/tests/fixtures/fixtures.json b/gateway/tests/fixtures/fixtures.json index ed80f732d..0aea2f7d1 100644 --- a/gateway/tests/fixtures/fixtures.json +++ b/gateway/tests/fixtures/fixtures.json @@ -59,6 +59,7 @@ "fields": { "created": "2023-02-01T15:30:43.281796Z", "title": "Program", + "description": "Program description test", "entrypoint": "program.py", "artifact": "path", "author": 1, @@ -91,6 +92,21 @@ "author": 3, "env_vars": "{\"PROGRAM_ENV1\": \"VALUE1\", \"PROGRAM_ENV2\": \"VALUE2\"}", "provider": "bfe8aa6a-2127-4123-bf57-5b547293cbeb", + "instances": [ + 100 + ] + } + }, + { + "model": "api.program", + "pk": "74d300c1-f3f2-4c79-9b3e-2faab5f76d76", + "fields": { + "created": "2023-02-01T15:30:43.281796Z", + "title": "Docker-Image-Program-3", + "image": "icr.io/awesome-namespace/awesome-title", + "author": 3, + "env_vars": "{\"PROGRAM_ENV1\": \"VALUE1\", \"PROGRAM_ENV2\": \"VALUE2\"}", + "provider": "bfe8aa6a-2127-4123-bf57-5b547293cbeb", "instances": [ 100, 101