diff --git a/client/qiskit_serverless/core/client.py b/client/qiskit_serverless/core/client.py index 147ab9baf..5f04dec32 100644 --- a/client/qiskit_serverless/core/client.py +++ b/client/qiskit_serverless/core/client.py @@ -28,20 +28,19 @@ """ import warnings from abc import ABC, abstractmethod -from typing import Optional, List, Dict, Any, Union +from typing import Optional, List -from qiskit_ibm_runtime import QiskitRuntimeService - -from qiskit_serverless.core.job import ( - Job, - Configuration, +from qiskit_serverless.core.job import Job, JobService +from qiskit_serverless.core.function import ( + QiskitFunction, + RunnableQiskitFunction, + RunService, ) -from qiskit_serverless.core.function import QiskitFunction from qiskit_serverless.utils import JsonSerializable from qiskit_serverless.visualizaiton import Widget -class BaseClient(JsonSerializable, ABC): +class BaseClient(JobService, RunService, JsonSerializable, ABC): """ A client class for specifying custom compute resources. @@ -140,76 +139,27 @@ def get_jobs(self, **kwargs) -> List[Job]: ) return self.jobs(**kwargs) - @abstractmethod - def run( - self, - program: Union[QiskitFunction, str], - arguments: Optional[Dict[str, Any]] = None, - config: Optional[Configuration] = None, - ) -> Job: - """Execute a program as a async job. - - Example: - >>> serverless = QiskitServerless() - >>> program = QiskitFunction( - >>> "job.py", - >>> arguments={"arg1": "val1"}, - >>> dependencies=["requests"] - >>> ) - >>> job = serverless.run(program) - >>> # - - Args: - arguments: arguments to run program with - program: Program object - - Returns: - Job - """ - - @abstractmethod - def status(self, job_id: str) -> str: - """Check status.""" - - @abstractmethod - def stop( - self, job_id: str, service: Optional[QiskitRuntimeService] = None - ) -> Union[str, bool]: - """Stops job/program.""" - - @abstractmethod - def result(self, job_id: str) -> Any: - """Return results.""" - - @abstractmethod - def logs(self, job_id: str) -> str: - """Return logs.""" - - @abstractmethod - def filtered_logs(self, job_id: str, **kwargs) -> str: - """Return filtered logs.""" - ######################### ####### Functions ####### ######################### @abstractmethod - def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]: + def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]: """Uploads program.""" @abstractmethod - def functions(self, **kwargs) -> List[QiskitFunction]: + def functions(self, **kwargs) -> List[RunnableQiskitFunction]: """Returns list of available programs.""" @abstractmethod def function( self, title: str, provider: Optional[str] = None - ) -> Optional[QiskitFunction]: + ) -> Optional[RunnableQiskitFunction]: """Returns program based on parameters.""" def get( self, title: str, provider: Optional[str] = None - ) -> Optional[QiskitFunction]: + ) -> Optional[RunnableQiskitFunction]: """Returns program based on parameters.""" warnings.warn( "`get` method has been deprecated. " @@ -219,7 +169,7 @@ def get( ) return self.function(title, provider=provider) - def list(self, **kwargs) -> List[QiskitFunction]: + def list(self, **kwargs) -> List[RunnableQiskitFunction]: """Returns list of available programs.""" warnings.warn( "`list` method has been deprecated. " diff --git a/client/qiskit_serverless/core/clients/local_client.py b/client/qiskit_serverless/core/clients/local_client.py index 0dd03faba..c0c28a64e 100644 --- a/client/qiskit_serverless/core/clients/local_client.py +++ b/client/qiskit_serverless/core/clients/local_client.py @@ -48,7 +48,7 @@ Job, Configuration, ) -from qiskit_serverless.core.function import QiskitFunction +from qiskit_serverless.core.function import QiskitFunction, RunnableQiskitFunction from qiskit_serverless.exception import QiskitServerlessException from qiskit_serverless.serializers.program_serializers import ( QiskitObjectsEncoder, @@ -133,7 +133,7 @@ def run( if results: result = results.group(1) - job = Job(job_id=str(uuid4()), client=self) + job = Job(job_id=str(uuid4()), job_service=self) self._jobs[job.job_id] = { "status": status, "logs": output, @@ -163,7 +163,7 @@ def filtered_logs(self, job_id: str, **kwargs): ####### Functions ####### ######################### - def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]: + def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]: # check if entrypoint exists if not os.path.exists(os.path.join(program.working_dir, program.entrypoint)): raise QiskitServerlessException( @@ -182,14 +182,14 @@ def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]: "client": self, } self._patterns.append(pattern) - return QiskitFunction.from_json(pattern) + return RunnableQiskitFunction.from_json(pattern) - def functions(self, **kwargs) -> List[QiskitFunction]: + def functions(self, **kwargs) -> List[RunnableQiskitFunction]: """Returns list of programs.""" - return [QiskitFunction.from_json(program) for program in self._patterns] + return [RunnableQiskitFunction.from_json(program) for program in self._patterns] def function( self, title: str, provider: Optional[str] = None - ) -> Optional[QiskitFunction]: + ) -> Optional[RunnableQiskitFunction]: functions = {function.title: function for function in self.functions()} return functions.get(title) diff --git a/client/qiskit_serverless/core/clients/ray_client.py b/client/qiskit_serverless/core/clients/ray_client.py index b0f2c9204..cd2eaaa90 100644 --- a/client/qiskit_serverless/core/clients/ray_client.py +++ b/client/qiskit_serverless/core/clients/ray_client.py @@ -42,7 +42,7 @@ Configuration, Job, ) -from qiskit_serverless.core.function import QiskitFunction +from qiskit_serverless.core.function import QiskitFunction, RunnableQiskitFunction from qiskit_serverless.serializers.program_serializers import ( QiskitObjectsEncoder, ) @@ -80,7 +80,7 @@ def jobs(self, **kwargs) -> List[Job]: list of jobs. """ return [ - Job(job.job_id, client=self) + Job(job.job_id, job_service=self) for job in self.job_submission_client.list_jobs() ] @@ -94,7 +94,8 @@ def job(self, job_id: str) -> Optional[Job]: Job instance """ return Job( - self.job_submission_client.get_job_info(job_id).submission_id, client=self + self.job_submission_client.get_job_info(job_id).submission_id, + job_service=self, ) def run( @@ -129,7 +130,7 @@ def run( "env_vars": env_vars, }, ) - return Job(job_id=job_id, client=self) + return Job(job_id=job_id, job_service=self) def status(self, job_id: str) -> str: """Check status.""" @@ -157,16 +158,16 @@ def filtered_logs(self, job_id: str, **kwargs) -> str: ####### Functions ####### ######################### - def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]: + def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]: """Uploads program.""" raise NotImplementedError("Upload is not available for RayClient.") - def functions(self, **kwargs) -> List[QiskitFunction]: + def functions(self, **kwargs) -> List[RunnableQiskitFunction]: """Returns list of available programs.""" raise NotImplementedError("get_programs is not available for RayClient.") def function( self, title: str, provider: Optional[str] = None - ) -> Optional[QiskitFunction]: + ) -> Optional[RunnableQiskitFunction]: """Returns program based on parameters.""" raise NotImplementedError("get_program is not available for RayClient.") diff --git a/client/qiskit_serverless/core/clients/serverless_client.py b/client/qiskit_serverless/core/clients/serverless_client.py index 1889f61e4..3478b9843 100644 --- a/client/qiskit_serverless/core/clients/serverless_client.py +++ b/client/qiskit_serverless/core/clients/serverless_client.py @@ -54,7 +54,12 @@ Job, Configuration, ) -from qiskit_serverless.core.function import QiskitFunction +from qiskit_serverless.core.function import ( + QiskitFunction, + RunService, + RunnableQiskitFunction, +) + from qiskit_serverless.exception import QiskitServerlessException from qiskit_serverless.utils.json import ( safe_json_request_as_dict, @@ -159,7 +164,7 @@ def jobs(self, **kwargs) -> List[Job]: ) return [ - Job(job.get("id"), client=self, raw_data=job) + Job(job.get("id"), job_service=self, raw_data=job) for job in response_data.get("results", []) ] @@ -180,7 +185,7 @@ def job(self, job_id: str) -> Optional[Job]: if job_id is not None: job = Job( job_id=job_id, - client=self, + job_service=self, ) return job @@ -227,7 +232,7 @@ def run( job_id = response_data.get("id") span.set_attribute("job.id", job_id) - return Job(job_id, client=self) + return Job(job_id, job_service=self) def status(self, job_id: str): tracer = trace.get_tracer("client.tracer") @@ -316,7 +321,7 @@ def filtered_logs(self, job_id: str, **kwargs): ####### Functions ####### ######################### - def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]: + def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]: tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("job.run") as span: span.set_attribute("program", program.title) @@ -325,12 +330,12 @@ def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]: if program.image is not None: # upload function with custom image function_uploaded = _upload_with_docker_image( - program=program, url=url, token=self.token, span=span + program=program, url=url, token=self.token, span=span, client=self ) elif program.entrypoint is not None: # upload funciton with artifact function_uploaded = _upload_with_artifact( - program=program, url=url, token=self.token, span=span + program=program, url=url, token=self.token, span=span, client=self ) else: raise QiskitServerlessException( @@ -339,7 +344,7 @@ def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]: return function_uploaded - def functions(self, **kwargs) -> List[QiskitFunction]: + def functions(self, **kwargs) -> List[RunnableQiskitFunction]: """Returns list of available programs.""" tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("program.list"): @@ -353,11 +358,11 @@ def functions(self, **kwargs) -> List[QiskitFunction]: ) return [ - QiskitFunction( - program.get("title"), + RunnableQiskitFunction( + client=self, + title=program.get("title"), provider=program.get("provider", None), raw_data=program, - client=self, description=program.get("description"), ) for program in response_data @@ -365,7 +370,7 @@ def functions(self, **kwargs) -> List[QiskitFunction]: def function( self, title: str, provider: Optional[str] = None - ) -> Optional[QiskitFunction]: + ) -> Optional[RunnableQiskitFunction]: """Returns program based on parameters.""" provider, title = format_provider_name_and_title( request_provider=provider, title=title @@ -381,11 +386,11 @@ def function( timeout=REQUESTS_TIMEOUT, ) ) - return QiskitFunction( - response_data.get("title"), + return RunnableQiskitFunction( + client=self, + title=response_data.get("title"), provider=response_data.get("provider", None), raw_data=response_data, - client=self, ) ##################### @@ -484,8 +489,8 @@ def save_account( def _upload_with_docker_image( - program: QiskitFunction, url: str, token: str, span: Any -) -> QiskitFunction: + program: QiskitFunction, url: str, token: str, span: Any, client: RunService +) -> RunnableQiskitFunction: """Uploads function with custom docker image. Args: @@ -517,12 +522,13 @@ def _upload_with_docker_image( program_provider = response_data.get("provider", "na") span.set_attribute("program.title", program_title) span.set_attribute("program.provider", program_provider) - return QiskitFunction.from_json(response_data) + response_data["client"] = client + return RunnableQiskitFunction.from_json(response_data) def _upload_with_artifact( - program: QiskitFunction, url: str, token: str, span: Any -) -> QiskitFunction: + program: QiskitFunction, url: str, token: str, span: Any, client: RunService +) -> RunnableQiskitFunction: """Uploads function with artifact. Args: @@ -582,11 +588,10 @@ def _upload_with_artifact( timeout=REQUESTS_TIMEOUT, ) ) - program_title = response_data.get("title", "na") - program_provider = response_data.get("provider", "na") - span.set_attribute("program.title", program_title) - span.set_attribute("program.provider", program_provider) - response_function = QiskitFunction.from_json(response_data) + span.set_attribute("program.title", response_data.get("title", "na")) + span.set_attribute("program.provider", response_data.get("provider", "na")) + response_data["client"] = client + response_function = RunnableQiskitFunction.from_json(response_data) except Exception as error: # pylint: disable=broad-exception-caught raise QiskitServerlessException from error finally: diff --git a/client/qiskit_serverless/core/function.py b/client/qiskit_serverless/core/function.py index 10ea2f0b3..7adaac72f 100644 --- a/client/qiskit_serverless/core/function.py +++ b/client/qiskit_serverless/core/function.py @@ -26,10 +26,16 @@ QiskitFunction """ +from abc import ABC, abstractmethod import dataclasses import warnings from dataclasses import dataclass -from typing import Optional, Dict, List, Any, Tuple +from typing import Optional, Dict, List, Any, Tuple, Union + +from qiskit_serverless.core.job import ( + Job, + Configuration, +) @dataclass @@ -58,7 +64,6 @@ class QiskitFunction: # pylint: disable=too-many-instance-attributes version: Optional[str] = None tags: Optional[List[str]] = None raw_data: Optional[Dict[str, Any]] = None - client: Optional[Any] = None image: Optional[str] = None validate: bool = True schema: Optional[str] = None @@ -91,6 +96,70 @@ def __str__(self): def __repr__(self): return self.__str__() + def _validate_function(self) -> Tuple[bool, List[str]]: + """Validate function arguments using schema provided. + + Returns: + Tuple[bool, List[str]]: + boolean specifiying if function arguments are valid + list of validation errors, if any + """ + return True, [] + + +class RunService(ABC): + """Provide access to run a function and retrieve the jobs associated to that function""" + + @abstractmethod + def jobs(self, **kwargs) -> List[Job]: + """Return list of jobs. + + Returns: + list of jobs. + """ + + @abstractmethod + def run( + self, + program: Union[QiskitFunction, str], + arguments: Optional[Dict[str, Any]] = None, + config: Optional[Configuration] = None, + ) -> Job: + """Run a function and return its job.""" + + +class RunnableQiskitFunction(QiskitFunction): + """Serverless QiskitPattern. + + Args: + title: program name + provider: Qiskit Function provider reference + entrypoint: is a script that will be executed as a job + ex: job.py + env_vars: env vars + dependencies: list of python dependencies to execute a program + working_dir: directory where entrypoint file is located (max size 50MB) + description: description of a program + version: version of a program + """ + + _run_service: RunService = None + + def __init__( # pylint: disable=too-many-positional-arguments + self, client: RunService, **kwargs + ): + self._run_service = client + super().__init__(**kwargs) + + @classmethod + def from_json(cls, data: Dict[str, Any]): + """Reconstructs QiskitPattern from dictionary.""" + field_names = set(f.name for f in dataclasses.fields(QiskitFunction)) + client = data["client"] + return RunnableQiskitFunction( + client, **{k: v for k, v in data.items() if k in field_names} + ) + def run(self, **kwargs): """Run function @@ -100,7 +169,7 @@ def run(self, **kwargs): Returns: Job: job handler for function execution """ - if self.client is None: + if self._run_service is None: raise ValueError("No clients specified for a function.") if self.validate: @@ -112,7 +181,7 @@ def run(self, **kwargs): ) config = kwargs.pop("config", None) - return self.client.run( + return self._run_service.run( program=self, arguments=kwargs, config=config, @@ -145,11 +214,8 @@ def jobs(self): Returns: [Job] : list of jobs """ - from qiskit_serverless.core.job import ( # pylint: disable=import-outside-toplevel - Job, - ) - if self.client is None: + if self._run_service is None: raise ValueError("No clients specified for a function.") if self.validate: @@ -160,28 +226,15 @@ def jobs(self): f"Function validation failed. Validation errors:\n {error_string}", ) - response = self.client.get_jobs( + jobs = self._run_service.jobs( title=self.title, provider=self.provider, ) - jobs = [ - Job(job_id=job.get("id"), client=self.client, raw_data=job) - for job in response - ] return jobs - def _validate_function(self) -> Tuple[bool, List[str]]: - """Validate function arguments using schema provided. - - Returns: - Tuple[bool, List[str]]: - boolean specifiying if function arguments are valid - list of validation errors, if any - """ - return True, [] - # pylint: disable=abstract-method +# pylint: disable=too-few-public-methods class QiskitPattern(QiskitFunction): """ [Deprecated since version 0.10.0] Use :class:`.QiskitFunction` instead. diff --git a/client/qiskit_serverless/core/job.py b/client/qiskit_serverless/core/job.py index 03e6f6d42..7b113e364 100644 --- a/client/qiskit_serverless/core/job.py +++ b/client/qiskit_serverless/core/job.py @@ -28,12 +28,13 @@ Job """ # pylint: disable=duplicate-code +from abc import ABC, abstractmethod import json import logging import os import time import warnings -from typing import Dict, Any, Optional +from typing import Dict, Any, Optional, Union from dataclasses import dataclass import ray.runtime_env @@ -76,13 +77,44 @@ class Configuration: # pylint: disable=too-many-instance-attributes auto_scaling: Optional[bool] = False +class JobService(ABC): + """Provide access to job information""" + + @abstractmethod + def status(self, job_id: str) -> str: + """Check status.""" + + @abstractmethod + def stop( + self, job_id: str, service: Optional[QiskitRuntimeService] = None + ) -> Union[str, bool]: + """Stops job/program.""" + + @abstractmethod + def result(self, job_id: str) -> Any: + """Return results.""" + + @abstractmethod + def logs(self, job_id: str) -> str: + """Return logs.""" + + @abstractmethod + def filtered_logs(self, job_id: str, **kwargs) -> str: + """Returns logs of the job. + Args: + job_id: The job's logs + include: rex expression finds match in the log line to be included + exclude: rex expression finds match in the log line to be excluded + """ + + class Job: """Job.""" def __init__( self, job_id: str, - client: Any, + job_service: JobService, raw_data: Optional[Dict[str, Any]] = None, ): """Job class for async script execution. @@ -92,12 +124,12 @@ def __init__( client: client """ self.job_id = job_id - self._client = client + self._job_service = job_service self.raw_data = raw_data or {} def status(self): """Returns status of the job.""" - return _map_status_to_serverless(self._client.status(self.job_id)) + return _map_status_to_serverless(self._job_service.status(self.job_id)) def stop(self, service: Optional[QiskitRuntimeService] = None): """Stops the job from running.""" @@ -111,11 +143,11 @@ def stop(self, service: Optional[QiskitRuntimeService] = None): def cancel(self, service: Optional[QiskitRuntimeService] = None): """Cancels the job.""" - return self._client.stop(self.job_id, service=service) + return self._job_service.stop(self.job_id, service=service) def logs(self) -> str: """Returns logs of the job.""" - return self._client.logs(self.job_id) + return self._job_service.logs(self.job_id) def filtered_logs(self, **kwargs) -> str: """Returns logs of the job. @@ -123,7 +155,7 @@ def filtered_logs(self, **kwargs) -> str: include: rex expression finds match in the log line to be included exclude: rex expression finds match in the log line to be excluded """ - return self._client.filtered_logs(job_id=self.job_id, **kwargs) + return self._job_service.filtered_logs(job_id=self.job_id, **kwargs) def result(self, wait=True, cadence=5, verbose=False, maxwait=0): """Return results of the job. @@ -146,7 +178,7 @@ def result(self, wait=True, cadence=5, verbose=False, maxwait=0): logging.info(count) # Retrieve the results. If they're string format, try to decode to a dictionary. - results = self._client.result(self.job_id) + results = self._job_service.result(self.job_id) if isinstance(results, str): try: results = json.loads(results, cls=QiskitObjectsDecoder)