Skip to content

Commit

Permalink
[RPC] Release the sources of TaskGraph when all executors are ready.
Browse files Browse the repository at this point in the history
  • Loading branch information
sukritkalra committed Feb 18, 2024
1 parent 602f80c commit 595def4
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 2 deletions.
55 changes: 54 additions & 1 deletion rpc/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ async def RegisterTaskGraph(self, request, context):
),
deadline=EventTime(request.deadline, EventTime.Unit.S),
)
task.release(EventTime(request.timestamp, EventTime.Unit.S))
task_graph = TaskGraph(
name=request.id,
tasks={task: []},
Expand All @@ -367,6 +366,60 @@ async def RegisterTaskGraph(self, request, context):
num_executors=FLAGS.initial_executors,
)

async def RegisterEnvironmentReady(self, request, context):
"""Registers that the environment (i.e., executors) are ready for the given
TaskGraph at the specified time.
This is intended to release the sources of the TaskGraph to the scheduling
backend, to consider the application in this scheduling cycle.
"""
if not self._initialized:
self._logger.warning(
"Trying to register that the environment is ready for the TaskGraph "
"with ID %s, but no framework is registered yet.",
request.id,
)
return erdos_scheduler_pb2.RegisterEnvironmentReadyResponse(
success=False, message="Framework not registered yet."
)

task_graph = self._workload.get_task_graph(request.id)
if task_graph is None:
self._logger.warning(
"Trying to register that the environment is ready for the TaskGraph "
"with ID %s, but no TaskGraph with that ID is registered.",
request.id,
)
return erdos_scheduler_pb2.RegisterEnvironmentReadyResponse(
success=False,
message=f"TaskGraph with ID {request.id} not registered yet.",
)

if request.num_executors != FLAGS.initial_executors:
self._logger.warning(
"The TaskGraph %s requires %s executors, but the environment is ready "
"with %s executors.",
request.id,
FLAGS.initial_executors,
request.num_executors,
)
return erdos_scheduler_pb2.RegisterEnvironmentReadyResponse(
success=False,
message=f"Number of executors not {FLAGS.initial_executors}.",
)

# Release all the sources of the TaskGraph at the given time.
for source_task in task_graph.get_source_tasks():
source_task.release(EventTime(request.timestamp, EventTime.Unit.S))

# TODO (Sukrit): A new application has arrived, we need to queue up the
# execution of the scheduler.

return erdos_scheduler_pb2.RegisterEnvironmentReadyResponse(
success=True,
message=f"Environment ready for TaskGraph with ID {request.id}!",
)

async def DeregisterFramework(self, request, context):
"""Deregisters the framework with the backend scheduler.
This is the exit point for a running instance of Spark / Flink to deregister"""
Expand Down
2 changes: 1 addition & 1 deletion workload/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def release(self, time: Optional[EventTime] = None):
"""
if time and type(time) != EventTime:
raise ValueError(f"Invalid type received for time: {type(time)}")
if time is None and self._release_time == EventTime(-1, EventTime.Unit.US):
if time is None and self._release_time.is_invalid():
raise ValueError(
"Release time should be specified either while "
"creating the Task or when releasing it."
Expand Down

0 comments on commit 595def4

Please sign in to comment.