Skip to content

Commit

Permalink
Merge pull request #1180 from matthewrmshin/rose-suite-scan-port-file
Browse files Browse the repository at this point in the history
rose suite-scan: scan port files as well
  • Loading branch information
benfitzpatrick committed Mar 19, 2014
2 parents 5789725 + 909b476 commit b19f342
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 120 deletions.
9 changes: 4 additions & 5 deletions lib/python/rose/suite_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from rose.reporter import Event, Reporter
from rose.resource import ResourceLocator
from rose.suite_engine_proc import SuiteEngineProcessor
from rose.suite_scan import SuiteScan
from rose.suite_scan import SuiteScanner
import sys


Expand Down Expand Up @@ -180,10 +180,9 @@ def main():
if not opts.non_interactive:
confirm = prompt
if opts.all:
suite_scan = SuiteScan(event_handler=event_handler)
res = suite_scan.scan()
for r in res:
suite_names.append(str(r).split()[0])
suite_scanner = SuiteScanner(event_handler=event_handler)
results, exceptions = suite_scanner.scan()
suite_names = [result.name for result in results]
else:
if opts.name:
suite_names.append(opts.name)
Expand Down
40 changes: 24 additions & 16 deletions lib/python/rose/suite_engine_proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,28 +55,28 @@ def __str__(self):
return "%s %s" % self.args


class SuiteScanResult(object):
class SuiteScanResult(Event):

"""Information on where a suite is running.
suite_name: suite name
user: suite owner's user ID
host: host name of running suite
port: port at host name of running suite
name: suite name
location: where the suite was found, e.g. "user@host:port" or the location of its port file
"""

def __init__(self, suite_name, user, host, port=None):
self.suite_name = suite_name
self.user = user
self.host = host
self.port = port
LEVEL = 0

def __init__(self, name, location):
Event.__init__(self, name, location)
self.name = name
self.location = location

def __cmp__(self, other):
return (cmp(self.name, other.name) or
cmp(self.location, other.location))

def __str__(self):
port = ""
if self.port:
port = ":" + self.port
return "%s %s@%s%s" % (self.suite_name, self.user, self.host, port)
return "%s %s\n" % (self.name, self.location)

class CycleOffset(object):
"""Represent a cycle time offset."""
Expand Down Expand Up @@ -248,6 +248,7 @@ class SuiteEngineProcessor(object):
TASK_NAME_DELIM = {"prefix": "_", "suffix": "_"}
SCHEME_HANDLER_MANAGER = None
SCHEME_DEFAULT = "cylc" # TODO: site configuration?
TIMEOUT = 5 # seconds

@classmethod
def get_processor(cls, key=None, event_handler=None, popen=None,
Expand Down Expand Up @@ -582,8 +583,15 @@ def run(self, suite_name, host=None, host_environ=None, restart_mode=False,
"""Start a suite (in a specified host)."""
raise NotImplementedError()

def scan(self, host_names=None):
"""Return a list of SuiteScanResult for suites running in host_names.
def scan(self, host_names=None, timeout=TIMEOUT):
"""Scan for running suites (in hosts).
Return (suite_scan_results, exceptions), where
suite_scan_results is a list of SuiteScanResult instances and
exceptions is a list of exceptions resulting from any failed scans.
The default timeout for the SSH and "cylc scan" commands is 5 seconds.
"""
raise NotImplementedError()

Expand Down
80 changes: 64 additions & 16 deletions lib/python/rose/suite_engine_procs/cylc.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
from uuid import uuid4


_PORT_FILE = "port-file"
_PORT_SCAN = "port-scan"


class CylcProcessor(SuiteEngineProcessor):

"""Logic specific to the Cylc suite engine."""
Expand Down Expand Up @@ -77,7 +81,6 @@ class CylcProcessor(SuiteEngineProcessor):
"name ASC, cycle DESC, task_events.submit_num DESC",
"name_desc_cycle_desc":
"name DESC, cycle DESC, task_events.submit_num DESC"}
PYRO_TIMEOUT = 5
REC_CYCLE_TIME = re.compile(r"\A[\+\-]?\d+(?:T\d+)?\Z") # Good enough?
REC_SEQ_LOG = re.compile(r"\A(.*\.)(\d+)(\.html)?\Z")
SCHEME = "cylc"
Expand All @@ -89,6 +92,7 @@ class CylcProcessor(SuiteEngineProcessor):
SUITE_DB = "cylc-suite.db"
SUITE_DIR_REL_ROOT = "cylc-run"
TASK_ID_DELIM = "."
TIMEOUT = 5 # seconds

def __init__(self, *args, **kwargs):
SuiteEngineProcessor.__init__(self, *args, **kwargs)
Expand Down Expand Up @@ -912,28 +916,72 @@ def run(self, suite_name, host=None, host_environ=None, run_mode=None,
if out:
self.handle_event(out)

def scan(self, hosts=None):
"""Return a list of SuiteScanResult for suites running in hosts.
def scan(self, hosts=None, timeout=None):
"""Scan for running suites (in hosts).
Return (suite_scan_results, exceptions), where
suite_scan_results is a list of SuiteScanResult instances and
exceptions is a list of exceptions resulting from any failed scans.
The default timeout for the SSH and "cylc scan" commands is 5 seconds.
"""
if not hosts:
hosts = ["localhost"]
host_proc_dict = {}
if not timeout:
timeout = self.TIMEOUT
procs = {}
for host in sorted(hosts):
timeout = "--pyro-timeout=%s" % self.PYRO_TIMEOUT
proc = self.popen.run_bg("cylc", "scan", "--host=" + host, timeout)
host_proc_dict[host] = proc
ret = []
while host_proc_dict:
for host, proc in host_proc_dict.items():
cmd = ["cylc", "scan", "--host=" + host, "--pyro-timeout=%s" % timeout]
proc = self.popen.run_bg(*cmd)
procs[(host, _PORT_SCAN, tuple(cmd))] = proc
sh_cmd = "whoami && cd ~/.cylc/ports/ && ls || true"
if host == "localhost":
cmd = ["bash", "-c", sh_cmd]
else:
cmd = self.popen.get_cmd(
"ssh", "-oConnectTimeout=%s" % timeout, host, sh_cmd)
proc = self.popen.run_bg(*cmd)
procs[(host, _PORT_FILE, tuple(cmd))] = proc
results = {}
exceptions = []
while procs:
for keys, proc in procs.items():
rc = proc.poll()
if rc is not None:
host_proc_dict.pop(host)
if rc == 0:
if rc is None:
continue
procs.pop(keys)
host, key, cmd = keys
if rc == 0:
if key == _PORT_SCAN:
for line in proc.communicate()[0].splitlines():
ret.append(SuiteScanResult(*line.split()))
if host_proc_dict:
name, user, host, port = line.split()
auth = "%s@%s:%s" % (user, host, port)
result = SuiteScanResult(name, auth)
results[(name, host, key)] = result
# N.B. Trust port-scan over port-file
for i_host in hosts:
try:
results.pop((name, i_host, _PORT_FILE))
except KeyError:
pass
else: # if key == _PORT_FILE:
lines = proc.communicate()[0].splitlines()
user = lines.pop(0)
for name in lines:
# N.B. Trust port-scan over port-file
if (name, host, _PORT_SCAN) in results:
continue
location = "%s@%s:%s" % (
user, host, "~/.cylc/ports/" + name)
result = SuiteScanResult(name, location)
results[(name, host, key)] = result
else:
out, err = proc.communicate()
exceptions.append(RosePopenError(cmd, rc, out, err))
if procs:
sleep(0.1)
return ret
return (sorted(results.values()), exceptions)

def shutdown(self, suite_name, host=None, engine_version=None, args=None,
stderr=None, stdout=None):
Expand Down
32 changes: 23 additions & 9 deletions lib/python/rose/suite_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from rose.suite_engine_proc import SuiteEngineProcessor
import sys

class SuiteScan(object):
class SuiteScanner(object):

"""Scan for running suites in suite hosts."""

Expand All @@ -51,14 +51,23 @@ def handle_event(self, *args, **kwargs):
return self.event_handler(*args, **kwargs)

def scan(self, *hosts):
"""Scan for running suites (in hosts)."""
"""Scan for running suites (in hosts).
Return (suite_scan_results, exceptions), where suite_scan_results is a
list of rose.suite_engine_proc.SuiteScanResult instances and exceptions
is a list of exceptions resulting from any failed scans.
"""
conf = ResourceLocator.default().get_conf()
if not hosts:
conf = ResourceLocator.default().get_conf()
hosts = self.host_selector.expand(
["localhost"] +
conf.get_value(["rose-suite-run", "hosts"], "").split() +
conf.get_value(["rose-suite-run", "scan-hosts"], "").split())[0]
return self.suite_engine_proc.scan(set(hosts))
timeout = conf.get_value(["rose-suite-scan", "timeout"])
if timeout:
timeout = int(timeout)
return self.suite_engine_proc.scan(set(hosts), timeout=timeout)

__call__ = scan

Expand All @@ -67,13 +76,18 @@ def main():
opt_parser = RoseOptionParser()
opts, args = opt_parser.parse_args()
event_handler = Reporter(opts.verbosity - opts.quietness)
suite_scan = SuiteScan(event_handler=event_handler)
results = suite_scan(*args)
suite_scanner = SuiteScanner(event_handler=event_handler)
results, exceptions = SuiteScanner(event_handler=event_handler)(*args)
ret = 1
if results:
ret = 0
for result in results:
print(result)
else:
sys.exit(1)
suite_scanner.handle_event(result)
if exceptions:
ret = 2
for exception in exceptions:
event_handler(exception)
sys.exit(ret)


if __name__ == "__main__":
Expand Down
47 changes: 35 additions & 12 deletions t/rose-suite-scan/00-localhost.t
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,61 @@
# You should have received a copy of the GNU General Public License
# along with Rose. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
# Test "rose suite-scan", on localhost, without site/user configurations.
# Test "rose suite-scan" with a suite running on localhost or on a host from
# the site/user configuration. Assumes a shared $HOME file system.
#-------------------------------------------------------------------------------
. $(dirname $0)/test_header
export ROSE_CONF_PATH=

if [[ $TEST_KEY_BASE == *localhost ]]; then
export ROSE_CONF_PATH=
HOST=localhost
else
HOSTS=$(rose config rose-suite-run hosts)
if [[ -z $HOSTS ]]; then
skip_all '[rose-suite-run]hosts not defined'
fi
HOST=$(rose host-select $HOSTS)
fi
#-------------------------------------------------------------------------------
tests 6
tests 9
#-------------------------------------------------------------------------------
# Run the suite.
# Run the suite
SUITE_RUN_DIR=$(mktemp -d --tmpdir=$HOME/cylc-run 'rose-test-battery.XXXXXX')
NAME=$(basename $SUITE_RUN_DIR)
rose suite-run -q -C $TEST_SOURCE_DIR/$TEST_KEY_BASE --name=$NAME \
--no-gcontrol --host=localhost
--no-gcontrol --host=$HOST
if [[ $HOST == 'localhost' ]]; then
PORT=$(cat ~/.cylc/ports/$NAME)
else
PORT=$(ssh -oBatchMode=yes $HOST cat ~/.cylc/ports/$NAME)
fi
#-------------------------------------------------------------------------------
# No argument
TEST_KEY=$TEST_KEY_BASE
run_pass "$TEST_KEY" rose suite-scan
file_grep "$TEST_KEY.out" "$NAME $USER@localhost" "$TEST_KEY.out"
file_grep "$TEST_KEY.out" "$NAME $USER@$HOST:$PORT" "$TEST_KEY.out"
file_cmp "$TEST_KEY.err" "$TEST_KEY.err" </dev/null
#-------------------------------------------------------------------------------
# localhost
TEST_KEY=$TEST_KEY_BASE-localhost
run_pass "$TEST_KEY" rose suite-scan localhost
file_grep "$TEST_KEY.out" "$NAME $USER@localhost" "$TEST_KEY.out"
# Specific host
TEST_KEY=$TEST_KEY_BASE-hostname
run_pass "$TEST_KEY" rose suite-scan $HOST
file_grep "$TEST_KEY.out" "$NAME $USER@$HOST:$PORT" "$TEST_KEY.out"
file_cmp "$TEST_KEY.err" "$TEST_KEY.err" </dev/null
#-------------------------------------------------------------------------------
# Wait for the suite to complete
touch $SUITE_RUN_DIR/flag
TIMEOUT=$(($(date +%s) + 60)) # wait 1 minute
while [[ -e $HOME/.cylc/ports/$NAME ]] && (($(date +%s) < TIMEOUT)); do
while [[ -e ~/.cylc/ports/$NAME ]] && (($(date +%s) < TIMEOUT)); do
sleep 1
done
rose suite-clean -q -y $NAME || exit 1
#-------------------------------------------------------------------------------
# Left behind port file
TEST_KEY=$TEST_KEY_BASE-port-file
echo 7766 >~/.cylc/ports/$NAME
run_pass "$TEST_KEY" rose suite-scan
file_grep "$TEST_KEY.out" \
"$NAME $USER@$HOST:~/.cylc/ports/$NAME" "$TEST_KEY.out"
file_cmp "$TEST_KEY.err" "$TEST_KEY.err" </dev/null
rm ~/.cylc/ports/$NAME
#-------------------------------------------------------------------------------
exit 0
Loading

0 comments on commit b19f342

Please sign in to comment.