Skip to content

Commit

Permalink
Function and jobs refactor - resolve circular dependencies (#1517)
Browse files Browse the repository at this point in the history
* refactor client and fix tests

* remove client/arguments.serverless

* remove unused TIMEOUTs

* functions deprecation warnings

* added get_job_by_id with deprecation warning

* added __init__.py

* deprecated list function correction

* replace deleted _token with token

* **kwargs

* add duplicate code exception

* fixed run function

* fixed code quality

* fix linter

* delete file management from client. Use it only in serverless

* fix docs

* skip file download and manage data directory tests

* change docker build

* change docker build

* fix from_dict

* refactor function and job and resolve circular dependency

* move _upload_with_artifact changes

* rename get_XXXX methods as XXXX

* rename get_XXXX methods as XXXX on local client

* fix upload return on serverless

* fix upload return on local client

* fix lint

* revert tox file

* fix functions return

* JobClient -> JobService

* jobService variable camelCase to snake_case
  • Loading branch information
korgan00 authored Oct 28, 2024
1 parent fbd66dc commit c3b8446
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 132 deletions.
74 changes: 12 additions & 62 deletions client/qiskit_serverless/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,19 @@
"""
import warnings
from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Any, Union
from typing import Optional, List

from qiskit_ibm_runtime import QiskitRuntimeService

from qiskit_serverless.core.job import (
Job,
Configuration,
from qiskit_serverless.core.job import Job, JobService
from qiskit_serverless.core.function import (
QiskitFunction,
RunnableQiskitFunction,
RunService,
)
from qiskit_serverless.core.function import QiskitFunction
from qiskit_serverless.utils import JsonSerializable
from qiskit_serverless.visualizaiton import Widget


class BaseClient(JsonSerializable, ABC):
class BaseClient(JobService, RunService, JsonSerializable, ABC):
"""
A client class for specifying custom compute resources.
Expand Down Expand Up @@ -140,76 +139,27 @@ def get_jobs(self, **kwargs) -> List[Job]:
)
return self.jobs(**kwargs)

@abstractmethod
def run(
self,
program: Union[QiskitFunction, str],
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
) -> Job:
"""Execute a program as a async job.
Example:
>>> serverless = QiskitServerless()
>>> program = QiskitFunction(
>>> "job.py",
>>> arguments={"arg1": "val1"},
>>> dependencies=["requests"]
>>> )
>>> job = serverless.run(program)
>>> # <Job | ...>
Args:
arguments: arguments to run program with
program: Program object
Returns:
Job
"""

@abstractmethod
def status(self, job_id: str) -> str:
"""Check status."""

@abstractmethod
def stop(
self, job_id: str, service: Optional[QiskitRuntimeService] = None
) -> Union[str, bool]:
"""Stops job/program."""

@abstractmethod
def result(self, job_id: str) -> Any:
"""Return results."""

@abstractmethod
def logs(self, job_id: str) -> str:
"""Return logs."""

@abstractmethod
def filtered_logs(self, job_id: str, **kwargs) -> str:
"""Return filtered logs."""

#########################
####### Functions #######
#########################

@abstractmethod
def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]:
def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]:
"""Uploads program."""

@abstractmethod
def functions(self, **kwargs) -> List[QiskitFunction]:
def functions(self, **kwargs) -> List[RunnableQiskitFunction]:
"""Returns list of available programs."""

@abstractmethod
def function(
self, title: str, provider: Optional[str] = None
) -> Optional[QiskitFunction]:
) -> Optional[RunnableQiskitFunction]:
"""Returns program based on parameters."""

def get(
self, title: str, provider: Optional[str] = None
) -> Optional[QiskitFunction]:
) -> Optional[RunnableQiskitFunction]:
"""Returns program based on parameters."""
warnings.warn(
"`get` method has been deprecated. "
Expand All @@ -219,7 +169,7 @@ def get(
)
return self.function(title, provider=provider)

def list(self, **kwargs) -> List[QiskitFunction]:
def list(self, **kwargs) -> List[RunnableQiskitFunction]:
"""Returns list of available programs."""
warnings.warn(
"`list` method has been deprecated. "
Expand Down
14 changes: 7 additions & 7 deletions client/qiskit_serverless/core/clients/local_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
Job,
Configuration,
)
from qiskit_serverless.core.function import QiskitFunction
from qiskit_serverless.core.function import QiskitFunction, RunnableQiskitFunction
from qiskit_serverless.exception import QiskitServerlessException
from qiskit_serverless.serializers.program_serializers import (
QiskitObjectsEncoder,
Expand Down Expand Up @@ -133,7 +133,7 @@ def run(
if results:
result = results.group(1)

job = Job(job_id=str(uuid4()), client=self)
job = Job(job_id=str(uuid4()), job_service=self)
self._jobs[job.job_id] = {
"status": status,
"logs": output,
Expand Down Expand Up @@ -163,7 +163,7 @@ def filtered_logs(self, job_id: str, **kwargs):
####### Functions #######
#########################

def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]:
def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]:
# check if entrypoint exists
if not os.path.exists(os.path.join(program.working_dir, program.entrypoint)):
raise QiskitServerlessException(
Expand All @@ -182,14 +182,14 @@ def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]:
"client": self,
}
self._patterns.append(pattern)
return QiskitFunction.from_json(pattern)
return RunnableQiskitFunction.from_json(pattern)

def functions(self, **kwargs) -> List[QiskitFunction]:
def functions(self, **kwargs) -> List[RunnableQiskitFunction]:
"""Returns list of programs."""
return [QiskitFunction.from_json(program) for program in self._patterns]
return [RunnableQiskitFunction.from_json(program) for program in self._patterns]

def function(
self, title: str, provider: Optional[str] = None
) -> Optional[QiskitFunction]:
) -> Optional[RunnableQiskitFunction]:
functions = {function.title: function for function in self.functions()}
return functions.get(title)
15 changes: 8 additions & 7 deletions client/qiskit_serverless/core/clients/ray_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
Configuration,
Job,
)
from qiskit_serverless.core.function import QiskitFunction
from qiskit_serverless.core.function import QiskitFunction, RunnableQiskitFunction
from qiskit_serverless.serializers.program_serializers import (
QiskitObjectsEncoder,
)
Expand Down Expand Up @@ -80,7 +80,7 @@ def jobs(self, **kwargs) -> List[Job]:
list of jobs.
"""
return [
Job(job.job_id, client=self)
Job(job.job_id, job_service=self)
for job in self.job_submission_client.list_jobs()
]

Expand All @@ -94,7 +94,8 @@ def job(self, job_id: str) -> Optional[Job]:
Job instance
"""
return Job(
self.job_submission_client.get_job_info(job_id).submission_id, client=self
self.job_submission_client.get_job_info(job_id).submission_id,
job_service=self,
)

def run(
Expand Down Expand Up @@ -129,7 +130,7 @@ def run(
"env_vars": env_vars,
},
)
return Job(job_id=job_id, client=self)
return Job(job_id=job_id, job_service=self)

def status(self, job_id: str) -> str:
"""Check status."""
Expand Down Expand Up @@ -157,16 +158,16 @@ def filtered_logs(self, job_id: str, **kwargs) -> str:
####### Functions #######
#########################

def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]:
def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]:
"""Uploads program."""
raise NotImplementedError("Upload is not available for RayClient.")

def functions(self, **kwargs) -> List[QiskitFunction]:
def functions(self, **kwargs) -> List[RunnableQiskitFunction]:
"""Returns list of available programs."""
raise NotImplementedError("get_programs is not available for RayClient.")

def function(
self, title: str, provider: Optional[str] = None
) -> Optional[QiskitFunction]:
) -> Optional[RunnableQiskitFunction]:
"""Returns program based on parameters."""
raise NotImplementedError("get_program is not available for RayClient.")
55 changes: 30 additions & 25 deletions client/qiskit_serverless/core/clients/serverless_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@
Job,
Configuration,
)
from qiskit_serverless.core.function import QiskitFunction
from qiskit_serverless.core.function import (
QiskitFunction,
RunService,
RunnableQiskitFunction,
)

from qiskit_serverless.exception import QiskitServerlessException
from qiskit_serverless.utils.json import (
safe_json_request_as_dict,
Expand Down Expand Up @@ -159,7 +164,7 @@ def jobs(self, **kwargs) -> List[Job]:
)

return [
Job(job.get("id"), client=self, raw_data=job)
Job(job.get("id"), job_service=self, raw_data=job)
for job in response_data.get("results", [])
]

Expand All @@ -180,7 +185,7 @@ def job(self, job_id: str) -> Optional[Job]:
if job_id is not None:
job = Job(
job_id=job_id,
client=self,
job_service=self,
)

return job
Expand Down Expand Up @@ -227,7 +232,7 @@ def run(
job_id = response_data.get("id")
span.set_attribute("job.id", job_id)

return Job(job_id, client=self)
return Job(job_id, job_service=self)

def status(self, job_id: str):
tracer = trace.get_tracer("client.tracer")
Expand Down Expand Up @@ -316,7 +321,7 @@ def filtered_logs(self, job_id: str, **kwargs):
####### Functions #######
#########################

def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]:
def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]:
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.run") as span:
span.set_attribute("program", program.title)
Expand All @@ -325,12 +330,12 @@ def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]:
if program.image is not None:
# upload function with custom image
function_uploaded = _upload_with_docker_image(
program=program, url=url, token=self.token, span=span
program=program, url=url, token=self.token, span=span, client=self
)
elif program.entrypoint is not None:
# upload funciton with artifact
function_uploaded = _upload_with_artifact(
program=program, url=url, token=self.token, span=span
program=program, url=url, token=self.token, span=span, client=self
)
else:
raise QiskitServerlessException(
Expand All @@ -339,7 +344,7 @@ def upload(self, program: QiskitFunction) -> Optional[QiskitFunction]:

return function_uploaded

def functions(self, **kwargs) -> List[QiskitFunction]:
def functions(self, **kwargs) -> List[RunnableQiskitFunction]:
"""Returns list of available programs."""
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("program.list"):
Expand All @@ -353,19 +358,19 @@ def functions(self, **kwargs) -> List[QiskitFunction]:
)

return [
QiskitFunction(
program.get("title"),
RunnableQiskitFunction(
client=self,
title=program.get("title"),
provider=program.get("provider", None),
raw_data=program,
client=self,
description=program.get("description"),
)
for program in response_data
]

def function(
self, title: str, provider: Optional[str] = None
) -> Optional[QiskitFunction]:
) -> Optional[RunnableQiskitFunction]:
"""Returns program based on parameters."""
provider, title = format_provider_name_and_title(
request_provider=provider, title=title
Expand All @@ -381,11 +386,11 @@ def function(
timeout=REQUESTS_TIMEOUT,
)
)
return QiskitFunction(
response_data.get("title"),
return RunnableQiskitFunction(
client=self,
title=response_data.get("title"),
provider=response_data.get("provider", None),
raw_data=response_data,
client=self,
)

#####################
Expand Down Expand Up @@ -484,8 +489,8 @@ def save_account(


def _upload_with_docker_image(
program: QiskitFunction, url: str, token: str, span: Any
) -> QiskitFunction:
program: QiskitFunction, url: str, token: str, span: Any, client: RunService
) -> RunnableQiskitFunction:
"""Uploads function with custom docker image.
Args:
Expand Down Expand Up @@ -517,12 +522,13 @@ def _upload_with_docker_image(
program_provider = response_data.get("provider", "na")
span.set_attribute("program.title", program_title)
span.set_attribute("program.provider", program_provider)
return QiskitFunction.from_json(response_data)
response_data["client"] = client
return RunnableQiskitFunction.from_json(response_data)


def _upload_with_artifact(
program: QiskitFunction, url: str, token: str, span: Any
) -> QiskitFunction:
program: QiskitFunction, url: str, token: str, span: Any, client: RunService
) -> RunnableQiskitFunction:
"""Uploads function with artifact.
Args:
Expand Down Expand Up @@ -582,11 +588,10 @@ def _upload_with_artifact(
timeout=REQUESTS_TIMEOUT,
)
)
program_title = response_data.get("title", "na")
program_provider = response_data.get("provider", "na")
span.set_attribute("program.title", program_title)
span.set_attribute("program.provider", program_provider)
response_function = QiskitFunction.from_json(response_data)
span.set_attribute("program.title", response_data.get("title", "na"))
span.set_attribute("program.provider", response_data.get("provider", "na"))
response_data["client"] = client
response_function = RunnableQiskitFunction.from_json(response_data)
except Exception as error: # pylint: disable=broad-exception-caught
raise QiskitServerlessException from error
finally:
Expand Down
Loading

0 comments on commit c3b8446

Please sign in to comment.