Skip to content

Commit

Permalink
Rename tasks to executions
Browse files Browse the repository at this point in the history
  • Loading branch information
pelletier committed Feb 2, 2024
1 parent 8f8a8ee commit 682eb80
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 39 deletions.
41 changes: 21 additions & 20 deletions src/dispatch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,26 @@

import grpc
import google.protobuf
import google.protobuf.any_pb2

import dispatch.sdk.v1.endpoint_pb2 as endpoint_pb
import dispatch.sdk.v1.endpoint_pb2_grpc as endpoint_grpc
import dispatch.coroutine


__all__ = ["Client", "TaskID", "TaskInput", "TaskDef"]
__all__ = ["Client", "ExecutionID", "ExecutionInput", "ExecutionDef"]


TaskID: TypeAlias = str
"""Unique task identifier in Dispatch.
ExecutionID: TypeAlias = str
"""Unique execution identifier in Dispatch.
It should be treated as an opaque value.
"""


@dataclass(frozen=True)
class TaskInput:
"""Definition of a task to be created on Dispatch.
class ExecutionInput:
"""Definition of an execution to be created on Dispatch.
Attributes:
coroutine_uri: The URI of the coroutine to execute.
Expand All @@ -44,18 +45,18 @@ class TaskInput:
input: Any


TaskDef: TypeAlias = TaskInput | dispatch.coroutine.Call
"""Definition of a task to be created on Dispatch.
ExecutionDef: TypeAlias = ExecutionInput | dispatch.coroutine.Call
"""Definition of an execution to be ran on Dispatch.
Can be either a TaskInput or a Call. TaskInput can be created manually, likely
to call a coroutine outside the current code base. Call is created by the
`dispatch.coroutine` module and is used to call a coroutine defined in the
current code base.
Can be either an ExecutionInput or a Call. ExecutionInput can be created
manually, likely to call a coroutine outside the current code base. Call is
created by the `dispatch.coroutine` module and is used to call a coroutine
defined in the current code base.
"""


def _taskdef_to_proto(taskdef: TaskDef) -> endpoint_pb.Execution:
input = taskdef.input
def _executiondef_to_proto(execdef: ExecutionDef) -> endpoint_pb.Execution:
input = execdef.input
match input:
case google.protobuf.any_pb2.Any():
input_any = input
Expand All @@ -66,7 +67,7 @@ def _taskdef_to_proto(taskdef: TaskDef) -> endpoint_pb.Execution:
pickled = pickle.dumps(input)
input_any = google.protobuf.any_pb2.Any()
input_any.Pack(google.protobuf.wrappers_pb2.BytesValue(value=pickled))
return endpoint_pb.Execution(coroutine_uri=taskdef.coroutine_uri, input=input_any)
return endpoint_pb.Execution(coroutine_uri=execdef.coroutine_uri, input=input_any)


class Client:
Expand Down Expand Up @@ -106,14 +107,14 @@ def __init__(

self._stub = endpoint_grpc.EndpointServiceStub(channel)

def create_tasks(self, tasks: Iterable[TaskDef]) -> Iterable[TaskID]:
"""Create tasks on Dispatch using the provided inputs.
def execute(self, executions: Iterable[ExecutionDef]) -> Iterable[ExecutionID]:
"""Execute on Dispatch using the provided inputs.
Returns:
The ID of the created tasks, in the same order as the inputs.
The ID of the created executions, in the same order as the inputs.
"""
req = endpoint_pb.CreateExecutionsRequest()
for task in tasks:
req.executions.append(_taskdef_to_proto(task))
for e in executions:
req.executions.append(_executiondef_to_proto(e))
resp = self._stub.CreateExecutions(req)
return [TaskID(x) for x in resp.ids]
return [ExecutionID(x) for x in resp.ids]
4 changes: 2 additions & 2 deletions tests/task_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import dispatch.sdk.v1.executor_pb2 as executor_pb
import dispatch.sdk.v1.executor_pb2_grpc as executor_grpc

from dispatch import Client, TaskInput, TaskID
from dispatch import Client, ExecutionInput, ExecutionID


_test_auth_token = "THIS_IS_A_TEST_AUTH_TOKEN"
Expand All @@ -26,7 +26,7 @@ def __init__(self):

self.pending_tasks = []

def _make_task_id(self) -> TaskID:
def _make_task_id(self) -> ExecutionID:
parts = [
self.current_partition,
self.current_block_id,
Expand Down
20 changes: 10 additions & 10 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import grpc
from google.protobuf import wrappers_pb2, any_pb2

from dispatch import Client, TaskInput, TaskID
from dispatch import Client, ExecutionInput, ExecutionID
from dispatch.coroutine import _any_unpickle as any_unpickle
from .task_service import ServerTest

Expand All @@ -25,8 +25,8 @@ def test_api_key_from_env(self):
client = Client(api_url=f"http://127.0.0.1:{self.server.port}")

with self.assertRaises(grpc._channel._InactiveRpcError) as mc:
client.create_tasks(
[TaskInput(coroutine_uri="my-cool-coroutine", input=42)]
client.execute(
[ExecutionInput(coroutine_uri="my-cool-coroutine", input=42)]
)
self.assertTrue("got 'Bearer WHATEVER'" in str(mc.exception))

Expand All @@ -45,9 +45,9 @@ def test_can_be_constructed_on_https(self):
# around to actually test this.
Client(api_url="https://example.com", api_key="foo")

def test_create_one_task_pickle(self):
results = self.client.create_tasks(
[TaskInput(coroutine_uri="my-cool-coroutine", input=42)]
def test_create_one_execution_pickle(self):
results = self.client.execute(
[ExecutionInput(coroutine_uri="my-cool-coroutine", input=42)]
)
self.assertEqual(len(results), 1)
id = results[0]
Expand All @@ -62,8 +62,8 @@ def test_create_one_task_pickle(self):

def test_create_one_task_proto(self):
proto = wrappers_pb2.Int32Value(value=42)
results = self.client.create_tasks(
[TaskInput(coroutine_uri="my-cool-coroutine", input=proto)]
results = self.client.execute(
[ExecutionInput(coroutine_uri="my-cool-coroutine", input=proto)]
)
id = results[0]
created_tasks = self.servicer.created_tasks
Expand All @@ -78,8 +78,8 @@ def test_create_one_task_proto_any(self):
proto = wrappers_pb2.Int32Value(value=42)
proto_any = any_pb2.Any()
proto_any.Pack(proto)
results = self.client.create_tasks(
[TaskInput(coroutine_uri="my-cool-coroutine", input=proto_any)]
results = self.client.execute(
[ExecutionInput(coroutine_uri="my-cool-coroutine", input=proto_any)]
)
id = results[0]
created_tasks = self.servicer.created_tasks
Expand Down
14 changes: 7 additions & 7 deletions tests/test_full.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import fastapi
from fastapi.testclient import TestClient

from dispatch import Client, TaskInput, TaskID
from dispatch import Client, ExecutionInput, ExecutionID
from dispatch.coroutine import Input, Output, Error, Status
from dispatch.coroutine import _any_unpickle as any_unpickle
import dispatch.fastapi
Expand All @@ -27,7 +27,7 @@ def setUp(self):
def tearDown(self):
self.server.stop()

def execute_tasks(self):
def execute(self):
self.server.execute(self.app_client)

def test_simple_end_to_end(self):
Expand All @@ -37,12 +37,12 @@ def my_cool_coroutine(input: Input) -> Output:
return Output.value(f"Hello world: {input.input}")

# The client.
[task_id] = self.client.create_tasks(
[TaskInput(coroutine_uri=my_cool_coroutine.uri, input=52)]
[task_id] = self.client.execute(
[ExecutionInput(coroutine_uri=my_cool_coroutine.uri, input=52)]
)

# Simulate execution for testing purposes.
self.execute_tasks()
self.execute()

# Validate results.
resp = self.servicer.responses[task_id]
Expand All @@ -53,7 +53,7 @@ def test_simple_call_with(self):
def my_cool_coroutine(input: Input) -> Output:
return Output.value(f"Hello world: {input.input}")

[task_id] = self.client.create_tasks([my_cool_coroutine.call_with(52)])
self.execute_tasks()
[task_id] = self.client.execute([my_cool_coroutine.call_with(52)])
self.execute()
resp = self.servicer.responses[task_id]
self.assertEqual(any_unpickle(resp.exit.result.output), "Hello world: 52")

0 comments on commit 682eb80

Please sign in to comment.