From 595def4680eeaa420d0ceb73d2fcf7f5b893264d Mon Sep 17 00:00:00 2001 From: Sukrit Kalra Date: Sun, 18 Feb 2024 12:04:51 -0800 Subject: [PATCH] [RPC] Release the sources of TaskGraph when all executors are ready. --- rpc/service.py | 55 ++++++++++++++++++++++++++++++++++++++++++++++- workload/tasks.py | 2 +- 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/rpc/service.py b/rpc/service.py index be9256d2..02ac3da1 100644 --- a/rpc/service.py +++ b/rpc/service.py @@ -344,7 +344,6 @@ async def RegisterTaskGraph(self, request, context): ), deadline=EventTime(request.deadline, EventTime.Unit.S), ) - task.release(EventTime(request.timestamp, EventTime.Unit.S)) task_graph = TaskGraph( name=request.id, tasks={task: []}, @@ -367,6 +366,60 @@ async def RegisterTaskGraph(self, request, context): num_executors=FLAGS.initial_executors, ) + async def RegisterEnvironmentReady(self, request, context): + """Registers that the environment (i.e., executors) are ready for the given + TaskGraph at the specified time. + + This is intended to release the sources of the TaskGraph to the scheduling + backend, to consider the application in this scheduling cycle. + """ + if not self._initialized: + self._logger.warning( + "Trying to register that the environment is ready for the TaskGraph " + "with ID %s, but no framework is registered yet.", + request.id, + ) + return erdos_scheduler_pb2.RegisterEnvironmentReadyResponse( + success=False, message="Framework not registered yet." + ) + + task_graph = self._workload.get_task_graph(request.id) + if task_graph is None: + self._logger.warning( + "Trying to register that the environment is ready for the TaskGraph " + "with ID %s, but no TaskGraph with that ID is registered.", + request.id, + ) + return erdos_scheduler_pb2.RegisterEnvironmentReadyResponse( + success=False, + message=f"TaskGraph with ID {request.id} not registered yet.", + ) + + if request.num_executors != FLAGS.initial_executors: + self._logger.warning( + "The TaskGraph %s requires %s executors, but the environment is ready " + "with %s executors.", + request.id, + FLAGS.initial_executors, + request.num_executors, + ) + return erdos_scheduler_pb2.RegisterEnvironmentReadyResponse( + success=False, + message=f"Number of executors not {FLAGS.initial_executors}.", + ) + + # Release all the sources of the TaskGraph at the given time. + for source_task in task_graph.get_source_tasks(): + source_task.release(EventTime(request.timestamp, EventTime.Unit.S)) + + # TODO (Sukrit): A new application has arrived, we need to queue up the + # execution of the scheduler. + + return erdos_scheduler_pb2.RegisterEnvironmentReadyResponse( + success=True, + message=f"Environment ready for TaskGraph with ID {request.id}!", + ) + async def DeregisterFramework(self, request, context): """Deregisters the framework with the backend scheduler. This is the exit point for a running instance of Spark / Flink to deregister""" diff --git a/workload/tasks.py b/workload/tasks.py index 94e08d4a..6a0081d8 100644 --- a/workload/tasks.py +++ b/workload/tasks.py @@ -156,7 +156,7 @@ def release(self, time: Optional[EventTime] = None): """ if time and type(time) != EventTime: raise ValueError(f"Invalid type received for time: {type(time)}") - if time is None and self._release_time == EventTime(-1, EventTime.Unit.US): + if time is None and self._release_time.is_invalid(): raise ValueError( "Release time should be specified either while " "creating the Task or when releasing it."