Skip to content

Commit

Permalink
fix: restructure depends_on flag. No using dicts to store dependent p…
Browse files Browse the repository at this point in the history
…rocesses and only start process if all dependent processes are in RUNNING state
  • Loading branch information
Michael Hammann committed Sep 6, 2021
1 parent ba25802 commit 2c4e13c
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 9 deletions.
6 changes: 0 additions & 6 deletions supervisor/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,6 @@ def spawn(self):
Return the process id. If the fork() call fails, return None.
"""
# check if the process is dependent upon any other process and if so make sure that one is in the RUNNING state
if self.config.depends_on is not None:
for process in enumerate(self.config.depends_on):
if self.config.depends_on[process[0]].state not in RUNNING_STATES:
self.config.depends_on[process[0]].spawn()

options = self.config.options
processname = as_string(self.config.name)

Expand Down
25 changes: 25 additions & 0 deletions supervisor/rpcinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,31 @@ def startProcess(self, name, wait=True):
@return boolean result Always true unless error
"""
# check if the process is dependent upon any other process and if so make sure that one is in the RUNNING state
group, process = self._getGroupAndProcess(name)
if process.config.depends_on is not None:
# keep track of RUNNING childs
running_childs = set()
# wait/loop until all childs are running
while set(process.config.depends_on.keys()) != running_childs:
for child in process.config.depends_on.values():
if child.get_state() != ProcessStates.RUNNING:
# potentially remove child, if it is in running list
if child.config.name in running_childs:
running_childs.remove(child.config.name)
# check if it needs to be started
if child.state is not (ProcessStates.STARTING or ProcessStates.RUNNING):
self.startProcess(child.config.name)
else:
child.transition()
msg = ("waiting on dependee process {} to reach running state - currently in {}"
.format(child.config.name, getProcessStateDescription(child.state)))
self.supervisord.options.logger.warn(msg)
else:
# child is running - add to set
running_childs.add(child.config.name)
time.sleep(0.5)

self._update('startProcess')
group, process = self._getGroupAndProcess(name)
if process is None:
Expand Down
6 changes: 3 additions & 3 deletions supervisor/supervisord.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def run(self):
# check dependencies for all programs in group:
for conf in enumerate(config.process_configs):
if config.process_configs[conf[0]].depends_on is not None:
processes=[]
process_dict=dict({})
# split to get all processes in case there are multiple dependencies
dependent_processes = (config.process_configs[conf[0]].depends_on).split()
for process in dependent_processes:
Expand All @@ -102,8 +102,8 @@ def run(self):
except:
dependent_group=dependent_process=process
g.addEdge(config.process_configs[conf[0]].name, dependent_process)
processes.append(self.process_groups[dependent_group].processes[dependent_process])
config.process_configs[conf[0]].depends_on = processes
process_dict[dependent_process] = self.process_groups[dependent_group].processes[dependent_process]
config.process_configs[conf[0]].depends_on = process_dict
# check for cyclical process dependencies
if g.cyclic() == 1:
raise AttributeError('Process config contains dependeny cycle(s)! Check config files again!')
Expand Down

0 comments on commit 2c4e13c

Please sign in to comment.