Skip to content

Commit

Permalink
Enhance launcher executor and subprocess launcher
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanTingHsieh committed Jul 30, 2024
1 parent 1ef5207 commit a16b814
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
9 changes: 6 additions & 3 deletions nvflare/app_common/executors/launcher_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ def _init_converter(self, fl_ctx: FLContext):
def _initialize_external_execution(
self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal
) -> bool:
self.reset_peer_is_up_or_dead()
with self._lock:
self._abort_signal = abort_signal
self._current_task = task_name
Expand All @@ -242,13 +243,15 @@ def _initialize_external_execution(
abort_signal.trigger("launch task failed")
return False

self.log_info(fl_ctx, f"External execution for task ({task_name}) is launched.")
self.log_info(fl_ctx, f"launcher launch_task for ({task_name}) succeed.")
# wait for external execution to set up their pipe_handler
setup_success = self._wait_external_setup(task_name, fl_ctx, abort_signal)
if not setup_success:
self.log_error(fl_ctx, "External execution set up failed.")
abort_signal.trigger("External execution set up failed.")
error = f"External execution set up for task ({task_name}) failed."
self.log_error(fl_ctx, error)
abort_signal.trigger(error)
return False
self.log_info(fl_ctx, f"External execution set up for task ({task_name}) succeed.")
return True

def _execute_launcher_method_in_thread_executor(self, method_name: str, **kwargs) -> Any:
Expand Down
3 changes: 3 additions & 0 deletions nvflare/app_common/executors/task_exchanger.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ def ask_peer_to_end(self, fl_ctx: FLContext) -> bool:
def peer_is_up_or_dead(self) -> bool:
"""Returns whether the pipe_handler's ``peer_is_up_or_dead`` flag is currently set.

Returns:
    The result of ``is_set()`` on ``self.pipe_handler.peer_is_up_or_dead``
    (presumably a ``threading.Event`` -- confirm against the pipe handler class).
"""
return self.pipe_handler.peer_is_up_or_dead.is_set()

def reset_peer_is_up_or_dead(self):
"""Clears the pipe_handler's ``peer_is_up_or_dead`` flag.

In this commit it is called at the start of ``_initialize_external_execution``
in launcher_executor.py, before the task is launched -- presumably so that a
flag set during a previous task does not carry over (confirm with the caller).
"""
self.pipe_handler.peer_is_up_or_dead.clear()

def pause_pipe_handler(self):
"""Stops pipe_handler heartbeat."""
self.pipe_handler.pause()
Expand Down

0 comments on commit a16b814

Please sign in to comment.