fix: reimplement depends_on flag using dicts
Michael Hammann committed Oct 9, 2021
2 parents 18a7a8d + d9b812c commit 3338fd3
Showing 5 changed files with 151 additions and 301 deletions.
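
In short, the commit swaps the earlier queue-and-set bookkeeping (process_spawn_queue / process_spawn_set) for two dicts keyed by process name, process_spawn_dict and process_started_dict, so pending and already-started processes can be looked up and de-duplicated by name. The sketch below is only a minimal illustration of that idea under assumed semantics; the Proc class and the queue_with_dependees and drain_spawn_dict helpers are hypothetical stand-ins, not supervisor's actual main-loop code.

    # Illustrative sketch only: name-keyed dependency bookkeeping with dicts.
    # Proc, queue_with_dependees and drain_spawn_dict are hypothetical stand-ins.

    RUNNING, STOPPED = 'RUNNING', 'STOPPED'

    class Proc:
        def __init__(self, name, depends_on=None):
            self.name = name
            self.state = STOPPED
            self.depends_on = depends_on or {}   # name -> Proc, as in the patch

        def spawn(self):
            self.state = RUNNING

    process_spawn_dict = {}    # name -> Proc waiting to be spawned
    process_started_dict = {}  # name -> Proc that has been spawned

    def queue_with_dependees(proc):
        # register proc and, recursively, every dependee that is not yet RUNNING
        process_spawn_dict.setdefault(proc.name, proc)
        for dep in proc.depends_on.values():
            if dep.state != RUNNING:
                queue_with_dependees(dep)

    def drain_spawn_dict():
        # spawn every queued proc whose dependees are all RUNNING
        for name, proc in list(process_spawn_dict.items()):
            if all(d.state == RUNNING for d in proc.depends_on.values()):
                proc.spawn()
                process_started_dict[name] = process_spawn_dict.pop(name)

    db = Proc('db')
    web = Proc('web', depends_on={'db': db})
    queue_with_dependees(web)
    drain_spawn_dict()                    # first pass starts 'db'
    drain_spawn_dict()                    # second pass starts 'web' once 'db' is RUNNING
    print(sorted(process_started_dict))   # ['db', 'web']

Looking processes up by config name is what lets queue_all_dependee_processes in the patch below avoid queueing the same process twice.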
29 changes: 0 additions & 29 deletions CHANGELOG.md

This file was deleted.

10 changes: 1 addition & 9 deletions supervisor/options.py
@@ -932,13 +932,6 @@ def get(section, opt, *args, **kwargs):
serverurl = get(section, 'serverurl', None)
if serverurl and serverurl.strip().upper() == 'AUTO':
    serverurl = None
runningregex = get(section, 'runningregex', None)
if runningregex:
    try:
        runningregex = re.compile(r'.*'+ runningregex)
    except Exception as e:
        raise ValueError(
            f"program section {section} has invalid runningregex value. Error {e}")
depends_on = get(section, 'depends_on', None)
spawn_timeout = int(get(section, 'spawn_timeout', 60))

@@ -1067,7 +1060,6 @@ def get(section, opt, *args, **kwargs):
redirect_stderr=redirect_stderr,
environment=environment,
serverurl=serverurl,
runningregex=runningregex,
depends_on=depends_on,
spawn_timeout=spawn_timeout,
)
@@ -1889,7 +1881,7 @@ class ProcessConfig(Config):
'stopsignal', 'stopwaitsecs', 'stopasgroup', 'killasgroup',
'exitcodes', 'redirect_stderr' ]
optional_param_names = [ 'environment', 'serverurl',
'runningregex', 'depends_on', 'spawn_timeout' ]
'depends_on', 'spawn_timeout' ]

def __init__(self, options, **params):
self.options = options
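
As the options.py hunk above shows, depends_on is read as a raw option value (or None) and spawn_timeout falls back to 60 seconds. Purely as an illustration of that parsing step, here is what the raw values might look like when read with the standard library ConfigParser; the section contents, and in particular the 'db:db' value format for depends_on, are assumptions, since the diff does not show how the string is resolved into process objects.

    # Illustration only: raw option values before supervisor resolves them.
    # The depends_on value format shown here ('db:db') is an assumption.
    from configparser import ConfigParser

    cfg_text = """
    [program:web]
    command = /usr/bin/web-server
    depends_on = db:db
    spawn_timeout = 120
    """

    parser = ConfigParser()
    parser.read_string(cfg_text)

    section = 'program:web'
    depends_on = parser.get(section, 'depends_on', fallback=None)
    spawn_timeout = int(parser.get(section, 'spawn_timeout', fallback=60))
    print(depends_on, spawn_timeout)   # db:db 120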
185 changes: 74 additions & 111 deletions supervisor/process.py
@@ -33,13 +33,6 @@

from supervisor.socket_manager import SocketManager

from supervisor.rpcinterface import SupervisorNamespaceRPCInterface
from supervisor.xmlrpc import (
    capped_int,
    Faults,
    RPCError,
)

@functools.total_ordering
class Subprocess(object):

Expand Down Expand Up @@ -200,96 +193,95 @@ def record_spawnerr(self, msg):
self.config.options.logger.info("spawnerr: %s" % msg)

def queue_all_dependee_processes(self, supervisor):
    if self.config.name not in supervisor.process_spawn_set:
        supervisor.process_spawn_queue.append(self)
        supervisor.process_spawn_set.add(self.config.name)
    # all dependees that are not in queue and not in STARTING need to be added to queue.
    if (self.config.name not in supervisor.process_spawn_dict.keys() or
            self.config.name not in supervisor.process_started_dict.keys()):
        supervisor.process_spawn_dict[self.config.name] = self
    if self.config.depends_on is not None:
        for dependee in self.config.depends_on.values():
            if dependee.state is not ProcessStates.RUNNING:
                ready_to_be_spawned = False
                if dependee.state is not ProcessStates.STARTING:
                    if dependee.config.name not in supervisor.process_spawn_set:
                        supervisor.process_spawn_queue.append(dependee)
                        supervisor.process_spawn_set.add(dependee.config.name)
            if dependee.state is not ProcessStates.RUNNING and dependee.state is not ProcessStates.STARTING:
                if (dependee.config.name not in supervisor.process_spawn_dict.keys() or
                        dependee.config.name not in supervisor.process_started_dict.keys()):
                    supervisor.process_spawn_dict[dependee.config.name] = dependee
                dependee.queue_all_dependee_processes(supervisor)


def spawn(self, supervisor=None):
    """Start the subprocess. It must not be running already.
    Return the process id. If the fork() call fails, return None.
    """
    # spawn if all dependees are running - else add to queue if not in queue already
    ready_to_be_spawned = True

    options = self.config.options
    processname = as_string(self.config.name)

    if self.pid:
        msg = 'process \'%s\' already running' % processname
        options.logger.warn(msg)
        return

    if self.config.depends_on is not None:
        if any([dependee.state is not ProcessStates.RUNNING for dependee in
                self.config.depends_on.values()]):
            self.queue_all_dependee_processes(supervisor)
            return

    self.killing = False
    self.spawnerr = None
    self.exitstatus = None
    self.system_stop = False
    self.administrative_stop = False

    self.laststart = time.time()

    self._assertInState(ProcessStates.EXITED, ProcessStates.FATAL,
                        ProcessStates.BACKOFF, ProcessStates.STOPPED)

    self.change_state(ProcessStates.STARTING)

    try:
        filename, argv = self.get_execv_args()
    except ProcessException as what:
        self.record_spawnerr(what.args[0])
        self._assertInState(ProcessStates.STARTING)
        self.change_state(ProcessStates.BACKOFF)
        return

    try:
        self.dispatchers, self.pipes = self.config.make_dispatchers(self)
    except (OSError, IOError) as why:
        code = why.args[0]
        if code == errno.EMFILE:
            # too many file descriptors open
            msg = 'too many open files to spawn \'%s\'' % processname
        else:
            msg = 'unknown error making dispatchers for \'%s\': %s' % (
                processname, errno.errorcode.get(code, code))
        self.record_spawnerr(msg)
        self._assertInState(ProcessStates.STARTING)
        self.change_state(ProcessStates.BACKOFF)
        return

    try:
        pid = options.fork()
    except OSError as why:
        code = why.args[0]
        if code == errno.EAGAIN:
            # process table full
            msg = ('Too many processes in process table to spawn \'%s\'' %
                   processname)
        else:
            msg = 'unknown error during fork for \'%s\': %s' % (
                processname, errno.errorcode.get(code, code))
        self.record_spawnerr(msg)
        self._assertInState(ProcessStates.STARTING)
        self.change_state(ProcessStates.BACKOFF)
        options.close_parent_pipes(self.pipes)
        options.close_child_pipes(self.pipes)
        return

    if pid != 0:
        return self._spawn_as_parent(pid)
    else:
        return self._spawn_as_child(filename, argv)

def _spawn_as_parent(self, pid):
    # Parent
@@ -677,7 +669,7 @@ def __repr__(self):
def get_state(self):
    return self.state

def transition(self, supervisord_instance=None):
def transition(self, supervisor=None):
    now = time.time()
    state = self.state

@@ -698,15 +690,7 @@ def transition(self, supervisord_instance=None):
        self.spawn(supervisor)
elif state == ProcessStates.STOPPED and not self.laststart:
    if self.config.autostart:
        # STOPPED -> STARTING
        # make sure dependent processes are spawned before
        # check if the process is dependent upon any other process and if so,
        # make sure that one is in the RUNNING state
        if self.config.depends_on is not None:
            rpc_interface = SupervisorNamespaceRPCInterface(supervisord_instance)
            rpc_interface.startProcess(self.group.config.name + ':' + self.config.name)
        else:
            self.spawn()
        self.spawn(supervisor)
elif state == ProcessStates.BACKOFF:
    if self.backoff <= self.config.startretries:
        if now > self.delay:
@@ -715,7 +699,7 @@ def transition(self, supervisord_instance=None):

processname = as_string(self.config.name)
if state == ProcessStates.STARTING:
    if now - self.laststart > self.config.startsecs and not self.config.runningregex:
    if now - self.laststart > self.config.startsecs:
        # STARTING -> RUNNING if the proc has started
        # successfully and it has stayed up for at least
        # proc.config.startsecs,
@@ -728,27 +712,6 @@ def transition(self, supervisord_instance=None):
               '> than %s seconds (startsecs)' % self.config.startsecs)
        logger.info('success: %s %s' % (processname, msg))

    if self.config.runningregex:
        logfile = getattr(self.config, 'stdout_logfile')
        logfile_as_str = as_string(readFile(logfile, self.log_offset, 0))

        # delete ascii escape sequence and newlines with regular expression
        ansi_escape = re.compile(r'(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]|\n')
        logfile_as_str = ansi_escape.sub('', logfile_as_str)

        # STARTING -> RUNNING if the process has started
        # successfully and the runningregex is met
        if self.config.runningregex.match(logfile_as_str):
            self.delay = 0
            self.backoff = 0
            self._assertInState(ProcessStates.STARTING)
            self.change_state(ProcessStates.RUNNING)
            msg = ('entered RUNNING state, found runningregex in stdout')
            logger.info('success: %s %s' % (processname, msg))

if state == ProcessStates.BACKOFF:
    if self.backoff > self.config.startretries:
        # BACKOFF -> FATAL if the proc has exceeded its number
@@ -894,9 +857,9 @@ def before_remove(self):
        pass

class ProcessGroup(ProcessGroupBase):
    def transition(self, supervisord_instance):
    def transition(self, supervisor):
        for proc in self.processes.values():
            proc.transition(supervisord_instance)
            proc.transition(supervisor)

class FastCGIProcessGroup(ProcessGroup):

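The renamed transition arguments above pass one shared supervisor handle from the group down into each process, which is what gives spawn() access to the shared process_spawn_dict and process_started_dict. A minimal sketch of that wiring, with hypothetical FakeSupervisor, Proc and Group classes standing in for the real types:

    # Hypothetical wiring sketch: a group hands one shared 'supervisor' context
    # to every member so each process can register itself in its dicts.

    class FakeSupervisor:
        def __init__(self):
            self.process_spawn_dict = {}
            self.process_started_dict = {}

    class Proc:
        def __init__(self, name):
            self.name = name

        def transition(self, supervisor):
            # a real process would decide here whether to spawn or queue itself
            supervisor.process_spawn_dict.setdefault(self.name, self)

    class Group:
        def __init__(self, procs):
            self.processes = {p.name: p for p in procs}

        def transition(self, supervisor):
            for proc in self.processes.values():
                proc.transition(supervisor)

    sup = FakeSupervisor()
    Group([Proc('db'), Proc('web')]).transition(sup)
    print(sorted(sup.process_spawn_dict))   # ['db', 'web']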