Skip to content

Commit

Permalink
Merge pull request #17 from stealthrocket/update-protos
Browse files Browse the repository at this point in the history
Update protos and names everywhere
  • Loading branch information
pelletier authored Feb 1, 2024
2 parents c46e254 + 5bd2d47 commit 2c11d89
Show file tree
Hide file tree
Showing 71 changed files with 645 additions and 2,597 deletions.
12 changes: 5 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,15 @@ test: typecheck unittest
.proto:
mkdir -p $@

.proto/ring: .proto
buf export buf.build/stealthrocket/ring --output=.proto/ring

.proto/dispatch: .proto
buf export buf.build/stealthrocket/dispatch --output=.proto/dispatch
.proto/dispatch-sdk: .proto
buf export buf.build/stealthrocket/dispatch-sdk --output=.proto/dispatch-sdk

update-proto:
$(MAKE) clean
find . -type f -name '*_pb2*.py*' -exec rm {} \;
$(MAKE) generate

generate: .proto/ring .proto/dispatch
generate: .proto/dispatch-sdk
buf generate --template buf.gen.yaml
cd src && find . -type d | while IFS= read -r dir; do touch $$dir/__init__.py; done
rm src/__init__.py
Expand All @@ -49,5 +47,5 @@ clean:
rm -rf .proto
rm -rf .coverage
rm -rf .coverage-html
find . -type f -name '*.pyc' -delete
find . -type f -name '*.pyc' -exec rm {} \;
find . -type d -name '__pycache__' -exec rm -r {} \;
3 changes: 1 addition & 2 deletions buf.work.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: v1
directories:
- .proto/ring
- .proto/dispatch
- .proto/dispatch-sdk
62 changes: 13 additions & 49 deletions src/dispatch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,55 +13,19 @@
import grpc
import google.protobuf

import ring.record.v1.record_pb2 as record_pb
import ring.task.v1.service_pb2 as service
import ring.task.v1.service_pb2_grpc as service_grpc
import dispatch.sdk.v1.endpoint_pb2 as endpoint_pb
import dispatch.sdk.v1.endpoint_pb2_grpc as endpoint_grpc
import dispatch.coroutine


__all__ = ["Client", "TaskID", "TaskInput", "TaskDef"]


@dataclass(frozen=True, repr=False)
class TaskID:
"""Unique task identifier in Dispatch.
TaskID: TypeAlias = str
"""Unique task identifier in Dispatch.
It should be treated as an opaque value.
"""

partition_number: int
block_id: int
record_offset: int
record_size: int

@classmethod
def _from_proto(cls, proto: record_pb.ID) -> TaskID:
return cls(
partition_number=proto.partition_number,
block_id=proto.block_id,
record_offset=proto.record_offset,
record_size=proto.record_size,
)

def _to_proto(self) -> record_pb.ID:
return record_pb.ID(
partition_number=self.partition_number,
block_id=self.block_id,
record_offset=self.record_offset,
record_size=self.record_size,
)

def __str__(self) -> str:
parts = [
self.partition_number,
self.block_id,
self.record_offset,
self.record_size,
]
return "".join("{:08x}".format(a) for a in parts)

def __repr__(self) -> str:
return f"TaskID({self})"
It should be treated as an opaque value.
"""


@dataclass(frozen=True)
Expand Down Expand Up @@ -90,7 +54,7 @@ class TaskInput:
"""


def _taskdef_to_proto(taskdef: TaskDef) -> service.CreateTaskInput:
def _taskdef_to_proto(taskdef: TaskDef) -> endpoint_pb.Execution:
input = taskdef.input
match input:
case google.protobuf.any_pb2.Any():
Expand All @@ -102,7 +66,7 @@ def _taskdef_to_proto(taskdef: TaskDef) -> service.CreateTaskInput:
pickled = pickle.dumps(input)
input_any = google.protobuf.any_pb2.Any()
input_any.Pack(google.protobuf.wrappers_pb2.BytesValue(value=pickled))
return service.CreateTaskInput(coroutine_uri=taskdef.coroutine_uri, input=input_any)
return endpoint_pb.Execution(coroutine_uri=taskdef.coroutine_uri, input=input_any)


class Client:
Expand Down Expand Up @@ -140,16 +104,16 @@ def __init__(
creds = grpc.composite_channel_credentials(creds, call_creds)
channel = grpc.secure_channel(result.netloc, creds)

self._stub = service_grpc.ServiceStub(channel)
self._stub = endpoint_grpc.EndpointServiceStub(channel)

def create_tasks(self, tasks: Iterable[TaskDef]) -> Iterable[TaskID]:
"""Create tasks on Dispatch using the provided inputs.
Returns:
The ID of the created tasks, in the same order as the inputs.
"""
req = service.CreateTasksRequest()
req = endpoint_pb.CreateExecutionsRequest()
for task in tasks:
req.tasks.append(_taskdef_to_proto(task))
resp = self._stub.CreateTasks(req)
return [TaskID._from_proto(x.id) for x in resp.tasks]
req.executions.append(_taskdef_to_proto(task))
resp = self._stub.CreateExecutions(req)
return [TaskID(x) for x in resp.ids]
76 changes: 37 additions & 39 deletions src/dispatch/coroutine.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import google.protobuf.message

from ring.coroutine.v1 import coroutine_pb2
from ring.status.v1 import status_pb2
from dispatch.sdk.v1 import executor_pb2 as executor_pb
from dispatch.sdk.v1 import status_pb2 as status_pb


# Most types in this package are thin wrappers around the various protobuf
Expand All @@ -33,46 +33,46 @@ class Status(int, enum.Enum):
"""

UNSPECIFIED = status_pb2.STATUS_UNSPECIFIED
OK = status_pb2.STATUS_OK
TIMEOUT = status_pb2.STATUS_TIMEOUT
THROTTLED = status_pb2.STATUS_THROTTLED
INVALID_ARGUMENT = status_pb2.STATUS_INVALID_ARGUMENT
INVALID_RESPONSE = status_pb2.STATUS_INVALID_RESPONSE
TEMPORARY_ERROR = status_pb2.STATUS_TEMPORARY_ERROR
PERMANENT_ERROR = status_pb2.STATUS_PERMANENT_ERROR
INCOMPATIBLE_STATE = status_pb2.STATUS_INCOMPATIBLE_STATE
UNSPECIFIED = status_pb.STATUS_UNSPECIFIED
OK = status_pb.STATUS_OK
TIMEOUT = status_pb.STATUS_TIMEOUT
THROTTLED = status_pb.STATUS_THROTTLED
INVALID_ARGUMENT = status_pb.STATUS_INVALID_ARGUMENT
INVALID_RESPONSE = status_pb.STATUS_INVALID_RESPONSE
TEMPORARY_ERROR = status_pb.STATUS_TEMPORARY_ERROR
PERMANENT_ERROR = status_pb.STATUS_PERMANENT_ERROR
INCOMPATIBLE_STATE = status_pb.STATUS_INCOMPATIBLE_STATE

_proto: status_pb2.Status
_proto: status_pb.Status


# Maybe we should find a better way to define that enum. It's that way to please
# Mypy and provide documentation for the enum values.

Status.UNSPECIFIED.__doc__ = "Status not specified (default)"
Status.UNSPECIFIED._proto = status_pb2.STATUS_UNSPECIFIED
Status.UNSPECIFIED._proto = status_pb.STATUS_UNSPECIFIED
Status.OK.__doc__ = "Coroutine returned as expected"
Status.OK._proto = status_pb2.STATUS_OK
Status.OK._proto = status_pb.STATUS_OK
Status.TIMEOUT.__doc__ = "Coroutine encountered a timeout and may be retried"
Status.TIMEOUT._proto = status_pb2.STATUS_TIMEOUT
Status.TIMEOUT._proto = status_pb.STATUS_TIMEOUT
Status.THROTTLED.__doc__ = "Coroutine was throttled and may be retried later"
Status.THROTTLED._proto = status_pb2.STATUS_THROTTLED
Status.THROTTLED._proto = status_pb.STATUS_THROTTLED
Status.INVALID_ARGUMENT.__doc__ = "Coroutine was provided an invalid type of input"
Status.INVALID_ARGUMENT._proto = status_pb2.STATUS_INVALID_ARGUMENT
Status.INVALID_ARGUMENT._proto = status_pb.STATUS_INVALID_ARGUMENT
Status.INVALID_RESPONSE.__doc__ = "Coroutine was provided an unexpected reponse"
Status.INVALID_RESPONSE._proto = status_pb2.STATUS_INVALID_RESPONSE
Status.INVALID_RESPONSE._proto = status_pb.STATUS_INVALID_RESPONSE
Status.TEMPORARY_ERROR.__doc__ = (
"Coroutine encountered a temporary error, may be retried"
)
Status.TEMPORARY_ERROR._proto = status_pb2.STATUS_TEMPORARY_ERROR
Status.TEMPORARY_ERROR._proto = status_pb.STATUS_TEMPORARY_ERROR
Status.PERMANENT_ERROR.__doc__ = (
"Coroutine encountered a permanent error, should not be retried"
)
Status.PERMANENT_ERROR._proto = status_pb2.STATUS_PERMANENT_ERROR
Status.PERMANENT_ERROR._proto = status_pb.STATUS_PERMANENT_ERROR
Status.INCOMPATIBLE_STATE.__doc__ = (
"Coroutine was provided an incompatible state. May be restarted from scratch"
)
Status.INCOMPATIBLE_STATE._proto = status_pb2.STATUS_INCOMPATIBLE_STATE
Status.INCOMPATIBLE_STATE._proto = status_pb.STATUS_INCOMPATIBLE_STATE


class Coroutine:
Expand Down Expand Up @@ -127,7 +127,7 @@ class Input:
# TODO: first implementation with a single Input type, but we should
# consider using some dynamic filling positional and keyword arguments.

def __init__(self, req: coroutine_pb2.ExecuteRequest):
def __init__(self, req: executor_pb.ExecuteRequest):
self._has_input = req.HasField("input")
if self._has_input:
input_pb = google.protobuf.wrappers_pb2.BytesValue()
Expand Down Expand Up @@ -184,28 +184,28 @@ class Output:
"""

def __init__(self, proto: coroutine_pb2.ExecuteResponse):
def __init__(self, proto: executor_pb.ExecuteResponse):
self._message = proto

@classmethod
def value(cls, value: Any, status: Status = Status.OK) -> Output:
"""Terminally exit the coroutine with the provided return value."""
output_any = _pb_any_pickle(value)
return Output(
coroutine_pb2.ExecuteResponse(
executor_pb.ExecuteResponse(
status=status._proto,
exit=coroutine_pb2.Exit(result=coroutine_pb2.Result(output=output_any)),
exit=executor_pb.Exit(result=executor_pb.Result(output=output_any)),
)
)

@classmethod
def error(cls, error: Error) -> Output:
"""Terminally exit the coroutine with the provided error."""
return Output(
coroutine_pb2.ExecuteResponse(
executor_pb.ExecuteResponse(
status=error.status._proto,
exit=coroutine_pb2.Exit(
result=coroutine_pb2.Result(error=error._as_proto())
exit=executor_pb.Exit(
result=executor_pb.Result(error=error._as_proto())
),
)
)
Expand All @@ -216,35 +216,33 @@ def callback(cls, state: Any, calls: None | list[Call] = None) -> Output:
coroutine with the provided state. The state will be made available in
Input.state."""
state_bytes = pickle.dumps(state)
poll = coroutine_pb2.Poll(state=state_bytes)
poll = executor_pb.Poll(state=state_bytes)

if calls is not None:
for c in calls:
input_bytes = _pb_any_pickle(c.input)
x = coroutine_pb2.Call(
x = executor_pb.Call(
coroutine_uri=c.coroutine_uri,
coroutine_version=c.coroutine_version,
correlation_id=c.correlation_id,
input=input_bytes,
)
poll.calls.append(x)

return Output(coroutine_pb2.ExecuteResponse(poll=poll))
return Output(executor_pb.ExecuteResponse(poll=poll))

@classmethod
def tailcall(cls, call: Call) -> Output:
"""Exit the coroutine instructing the orchestrator to call the provided
coroutine."""
input_bytes = _pb_any_pickle(call.input)
x = coroutine_pb2.Call(
x = executor_pb.Call(
coroutine_uri=call.coroutine_uri,
coroutine_version=call.coroutine_version,
correlation_id=call.correlation_id,
input=input_bytes,
)
return Output(
coroutine_pb2.ExecuteResponse(exit=coroutine_pb2.Exit(tail_call=x))
)
return Output(executor_pb.ExecuteResponse(exit=executor_pb.Exit(tail_call=x)))


# Note: contrary to other classes here Call is not just a wrapper around its
Expand Down Expand Up @@ -276,7 +274,7 @@ class CallResult:
This class is not meant to be instantiated directly.
"""

def __init__(self, proto: coroutine_pb2.CallResult):
def __init__(self, proto: executor_pb.CallResult):
self.coroutine_uri = proto.coroutine_uri
self.coroutine_version = proto.coroutine_version
self.correlation_id = proto.correlation_id
Expand Down Expand Up @@ -344,11 +342,11 @@ def from_exception(cls, ex: Exception, status: Status | None = None) -> Error:
return Error(status, ex.__class__.__qualname__, str(ex))

@classmethod
def _from_proto(cls, proto: coroutine_pb2.Error) -> Error:
def _from_proto(cls, proto: executor_pb.Error) -> Error:
return cls(Status.UNSPECIFIED, proto.type, proto.message)

def _as_proto(self) -> coroutine_pb2.Error:
return coroutine_pb2.Error(type=self.type, message=self.message)
def _as_proto(self) -> executor_pb.Error:
return executor_pb.Error(type=self.type, message=self.message)


def _any_unpickle(any: google.protobuf.any_pb2.Any) -> Any:
Expand Down
6 changes: 3 additions & 3 deletions src/dispatch/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def read_root():
import fastapi.responses
from httpx import _urlparse

import ring.coroutine.v1.coroutine_pb2
from dispatch.sdk.v1 import executor_pb2 as executor_pb
import dispatch.coroutine


Expand Down Expand Up @@ -65,7 +65,7 @@ def configure(
dispatch_app = _new_app(public_url)

app.__setattr__("dispatch_coroutine", dispatch_app.dispatch_coroutine)
app.mount("/ring.coroutine.v1.ExecutorService", dispatch_app)
app.mount("/dispatch.sdk.v1.ExecutorService", dispatch_app)


class _DispatchAPI(fastapi.FastAPI):
Expand Down Expand Up @@ -117,7 +117,7 @@ async def execute(request: fastapi.Request):
# forcing execute() to be async.
data: bytes = await request.body()

req = ring.coroutine.v1.coroutine_pb2.ExecuteRequest.FromString(data)
req = executor_pb.ExecuteRequest.FromString(data)

if not req.coroutine_uri:
raise fastapi.HTTPException(
Expand Down
41 changes: 0 additions & 41 deletions src/dispatch/http/v1/http_pb2.py

This file was deleted.

Loading

0 comments on commit 2c11d89

Please sign in to comment.