From 5531ce4bc0869e761b5dafe9ba8cb5e43d150717 Mon Sep 17 00:00:00 2001
From: John Chilton
Date: Tue, 30 Apr 2019 09:23:31 -0400
Subject: [PATCH 1/9] Refactor job_conf -> dictifiable: -> runners:

---
 lib/galaxy/jobs/__init__.py                   | 101 ++++++++++++------
 ...{delay_job_conf.xml => delay_job_conf.yml} |   0
 2 files changed, 70 insertions(+), 31 deletions(-)
 rename test/integration/{delay_job_conf.xml => delay_job_conf.yml} (100%)

diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py
index ca1c5b3b4ce4..b25a3bb3efe4 100644
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -112,6 +112,35 @@ def config_exception(e, file):
     return Exception(message)
 
 
+def job_config_xml_to_dict(config, root):
+    config_dict = {}
+
+    runners = {}
+    config_dict["runners"] = runners
+
+    # Parse the plugins section to populate 'runners' and 'dynamic' in config_dict.
+    plugins = root.find('plugins')
+    if plugins is not None:
+        for plugin in ConfiguresHandlers._findall_with_required(plugins, 'plugin', ('id', 'type', 'load')):
+            if plugin.get('type') == 'runner':
+                workers = plugin.get('workers', plugins.get('workers', JobConfiguration.DEFAULT_NWORKERS))
+                runner_kwds = JobConfiguration.get_params(config, plugin)
+                plugin_id = plugin.get('id')
+                runner_info = dict(id=plugin_id,
+                                   load=plugin.get('load'),
+                                   workers=int(workers),
+                                   kwds=runner_kwds)
+                runners[plugin_id] = runner_info
+            else:
+                log.error('Unknown plugin type: %s' % plugin.get('type'))
+
+        for plugin in ConfiguresHandlers._findall_with_required(plugins, 'plugin', ('id', 'type')):
+            if plugin.get('id') == 'dynamic' and plugin.get('type') == 'runner':
+                config_dict["dynamic"] = JobConfiguration.get_params(config, plugin)
+
+    return config_dict
+
+
 class JobConfiguration(ConfiguresHandlers):
     """A parser and interface to advanced job management features.
 
@@ -182,6 +211,29 @@ def __init__(self, app):
         except Exception as e:
             raise config_exception(e, job_config_file)
 
+    def _configure_from_dict(self, job_config_dict):
+        for runner_id, runner_info in job_config_dict["runners"].items():
+            if "kwds" not in runner_info:
+                # Convert all 'extra' parameters into kwds; this allows defining a runner
+                # with a flat dictionary.
+                kwds = {}
+                for key, value in runner_info.items():
+                    if key in ['id', 'load', 'workers']:
+                        continue
+                    kwds[key] = value
+                runner_info["kwds"] = kwds
+
+            if not self.__is_enabled(runner_info.get("kwds")):
+                continue
+            runner_info["id"] = runner_id
+            if runner_id == "dynamic":
+                log.warning('Deprecated treatment of dynamic running configuration as an actual job runner.')
+                self.dynamic_params = runner_info["kwds"]
+                continue
+            self.runner_plugins.append(runner_info)
+        if "dynamic" in job_config_dict:
+            self.dynamic_params = job_config_dict.get("dynamic", None)
+
     def __parse_job_conf_xml(self, tree):
         """Loads the new-style job configuration from options in the job config file (by default, job_conf.xml).
@@ -191,29 +243,12 @@ def __parse_job_conf_xml(self, tree):
         root = tree.getroot()
         log.debug('Loading job configuration from %s' % self.app.config.job_config_file)
 
-        # Parse job plugins
-        plugins = root.find('plugins')
-        if plugins is not None:
-            for plugin in self._findall_with_required(plugins, 'plugin', ('id', 'type', 'load')):
-                if plugin.get('type') == 'runner':
-                    workers = plugin.get('workers', plugins.get('workers', JobConfiguration.DEFAULT_NWORKERS))
-                    runner_kwds = self.__get_params(plugin)
-                    if not self.__is_enabled(runner_kwds):
-                        continue
-                    runner_info = dict(id=plugin.get('id'),
-                                       load=plugin.get('load'),
-                                       workers=int(workers),
-                                       kwds=runner_kwds)
-                    self.runner_plugins.append(runner_info)
-                else:
-                    log.error('Unknown plugin type: %s' % plugin.get('type'))
-            for plugin in self._findall_with_required(plugins, 'plugin', ('id', 'type')):
-                if plugin.get('id') == 'dynamic' and plugin.get('type') == 'runner':
-                    self.dynamic_params = self.__get_params(plugin)
-
+        job_config_dict = job_config_xml_to_dict(self.app.config, root)
         # Load tasks if configured
         if self.app.config.use_tasked_jobs:
-            self.runner_plugins.append(dict(id='tasks', load='tasks', workers=self.app.config.local_task_queue_workers))
+            job_config_dict["runners"]["tasks"] = dict(id='tasks', load='tasks', workers=self.app.config.local_task_queue_workers, kwds={})
+
+        self._configure_from_dict(job_config_dict)
 
         # Parse handlers
         handlers_conf = root.find('handlers')
@@ -383,14 +418,8 @@ def get_tool_resource_xml(self, tool_id, tool_type):
     def __parse_resource_parameters(self):
         self.resource_parameters = util.parse_resource_parameters(self.app.config.job_resource_params_file)
 
-    def __get_params(self, parent):
-        """Parses any child <param> tags in to a dictionary suitable for persistence.
-
-        :param parent: Parent element in which to find child <param> tags.
-        :type parent: ``xml.etree.ElementTree.Element``
-
-        :returns: dict
-        """
+    @staticmethod
+    def get_params(config, parent):
         rval = {}
         for param in parent.findall('param'):
             key = param.get('id')
@@ -406,11 +435,21 @@ def __get_params(self, parent):
                 param_value = os.environ.get(environ_var, param_value)
             elif 'from_config' in param.attrib:
                 config_val = param.attrib['from_config']
-                param_value = self.app.config.config_dict.get(config_val, param_value)
+                param_value = config.config_dict.get(config_val, param_value)
 
             rval[key] = param_value
 
         return rval
 
+    def __get_params(self, parent):
+        """Parses any child <param> tags in to a dictionary suitable for persistence.
+
+        :param parent: Parent element in which to find child <param> tags.
+        :type parent: ``xml.etree.ElementTree.Element``
+
+        :returns: dict
+        """
+        return JobConfiguration.get_params(self.app.config, parent)
+
     def __get_envs(self, parent):
         """Parses any child <env> tags in to a dictionary suitable for persistence.
@@ -451,7 +490,7 @@ def __get_resubmits(self, parent): def __is_enabled(self, params): """Check for an enabled parameter - pop it out - and return as boolean.""" enabled = True - if "enabled" in params: + if "enabled" in (params or {}): raw_enabled = params.pop("enabled") enabled = util.asbool(raw_enabled) diff --git a/test/integration/delay_job_conf.xml b/test/integration/delay_job_conf.yml similarity index 100% rename from test/integration/delay_job_conf.xml rename to test/integration/delay_job_conf.yml From 3152e03f85a74b12ceed18688707d0f77324376c Mon Sep 17 00:00:00 2001 From: John Chilton Date: Tue, 30 Apr 2019 10:06:28 -0400 Subject: [PATCH 2/9] Refactor job_conf -> dictifiable: -> handling: --- lib/galaxy/jobs/__init__.py | 33 ++++---- lib/galaxy/web/stack/handlers.py | 94 ++++++++++++++++------- lib/galaxy/workflow/scheduling_manager.py | 5 +- test/unit/jobs/test_job_configuration.py | 2 +- 4 files changed, 90 insertions(+), 44 deletions(-) diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index b25a3bb3efe4..72ecbeb0d831 100644 --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -138,6 +138,9 @@ def job_config_xml_to_dict(config, root): if plugin.get('id') == 'dynamic' and plugin.get('type') == 'runner': config_dict["dynamic"] = JobConfiguration.get_params(config, plugin) + handling_config_dict = ConfiguresHandlers.xml_to_dict(config, root.find("handlers")) + config_dict["handling"] = handling_config_dict + return config_dict @@ -234,6 +237,18 @@ def _configure_from_dict(self, job_config_dict): if "dynamic" in job_config_dict: self.dynamic_params = job_config_dict.get("dynamic", None) + # Parse handlers + handling_config_dict = job_config_dict.get("handling", {}) + self._init_handler_assignment_methods(handling_config_dict) + self._init_handlers(handling_config_dict) + if not self.handler_assignment_methods_configured: + self._set_default_handler_assignment_methods() + else: + self.app.application_stack.init_job_handling(self) + log.info("Job handler assignment methods set to: %s", ', '.join(self.handler_assignment_methods)) + for tag, handlers in [(t, h) for t, h in self.handlers.items() if isinstance(h, list)]: + log.info("Tag [%s] handlers: %s", tag, ', '.join(handlers)) + def __parse_job_conf_xml(self, tree): """Loads the new-style job configuration from options in the job config file (by default, job_conf.xml). 
@@ -250,18 +265,6 @@ def __parse_job_conf_xml(self, tree):
 
         self._configure_from_dict(job_config_dict)
 
-        # Parse handlers
-        handlers_conf = root.find('handlers')
-        self._init_handler_assignment_methods(handlers_conf)
-        self._init_handlers(handlers_conf)
-        if not self.handler_assignment_methods_configured:
-            self._set_default_handler_assignment_methods()
-        else:
-            self.app.application_stack.init_job_handling(self)
-        log.info("Job handler assignment methods set to: %s", ', '.join(self.handler_assignment_methods))
-        for tag, handlers in [(t, h) for t, h in self.handlers.items() if isinstance(h, list)]:
-            log.info("Tag [%s] handlers: %s", tag, ', '.join(handlers))
-
         # Parse destinations
         destinations = root.find('destinations')
         job_metrics = self.app.job_metrics
@@ -366,11 +369,11 @@ def __parse_job_conf_xml(self, tree):
 
         log.debug('Done loading job configuration')
 
-    def _parse_handler(self, handler_id, handler_element):
-        for plugin in handler_element.findall('plugin'):
+    def _parse_handler(self, handler_id, process_dict):
+        for plugin_id in process_dict.get("plugins") or []:
             if handler_id not in self.handler_runner_plugins:
                 self.handler_runner_plugins[handler_id] = []
-            self.handler_runner_plugins[handler_id].append(plugin.get('id'))
+            self.handler_runner_plugins[handler_id].append(plugin_id)
 
     def __set_default_job_conf(self):
         # Run jobs locally
diff --git a/lib/galaxy/web/stack/handlers.py b/lib/galaxy/web/stack/handlers.py
index 124cf91d7fc6..215a19e41523 100644
--- a/lib/galaxy/web/stack/handlers.py
+++ b/lib/galaxy/web/stack/handlers.py
@@ -48,23 +48,58 @@ def add_handler(self, handler_id, tags):
             else:
                 self.handlers[tag] = [handler_id]
 
-    def _init_handlers(self, config_element):
+    @staticmethod
+    def xml_to_dict(config, config_element):
+        handling_config_dict = {}
+
+        processes = {}
+        handling_config_dict["processes"] = processes
+
         # Parse handlers
         if config_element is not None:
-            for handler in self._findall_with_required(config_element, 'handler'):
+            for handler in ConfiguresHandlers._findall_with_required(config_element, 'handler'):
                 handler_id = handler.get('id')
-                if handler_id in self.handlers:
+                if handler_id in processes:
                     log.error("Handler '%s' overlaps handler with the same name, ignoring", handler_id)
                 else:
                     log.debug("Read definition for handler '%s'", handler_id)
-                    self._parse_handler(handler_id, handler)
-                    self.add_handler(
-                        handler_id,
-                        [x.strip() for x in handler.get('tags', self.DEFAULT_HANDLER_TAG).split(',')]
-                    )
-        self.default_handler_id = self._get_default(self.app.config, config_element, list(self.handlers.keys()))
-
-    def _init_handler_assignment_methods(self, config_element=None):
+                    plugins = []
+                    for plugin in ConfiguresHandlers._findall_with_required(handler, 'plugin', ['id']):
+                        plugins.append(plugin.get("id"))
+                    tags = [x.strip() for x in handler.get('tags', ConfiguresHandlers.DEFAULT_HANDLER_TAG).split(',')]
+                    handler_def = {"tags": tags}
+                    if plugins:
+                        handler_def["plugins"] = plugins
+                    processes[handler_id] = handler_def
+            default_handler = ConfiguresHandlers.get_xml_default(config, config_element)
+            if default_handler:
+                handling_config_dict["default"] = default_handler
+
+            assign = listify(config_element.attrib.get('assign_with', []), do_strip=True)
+            if len(assign) > 0:
+                handling_config_dict["assign"] = assign
+            max_grab_str = config_element.attrib.get('max_grab', None)
+            if max_grab_str:
+                handling_config_dict["max_grab"] = int(max_grab_str)
+
+        return handling_config_dict
+
+    def _init_handlers(self, handling_config_dict=None):
+        handling_config_dict = handling_config_dict or {}
+        for handler_id, process in handling_config_dict.get("processes", {}).items():
+            process = process or {}
+            if handler_id in self.handlers:
+                log.error("Handler '%s' overlaps handler with the same name, ignoring", handler_id)
+            else:
+                log.debug("Read definition for handler '%s'", handler_id)
+                self._parse_handler(handler_id, process)
+                self.add_handler(handler_id, process.get("tags") or [self.DEFAULT_HANDLER_TAG])
+
+        self.default_handler_id = self._ensure_default_set(handling_config_dict.get("default"), self.handlers.keys())
+
+    def _init_handler_assignment_methods(self, handling_config_dict=None):
+        handling_config_dict = handling_config_dict or {}
+
         self.__is_handler = None
         # This is set by the stack job handler init code
         self.pool_for_tag = {}
@@ -76,8 +111,8 @@ def _init_handler_assignment_methods(self, config_element=None):
             HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED: self._assign_db_tag,
             HANDLER_ASSIGNMENT_METHODS.UWSGI_MULE_MESSAGE: self._assign_uwsgi_mule_message_handler,
         }
-        if config_element is not None:
-            for method in listify(config_element.attrib.get('assign_with', []), do_strip=True):
+        if handling_config_dict:
+            for method in handling_config_dict.get("assign", []):
                 method = method.lower()
                 assert method in HANDLER_ASSIGNMENT_METHODS, \
                     "Invalid job handler assignment method '%s', must be one of: %s" % (
@@ -89,7 +124,7 @@ def _init_handler_assignment_methods(self, config_element=None):
                 self.handler_assignment_methods = [method]
         if self.handler_assignment_methods == [HANDLER_ASSIGNMENT_METHODS.MEM_SELF]:
             self.app.config.track_jobs_in_database = False
-        self.handler_max_grab = int(config_element.attrib.get('max_grab', self.handler_max_grab))
+        self.handler_max_grab = handling_config_dict.get('max_grab', self.handler_max_grab)
 
     def _set_default_handler_assignment_methods(self):
         if not self.handler_assignment_methods_configured:
@@ -116,6 +151,17 @@ def _set_default_handler_assignment_methods(self):
     def _parse_handler(self, handler_id, handler_def):
         pass
 
+    @staticmethod
+    def get_xml_default(config, parent):
+        rval = parent.get('default')
+        if 'default_from_environ' in parent.attrib:
+            environ_var = parent.attrib['default_from_environ']
+            rval = os.environ.get(environ_var, rval)
+        elif 'default_from_config' in parent.attrib:
+            config_val = parent.attrib['default_from_config']
+            rval = config.config_dict.get(config_val, rval)
+        return rval
+
     def _get_default(self, config, parent, names, auto=False):
         """
         Returns the default attribute set in a parent tag like <handlers> or <destinations>.
@@ -131,26 +177,22 @@ def _get_default(self, config, parent, names, auto=False):
 
         :returns: str -- id or tag representing the default.
""" + rval = ConfiguresHandlers.get_xml_default(config, parent) + return self._ensure_default_set(rval, names, auto=auto) - rval = parent.get('default') - if 'default_from_environ' in parent.attrib: - environ_var = parent.attrib['default_from_environ'] - rval = os.environ.get(environ_var, rval) - elif 'default_from_config' in parent.attrib: - config_val = parent.attrib['default_from_config'] - rval = config.config_dict.get(config_val, rval) - + def _ensure_default_set(self, rval, names, auto=False): if rval is not None: # If the parent element has a 'default' attribute, use the id or tag in that attribute if self.deterministic_handler_assignment and rval not in names: - raise Exception("<%s> default attribute '%s' does not match a defined id or tag in a child element" % (parent.tag, rval)) - log.debug("<%s> default set to child with id or tag '%s'" % (parent.tag, rval)) + raise Exception(" default to child with id '%s'" % (parent.tag, names[0])) + log.info("Setting default to child with id '%s'" % (names[0])) rval = names[0] return rval - def _findall_with_required(self, parent, match, attribs=None): + @staticmethod + def _findall_with_required(parent, match, attribs=None): """Like ``xml.etree.ElementTree.Element.findall()``, except only returns children that have the specified attribs. :param parent: Parent element in which to find. diff --git a/lib/galaxy/workflow/scheduling_manager.py b/lib/galaxy/workflow/scheduling_manager.py index f88e9b23e786..33e5cfa193b8 100644 --- a/lib/galaxy/workflow/scheduling_manager.py +++ b/lib/galaxy/workflow/scheduling_manager.py @@ -230,8 +230,9 @@ def __init_schedulers_for_element(self, plugins_element): def __init_handlers(self, config_element=None): assert not self.__handlers_configured - self._init_handler_assignment_methods(config_element) - self._init_handlers(config_element) + handling_config_dict = ConfiguresHandlers.xml_to_dict(self.app.config, config_element) + self._init_handler_assignment_methods(handling_config_dict) + self._init_handlers(handling_config_dict) if not self.handler_assignment_methods_configured: self._set_default_handler_assignment_methods() else: diff --git a/test/unit/jobs/test_job_configuration.py b/test/unit/jobs/test_job_configuration.py index 8886e538709a..19ec3fb6eb2e 100644 --- a/test/unit/jobs/test_job_configuration.py +++ b/test/unit/jobs/test_job_configuration.py @@ -92,7 +92,7 @@ def test_implict_uwsgi_mule_message_handler_assign(self): def test_implict_uwsgi_mule_message_handler_assign_with_explicit_handlers(self): self.__with_handlers_config(handlers=[{'id': 'handler0'}, {'id': 'handler1'}]) self.__with_uwsgi_application_stack(mule='lib/galaxy/main.py', farm='job-handlers:1') - assert self.job_config.handler_assignment_methods == ['uwsgi-mule-message', 'db-preassign'] + assert self.job_config.handler_assignment_methods == ['uwsgi-mule-message', 'db-preassign'], self.job_config.handler_assignment_methods assert self.job_config.default_handler_id is None assert self.job_config.handlers['_default_'] == ['handler0', 'handler1', 'main.job-handlers.1'] From 1706e7a7a6d201e2ec181e8631ea9f1eb652f8e2 Mon Sep 17 00:00:00 2001 From: John Chilton Date: Tue, 30 Apr 2019 10:40:20 -0400 Subject: [PATCH 3/9] Refactor job_conf -> dictifiable: -> execution: --- lib/galaxy/jobs/__init__.py | 157 +++++++++++++++++++++---------- lib/galaxy/web/stack/handlers.py | 2 +- 2 files changed, 110 insertions(+), 49 deletions(-) diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index 72ecbeb0d831..8be16e53f9e2 100644 --- 
@@ -80,10 +80,6 @@ def __init__(self, **kwds):
 
         super(JobDestination, self).__init__(**kwds)
 
-        # Store tags as a list
-        if self.tags is not None:
-            self['tags'] = [x.strip() for x in self.tags.split(',')]
-
 
 class JobToolConfiguration(Bunch):
     """
@@ -141,6 +137,57 @@ def job_config_xml_to_dict(config, root):
     handling_config_dict = ConfiguresHandlers.xml_to_dict(config, root.find("handlers"))
     config_dict["handling"] = handling_config_dict
 
+    # Parse destinations
+    environments = []
+
+    destinations = root.find('destinations')
+    for destination in ConfiguresHandlers._findall_with_required(destinations, 'destination', ('id', 'runner')):
+        destination_id = destination.get('id')
+        destination_metrics = destination.get("metrics", None)
+
+        environment = {"id": destination_id}
+
+        metrics_to_dict = {"src": "default"}
+        if destination_metrics:
+            if not util.asbool(destination_metrics):
+                metrics_to_dict = {"src": "disabled"}
+            else:
+                metrics_to_dict = {"src": "path", "path": destination_metrics}
+        else:
+            metrics_elements = ConfiguresHandlers._findall_with_required(destination, 'job_metrics', ())
+            if metrics_elements:
+                metrics_to_dict = {"src": "xml_element", 'xml_element': metrics_elements[0]}
+
+        environment["metrics"] = metrics_to_dict
+
+        params = JobConfiguration.get_params(config, destination)
+        # TODO: handle enabled/disabled in configure_from
+        environment['params'] = params
+        environment['env'] = JobConfiguration.get_envs(destination)
+        destination_resubmits = JobConfiguration.get_resubmits(destination)
+        if destination_resubmits:
+            environment['resubmit'] = destination_resubmits
+        # TODO: handle empty resubmits defaults in configure_from
+
+        runner = destination.get('runner')
+        if runner:
+            environment['runner'] = runner
+
+        tags = destination.get('tags')
+        # Store tags as a list
+        if tags is not None:
+            tags = [x.strip() for x in tags.split(',')]
+        environment['tags'] = tags
+
+        environments.append(environment)
+
+    config_dict['execution'] = {
+        'environments': environments,
+    }
+    default_destination = ConfiguresHandlers.get_xml_default(config, destinations)
+    if default_destination:
+        config_dict['execution']['default'] = default_destination
+
     return config_dict
 
 
@@ -249,6 +296,60 @@ def _configure_from_dict(self, job_config_dict):
         for tag, handlers in [(t, h) for t, h in self.handlers.items() if isinstance(h, list)]:
             log.info("Tag [%s] handlers: %s", tag, ', '.join(handlers))
 
+        # Parse environments
+        job_metrics = self.app.job_metrics
+        execution_dict = job_config_dict.get('execution', {})
+        environments = execution_dict.get("environments", [])
+        environment_iter = map(lambda e: (e["id"], e), environments) if isinstance(environments, list) else environments.items()
+        for environment_id, environment_dict in environment_iter:
+            metrics = environment_dict.get("metrics") or {"src": "default"}
+            metrics_src = metrics.get("src") or "default"
+            if metrics_src != "default":
+                # customized metrics for this environment.
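                # For reference, the "metrics" sub-dictionary dispatched on below takes one
                # of a few shapes (a sketch inferred from the branches that follow; the path
                # value is illustrative only):
                #     {"src": "default"}                           - global job metrics config
                #     {"src": "disabled"}                          - no instrumentation here
                #     {"src": "path", "path": "metrics_conf.xml"}  - per-environment conf file
                #     {"src": "xml_element", "xml_element": elem}  - parsed <job_metrics> element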
+ if metrics_src == "disabled": + job_metrics.set_destination_instrumenter(environment_id, None) + elif metrics_src == "xml_element": + metrics_element = metrics.get("xml_element") + job_metrics.set_destination_conf_element(environment_id, metrics_element) + elif metrics_src == "path": + metrics_conf_path = self.app.config.resolve_path(metrics.get("path")) + job_metrics.set_destination_conf_file(environment_id, metrics_conf_path) + + destination_kwds = {} + + params = environment_dict.get("params") + if params is None: + # Treat the excess keys in the environment as the destination parameters + # allowing a flat configuration of these things. + params = {} + for key, value in environment_dict.items(): + if key in ['id', 'tags', 'runner', 'shell', 'env', 'resubmit']: + continue + params[key] = value + environment_dict["params"] = params + + for key in ['tags', 'runner', 'shell', 'env', 'resubmit', 'params']: + if key in environment_dict: + destination_kwds[key] = environment_dict[key] + destination_kwds["id"] = environment_id + job_destination = JobDestination(**destination_kwds) + if not self.__is_enabled(job_destination.params): + continue + + if not job_destination.resubmit: + resubmits = self.default_resubmits + job_destination.resubmit = resubmits + + self.destinations[environment_id] = (job_destination,) + if job_destination.tags is not None: + for tag in job_destination.tags: + if tag not in self.destinations: + self.destinations[tag] = [] + self.destinations[tag].append(job_destination) + + # Determine the default destination + self.default_destination_id = self._ensure_default_set(execution_dict.get("default"), list(self.destinations.keys()), auto=True) + def __parse_job_conf_xml(self, tree): """Loads the new-style job configuration from options in the job config file (by default, job_conf.xml). 
@@ -265,48 +366,6 @@ def __parse_job_conf_xml(self, tree):
 
         self._configure_from_dict(job_config_dict)
 
-        # Parse destinations
-        destinations = root.find('destinations')
-        job_metrics = self.app.job_metrics
-        for destination in self._findall_with_required(destinations, 'destination', ('id', 'runner')):
-            id = destination.get('id')
-            destination_metrics = destination.get("metrics", None)
-            if destination_metrics:
-                if not util.asbool(destination_metrics):
-                    # disable
-                    job_metrics.set_destination_instrumenter(id, None)
-                else:
-                    metrics_conf_path = self.app.config.resolve_path(destination_metrics)
-                    job_metrics.set_destination_conf_file(id, metrics_conf_path)
-            else:
-                metrics_elements = self._findall_with_required(destination, 'job_metrics', ())
-                if metrics_elements:
-                    job_metrics.set_destination_conf_element(id, metrics_elements[0])
-            job_destination = JobDestination(**dict(destination.items()))
-            params = self.__get_params(destination)
-            if not self.__is_enabled(params):
-                continue
-
-            job_destination['params'] = params
-            job_destination['env'] = self.__get_envs(destination)
-            destination_resubmits = self.__get_resubmits(destination)
-            if destination_resubmits:
-                resubmits = destination_resubmits
-            else:
-                resubmits = self.default_resubmits
-            job_destination["resubmit"] = resubmits
-
-            self.destinations[id] = (job_destination,)
-            if job_destination.tags is not None:
-                for tag in job_destination.tags:
-                    if tag not in self.destinations:
-                        self.destinations[tag] = []
-                    self.destinations[tag].append(job_destination)
-
-        # Determine the default destination
-        self.default_destination_id = self._get_default(
-            self.app.config, destinations, list(self.destinations.keys()), auto=True)
-
         # Parse resources...
         resources = root.find('resources')
         if resources is not None:
@@ -453,7 +512,8 @@ def __get_params(self, parent):
         """
         return JobConfiguration.get_params(self.app.config, parent)
 
-    def __get_envs(self, parent):
+    @staticmethod
+    def get_envs(parent):
         """Parses any child <env> tags in to a dictionary suitable for persistence.
 
         :param parent: Parent element in which to find child <env> tags.
@@ -472,7 +532,8 @@ def __get_envs(self, parent):
             ))
         return rval
 
-    def __get_resubmits(self, parent):
+    @staticmethod
+    def get_resubmits(parent):
         """Parses any child <resubmit> tags in to a dictionary suitable for persistence.
 
         :param parent: Parent element in which to find child <resubmit> tags.
diff --git a/lib/galaxy/web/stack/handlers.py b/lib/galaxy/web/stack/handlers.py
index 215a19e41523..1290923427dd 100644
--- a/lib/galaxy/web/stack/handlers.py
+++ b/lib/galaxy/web/stack/handlers.py
@@ -184,7 +184,7 @@ def _ensure_default_set(self, rval, names, auto=False):
         if rval is not None:
             # If the parent element has a 'default' attribute, use the id or tag in that attribute
             if self.deterministic_handler_assignment and rval not in names:
-                raise Exception("...

From ... Mon Sep 17 00:00:00 2001
From: John Chilton
Date: Tue, 30 Apr 2019 13:47:06 -0400
Subject: [PATCH 4/9] Refactor job_conf -> dictifiable: -> resources:

---
 lib/galaxy/jobs/__init__.py | 35 +++++++++++++++++++++++++----------
 1 file changed, 25 insertions(+), 10 deletions(-)

diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py
index 8be16e53f9e2..1b2cd79fd23d 100644
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -188,6 +188,25 @@ def job_config_xml_to_dict(config, root):
     if default_destination:
         config_dict['execution']['default'] = default_destination
 
+    resources_config_dict = {}
+    resource_groups = {}
+
+    # Parse resources...
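    # To make the shape of resources_config_dict concrete, a hypothetical section such as
    #     <resources default="default">
    #         <group id="memoryonly">memory</group>
    #     </resources>
    # would be parsed by the loop below into
    #     {"default": "default", "groups": {"memoryonly": ["memory"]}}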
+ resources = root.find('resources') + if resources is not None: + default_resource_group = resources.get("default", None) + if default_resource_group: + resources_config_dict["default"] = default_resource_group + + for group in ConfiguresHandlers._findall_with_required(resources, 'group'): + group_id = group.get('id') + fields_str = group.get('fields', None) or group.text or '' + fields = [f for f in fields_str.split(",") if f] + resource_groups[group_id] = fields + + resources_config_dict["groups"] = resource_groups + config_dict["resources"] = resources_config_dict + return config_dict @@ -350,6 +369,12 @@ def _configure_from_dict(self, job_config_dict): # Determine the default destination self.default_destination_id = self._ensure_default_set(execution_dict.get("default"), list(self.destinations.keys()), auto=True) + # Read in resources + resources = job_config_dict.get("resources", {}) + self.default_resource_group = resources.get("default", None) + for group_id, fields in resources.get("groups", {}).items(): + self.resource_groups[group_id] = fields + def __parse_job_conf_xml(self, tree): """Loads the new-style job configuration from options in the job config file (by default, job_conf.xml). @@ -366,16 +391,6 @@ def __parse_job_conf_xml(self, tree): self._configure_from_dict(job_config_dict) - # Parse resources... - resources = root.find('resources') - if resources is not None: - self.default_resource_group = resources.get("default", None) - for group in self._findall_with_required(resources, 'group'): - id = group.get('id') - fields_str = group.get('fields', None) or group.text or '' - fields = [f for f in fields_str.split(",") if f] - self.resource_groups[id] = fields - # Parse tool mappings tools = root.find('tools') if tools is not None: From f3c0ffd29fab1eb9c8cb8ae4365bedfac87f9a7f Mon Sep 17 00:00:00 2001 From: John Chilton Date: Tue, 30 Apr 2019 14:06:03 -0400 Subject: [PATCH 5/9] Refactor job_conf -> dictifiable: -> tools: --- lib/galaxy/jobs/__init__.py | 44 +++++++++++++++++++++++++++---------- 1 file changed, 33 insertions(+), 11 deletions(-) diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index 1b2cd79fd23d..f9e1a0e88814 100644 --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -207,6 +207,22 @@ def job_config_xml_to_dict(config, root): resources_config_dict["groups"] = resource_groups config_dict["resources"] = resources_config_dict + # Parse tool mappings + tools = root.find('tools') + config_dict['tools'] = [] + if tools is not None: + for tool in ConfiguresHandlers._findall_with_required(tools, 'tool'): + # There can be multiple definitions with identical ids, but different params + tool_mapping_conf = {} + for key in ['handler', 'destination', 'id', 'resources']: + value = tool.get(key) + if value: + if key == "destination": + key = "environment" + tool_mapping_conf[key] = value + tool_mapping_conf["params"] = JobConfiguration.get_params(config, tool) + config_dict['tools'].append(tool_mapping_conf) + return config_dict @@ -375,6 +391,23 @@ def _configure_from_dict(self, job_config_dict): for group_id, fields in resources.get("groups", {}).items(): self.resource_groups[group_id] = fields + tools = job_config_dict.get('tools', []) + for tool in tools: + tool_id = tool.get('id').lower().rstrip('/') + if id not in self.tools: + self.tools[tool_id] = list() + params = tool.get("params") + if params is None: + params = {} + for key, value in tool.items(): + if key in ["environment", "handler", "id"]: + continue + params[key] = 
value
+                tool["params"] = params
+            if "environment" in tool:
+                tool["destination"] = tool.pop("environment")
+            self.tools[tool_id].append(JobToolConfiguration(**dict(tool.items())))
+
     def __parse_job_conf_xml(self, tree):
         """Loads the new-style job configuration from options in the job config file (by default, job_conf.xml).
 
@@ -391,17 +424,6 @@ def __parse_job_conf_xml(self, tree):
 
         self._configure_from_dict(job_config_dict)
 
-        # Parse tool mappings
-        tools = root.find('tools')
-        if tools is not None:
-            for tool in self._findall_with_required(tools, 'tool'):
-                # There can be multiple definitions with identical ids, but different params
-                id = tool.get('id').lower().rstrip('/')
-                if id not in self.tools:
-                    self.tools[id] = list()
-                self.tools[id].append(JobToolConfiguration(**dict(tool.items())))
-                self.tools[id][-1]['params'] = self.__get_params(tool)
-
         types = dict(registered_user_concurrent_jobs=int,
                      anonymous_user_concurrent_jobs=int,
                      walltime=str,

From 7ac163f68a4786f1b53e4941fb2fba7bd32424e6 Mon Sep 17 00:00:00 2001
From: John Chilton
Date: Tue, 30 Apr 2019 14:25:23 -0400
Subject: [PATCH 6/9] Refactor job_conf -> dictifiable: -> limits:

---
 lib/galaxy/jobs/__init__.py | 91 ++++++++++++++++++++++---------------
 1 file changed, 54 insertions(+), 37 deletions(-)

diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py
index f9e1a0e88814..de3fee93a311 100644
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -223,6 +223,21 @@ def job_config_xml_to_dict(config, root):
             tool_mapping_conf["params"] = JobConfiguration.get_params(config, tool)
             config_dict['tools'].append(tool_mapping_conf)
 
+    limits_config = []
+    limits = root.find('limits')
+    if limits is not None:
+        for limit in JobConfiguration._findall_with_required(limits, 'limit', ('type',)):
+            limit_dict = {}
+            for key in ['type', 'tag', 'id', 'window']:
+                value = limit.get(key)
+                if key == 'type' and value.startswith('destination_'):
+                    value = 'environment_%s' % value[len("destination_"):]
+                if value:
+                    limit_dict[key] = value
+            limit_dict['value'] = limit.text
+            limits_config.append(limit_dict)
+
+    config_dict['limits'] = limits_config
     return config_dict
 
 
@@ -394,7 +409,7 @@ def _configure_from_dict(self, job_config_dict):
         tools = job_config_dict.get('tools', [])
         for tool in tools:
             tool_id = tool.get('id').lower().rstrip('/')
-            if id not in self.tools:
+            if tool_id not in self.tools:
                 self.tools[tool_id] = list()
             params = tool.get("params")
             if params is None:
@@ -408,22 +423,6 @@ def _configure_from_dict(self, job_config_dict):
                 tool["destination"] = tool.pop("environment")
             self.tools[tool_id].append(JobToolConfiguration(**dict(tool.items())))
 
-    def __parse_job_conf_xml(self, tree):
-        """Loads the new-style job configuration from options in the job config file (by default, job_conf.xml).
-
-        :param tree: Object representing the root ``<job_conf>`` object in the job config file.
-        :type tree: ``xml.etree.ElementTree.Element``
-        """
-        root = tree.getroot()
-        log.debug('Loading job configuration from %s' % self.app.config.job_config_file)
-
-        job_config_dict = job_config_xml_to_dict(self.app.config, root)
-        # Load tasks if configured
-        if self.app.config.use_tasked_jobs:
-            job_config_dict["runners"]["tasks"] = dict(id='tasks', load='tasks', workers=self.app.config.local_task_queue_workers, kwds={})
-
-        self._configure_from_dict(job_config_dict)
-
         types = dict(registered_user_concurrent_jobs=int,
                      anonymous_user_concurrent_jobs=int,
                      walltime=str,
@@ -431,26 +430,28 @@ def __parse_job_conf_xml(self, tree):
                      output_size=util.size_to_bytes)
 
         # Parse job limits
-        limits = root.find('limits')
-        if limits is not None:
-            for limit in self._findall_with_required(limits, 'limit', ('type',)):
-                type = limit.get('type')
-                # concurrent_jobs renamed to destination_user_concurrent_jobs in job_conf.xml
-                if type in ('destination_user_concurrent_jobs', 'concurrent_jobs', 'destination_total_concurrent_jobs'):
-                    id = limit.get('tag', None) or limit.get('id')
-                    if type == 'destination_total_concurrent_jobs':
-                        self.limits.destination_total_concurrent_jobs[id] = int(limit.text)
-                    else:
-                        self.limits.destination_user_concurrent_jobs[id] = int(limit.text)
-                elif type == 'total_walltime':
-                    self.limits.total_walltime["window"] = (
-                        int(limit.get('window')) or 30
-                    )
-                    self.limits.total_walltime["raw"] = (
-                        types.get(type, str)(limit.text)
-                    )
-                elif limit.text:
-                    self.limits.__dict__[type] = types.get(type, str)(limit.text)
+        for limit_dict in job_config_dict.get("limits", []):
+            limit_type = limit_dict.get('type')
+            if limit_type.startswith("environment_"):
+                limit_type = 'destination_%s' % limit_type[len("environment_"):]
+
+            limit_value = limit_dict.get("value")
+            # concurrent_jobs renamed to destination_user_concurrent_jobs in job_conf.xml
+            if limit_type in ('destination_user_concurrent_jobs', 'concurrent_jobs', 'destination_total_concurrent_jobs'):
+                id = limit_dict.get('tag', None) or limit_dict.get('id')
+                if limit_type == 'destination_total_concurrent_jobs':
+                    self.limits.destination_total_concurrent_jobs[id] = int(limit_value)
+                else:
+                    self.limits.destination_user_concurrent_jobs[id] = int(limit_value)
+            elif limit_type == 'total_walltime':
+                self.limits.total_walltime["window"] = (
+                    int(limit_dict.get('window')) or 30
+                )
+                self.limits.total_walltime["raw"] = (
+                    types.get(limit_type, str)(limit_value)
+                )
+            elif limit_value:
+                self.limits.__dict__[limit_type] = types.get(limit_type, str)(limit_value)
 
         if self.limits.walltime is not None:
             h, m, s = [int(v) for v in self.limits.walltime.split(':')]
@@ -463,6 +464,22 @@ def __parse_job_conf_xml(self, tree):
             0, s, 0, 0, m, h
         )
 
+    def __parse_job_conf_xml(self, tree):
+        """Loads the new-style job configuration from options in the job config file (by default, job_conf.xml).
+
+        :param tree: Object representing the root ``<job_conf>`` object in the job config file.
+ :type tree: ``xml.etree.ElementTree.Element`` + """ + root = tree.getroot() + log.debug('Loading job configuration from %s' % self.app.config.job_config_file) + + job_config_dict = job_config_xml_to_dict(self.app.config, root) + # Load tasks if configured + if self.app.config.use_tasked_jobs: + job_config_dict["runners"]["tasks"] = dict(id='tasks', load='tasks', workers=self.app.config.local_task_queue_workers, kwds={}) + + self._configure_from_dict(job_config_dict) + log.debug('Done loading job configuration') def _parse_handler(self, handler_id, process_dict): From 048c97646a04fc532b3ec172833ae65f07f359c3 Mon Sep 17 00:00:00 2001 From: John Chilton Date: Tue, 30 Apr 2019 14:45:10 -0400 Subject: [PATCH 7/9] Refactor test_job_configuration to duplicate advanced job_conf testing. --- test/unit/jobs/test_job_configuration.py | 242 ++++++++++++----------- 1 file changed, 124 insertions(+), 118 deletions(-) diff --git a/test/unit/jobs/test_job_configuration.py b/test/unit/jobs/test_job_configuration.py index 19ec3fb6eb2e..33f6e716a6ad 100644 --- a/test/unit/jobs/test_job_configuration.py +++ b/test/unit/jobs/test_job_configuration.py @@ -18,7 +18,7 @@ HANDLER_TEMPLATE_JOB_CONF = os.path.join(os.path.dirname(__file__), "handler_template_job_conf.xml") -class JobConfXmlParserTestCase(unittest.TestCase): +class BaseJobConfXmlParserTestCase(unittest.TestCase): def setUp(self): self.temp_directory = tempfile.mkdtemp() @@ -31,16 +31,75 @@ def setUp(self): track_jobs_in_database=True, server_name="main", ) - self.__write_config_from(SIMPLE_JOB_CONF) - self.__app = None - self.__application_stack = None - self.__job_configuration = None - self.__job_configuration_base_pools = None - self.__uwsgi_opt = None + self._write_config_from(SIMPLE_JOB_CONF) + self._app = None + self._application_stack = None + self._job_configuration = None + self._job_configuration_base_pools = None + self._uwsgi_opt = None def tearDown(self): shutil.rmtree(self.temp_directory) + # TODO: Add job metrics parsing test. 
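    # The fixtures below are lazy: a test first stages a job_conf file with one of
    # the _with_*/_write_config helpers and only then touches self.job_config, which
    # builds the mocked app and parses the staged file on first access. A typical
    # flow, mirroring the tests in this module (a sketch):
    #
    #     self._with_handlers_config(handlers=[{'id': 'handler0'}])
    #     assert self.job_config.handlers['_default_'] == ['handler0']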
+ + @property + def app(self): + if not self._app: + self._app = bunch.Bunch( + config=self.config, + job_metrics=MockJobMetrics(), + application_stack=self.application_stack + ) + return self._app + + @property + def application_stack(self): + if not self._application_stack: + self._application_stack = ApplicationStack() + return self._application_stack + + @property + def job_config(self): + if not self._job_configuration: + base_handler_pools = self._job_configuration_base_pools or JobConfiguration.DEFAULT_BASE_HANDLER_POOLS + mock_uwsgi = mock.Mock() + mock_uwsgi.mule_id = lambda: 1 + with mock.patch('galaxy.web.stack.uwsgi', mock_uwsgi), \ + mock.patch('galaxy.web.stack.uwsgi.opt', self._uwsgi_opt), \ + mock.patch('galaxy.jobs.JobConfiguration.DEFAULT_BASE_HANDLER_POOLS', base_handler_pools): + self._job_configuration = JobConfiguration(self.app) + return self._job_configuration + + def _with_uwsgi_application_stack(self, **uwsgi_opt): + self._uwsgi_opt = uwsgi_opt + self._application_stack = UWSGIApplicationStack() + + def _with_handlers_config(self, assign_with=None, default=None, handlers=None, base_pools=None): + handlers = handlers or [] + template = { + 'assign_with': ' assign_with="%s"' % assign_with if assign_with is not None else '', + 'default': ' default="%s"' % default if default is not None else '', + 'handlers': '\n'.join([ + ''.format( + id=x['id'], + tags=' tags="%s"' % x['tags'] if 'tags' in x else '' + ) for x in handlers]), + } + self._job_configuration_base_pools = base_pools + self._write_config_from(HANDLER_TEMPLATE_JOB_CONF, template=template) + + def _write_config_from(self, path, template=None): + template = template or {} + self._write_config(open(path, "r").read().format(**template)) + + def _write_config(self, contents): + with open(os.path.join(self.temp_directory, "job_conf.%s" % self.extension), "w") as f: + f.write(contents) + + +class SimpleJobConfXmlParserTestCase(BaseJobConfXmlParserTestCase): + def test_load_simple_runner(self): runner_plugin = self.job_config.runner_plugins[0] assert runner_plugin["id"] == "local" @@ -58,14 +117,14 @@ def test_configuration_of_tasks(self): assert task_runners[0]["workers"] == 5 def test_explicit_handler_default(self): - self.__with_handlers_config( + self._with_handlers_config( handlers=[{'id': 'handler0', 'tags': 'handlers'}, {'id': 'handler1', 'tags': 'handlers'}], default='handlers' ) assert self.job_config.default_handler_id == "handlers" def test_handler_tag_parsing(self): - self.__with_handlers_config( + self._with_handlers_config( handlers=[{'id': 'handler0', 'tags': 'handlers'}, {'id': 'handler1', 'tags': 'handlers'}], default='handlers' ) @@ -78,72 +137,72 @@ def test_implict_db_self_handler_assign(self): assert self.job_config.handlers == {} def test_implicit_db_assign_handler_assign_with_explicit_handlers(self): - self.__with_handlers_config(handlers=[{'id': 'handler0'}, {'id': 'handler1'}]) + self._with_handlers_config(handlers=[{'id': 'handler0'}, {'id': 'handler1'}]) assert self.job_config.handler_assignment_methods == ['db-preassign'] assert self.job_config.default_handler_id is None assert self.job_config.handlers['_default_'] == ['handler0', 'handler1'] def test_implict_uwsgi_mule_message_handler_assign(self): - self.__with_uwsgi_application_stack(mule='lib/galaxy/main.py', farm='job-handlers:1') + self._with_uwsgi_application_stack(mule='lib/galaxy/main.py', farm='job-handlers:1') assert self.job_config.handler_assignment_methods == ['uwsgi-mule-message'] assert self.job_config.default_handler_id 
is None assert self.job_config.handlers['_default_'] == ['main.job-handlers.1'] def test_implict_uwsgi_mule_message_handler_assign_with_explicit_handlers(self): - self.__with_handlers_config(handlers=[{'id': 'handler0'}, {'id': 'handler1'}]) - self.__with_uwsgi_application_stack(mule='lib/galaxy/main.py', farm='job-handlers:1') + self._with_handlers_config(handlers=[{'id': 'handler0'}, {'id': 'handler1'}]) + self._with_uwsgi_application_stack(mule='lib/galaxy/main.py', farm='job-handlers:1') assert self.job_config.handler_assignment_methods == ['uwsgi-mule-message', 'db-preassign'], self.job_config.handler_assignment_methods assert self.job_config.default_handler_id is None assert self.job_config.handlers['_default_'] == ['handler0', 'handler1', 'main.job-handlers.1'] def test_explicit_mem_self_handler_assign(self): - self.__with_handlers_config(assign_with='mem-self') + self._with_handlers_config(assign_with='mem-self') assert self.job_config.handler_assignment_methods == ['mem-self'] assert self.job_config.default_handler_id is None assert self.job_config.handlers == {} assert not self.config.track_jobs_in_database def test_explicit_mem_self_handler_assign_with_handlers(self): - self.__with_handlers_config(assign_with='mem-self', handlers=[{'id': 'handler0'}]) + self._with_handlers_config(assign_with='mem-self', handlers=[{'id': 'handler0'}]) assert self.job_config.handler_assignment_methods == ['mem-self'] assert self.job_config.default_handler_id is None assert self.job_config.handlers['_default_'] == ['handler0'] def test_explicit_db_preassign_handler_assign_with_uwsgi(self): - self.__with_handlers_config(assign_with='db-preassign', handlers=[{'id': 'handler0'}]) - self.__with_uwsgi_application_stack(mule='lib/galaxy/main.py', farm='job-handlers:1') + self._with_handlers_config(assign_with='db-preassign', handlers=[{'id': 'handler0'}]) + self._with_uwsgi_application_stack(mule='lib/galaxy/main.py', farm='job-handlers:1') assert self.job_config.handler_assignment_methods == ['db-preassign'] assert self.job_config.default_handler_id is None assert self.job_config.handlers['_default_'] == ['handler0', 'main.job-handlers.1'] def test_explicit_db_transaction_isolation_handler_assign(self): - self.__with_handlers_config(assign_with='db-transaction-isolation') + self._with_handlers_config(assign_with='db-transaction-isolation') assert self.job_config.handler_assignment_methods == ['db-transaction-isolation'] assert self.job_config.default_handler_id is None assert self.job_config.handlers == {} def test_explicit_db_transaction_isolation_handler_assign_with_uwsgi(self): - self.__with_handlers_config(assign_with='db-transaction-isolation', handlers=[{'id': 'handler0'}]) - self.__with_uwsgi_application_stack(mule='lib/galaxy/main.py', farm='job-handlers:1') + self._with_handlers_config(assign_with='db-transaction-isolation', handlers=[{'id': 'handler0'}]) + self._with_uwsgi_application_stack(mule='lib/galaxy/main.py', farm='job-handlers:1') assert self.job_config.handler_assignment_methods == ['db-transaction-isolation'] assert self.job_config.default_handler_id is None assert self.job_config.handlers['_default_'] == ['handler0', 'main.job-handlers.1'] def test_explicit_db_skip_locked_handler_assign(self): - self.__with_handlers_config(assign_with='db-skip-locked') + self._with_handlers_config(assign_with='db-skip-locked') assert self.job_config.handler_assignment_methods == ['db-skip-locked'] assert self.job_config.default_handler_id is None assert self.job_config.handlers == {} def 
test_explicit_db_skip_locked_handler_assign_with_uwsgi(self): - self.__with_handlers_config(assign_with='db-skip-locked', handlers=[{'id': 'handler0'}]) - self.__with_uwsgi_application_stack(mule='lib/galaxy/main.py', farm='job-handlers:1') + self._with_handlers_config(assign_with='db-skip-locked', handlers=[{'id': 'handler0'}]) + self._with_uwsgi_application_stack(mule='lib/galaxy/main.py', farm='job-handlers:1') assert self.job_config.handler_assignment_methods == ['db-skip-locked'] assert self.job_config.default_handler_id is None assert self.job_config.handlers['_default_'] == ['handler0', 'main.job-handlers.1'] def test_uwsgi_farms_as_handler_tags(self): - self.__with_uwsgi_application_stack( + self._with_uwsgi_application_stack( mule=['lib/galaxy/main.py'] * 2, farm=['job-handlers:1', 'job-handlers.foo:2'] ) @@ -152,8 +211,8 @@ def test_uwsgi_farms_as_handler_tags(self): assert self.job_config.handlers['foo'] == ['main.job-handlers.foo.1'] def test_uwsgi_overlapping_pools(self): - self.__with_handlers_config(base_pools=('workflow-schedulers', 'job-handlers')) - self.__with_uwsgi_application_stack( + self._with_handlers_config(base_pools=('workflow-schedulers', 'job-handlers')) + self._with_uwsgi_application_stack( mule=['lib/galaxy/main.py'] * 3, farm=['job-handlers:1', 'workflow-schedulers:2', 'job-handlers.foo:3'] ) @@ -166,6 +225,44 @@ def test_load_simple_destination(self): assert local_dest.id == "local" assert local_dest.runner == "local" + def test_default_limits(self): + limits = self.job_config.limits + assert limits.registered_user_concurrent_jobs is None + assert limits.anonymous_user_concurrent_jobs is None + assert limits.walltime is None + assert limits.walltime_delta is None + assert limits.total_walltime == {} + assert limits.output_size is None + assert limits.destination_user_concurrent_jobs == {} + assert limits.destination_total_concurrent_jobs == {} + + def test_conditional_runners(self): + self._write_config_from(CONDITIONAL_RUNNER_JOB_CONF) + runner_ids = [r["id"] for r in self.job_config.runner_plugins] + assert "local2" in runner_ids + assert "local3" not in runner_ids + + assert "local2_dest" in self.job_config.destinations + assert "local3_dest" not in self.job_config.destinations + + def test_conditional_runners_from_environ(self): + self._write_config_from(CONDITIONAL_RUNNER_JOB_CONF) + os.environ["LOCAL2_ENABLED"] = "False" + os.environ["LOCAL3_ENABLED"] = "True" + try: + runner_ids = [r["id"] for r in self.job_config.runner_plugins] + assert "local2" not in runner_ids + assert "local3" in runner_ids + + assert "local2_dest" not in self.job_config.destinations + assert "local3_dest" in self.job_config.destinations + finally: + del os.environ["LOCAL2_ENABLED"] + del os.environ["LOCAL3_ENABLED"] + + +class AdvancedJobConfXmlParserTestCase(BaseJobConfXmlParserTestCase): + def test_load_destination_params(self): self.__with_advanced_config() pbs_dest = self.job_config.destinations["pbs_longjobs"][0] @@ -193,17 +290,6 @@ def test_load_tool_params(self): foo_tool = self.job_config.tools["foo"][0] assert foo_tool.params["source"] == "trackster" - def test_default_limits(self): - limits = self.job_config.limits - assert limits.registered_user_concurrent_jobs is None - assert limits.anonymous_user_concurrent_jobs is None - assert limits.walltime is None - assert limits.walltime_delta is None - assert limits.total_walltime == {} - assert limits.output_size is None - assert limits.destination_user_concurrent_jobs == {} - assert 
limits.destination_total_concurrent_jobs == {} - def test_limit_overrides(self): self.__with_advanced_config() limits = self.job_config.limits @@ -280,88 +366,8 @@ def test_macro_expansion(self): for name in ["foo_small", "foo_medium", "foo_large", "foo_longrunning"]: assert self.job_config.destinations[name] - def test_conditional_runners(self): - self.__write_config_from(CONDITIONAL_RUNNER_JOB_CONF) - runner_ids = [r["id"] for r in self.job_config.runner_plugins] - assert "local2" in runner_ids - assert "local3" not in runner_ids - - assert "local2_dest" in self.job_config.destinations - assert "local3_dest" not in self.job_config.destinations - - def test_conditional_runners_from_environ(self): - self.__write_config_from(CONDITIONAL_RUNNER_JOB_CONF) - os.environ["LOCAL2_ENABLED"] = "False" - os.environ["LOCAL3_ENABLED"] = "True" - try: - runner_ids = [r["id"] for r in self.job_config.runner_plugins] - assert "local2" not in runner_ids - assert "local3" in runner_ids - - assert "local2_dest" not in self.job_config.destinations - assert "local3_dest" in self.job_config.destinations - finally: - del os.environ["LOCAL2_ENABLED"] - del os.environ["LOCAL3_ENABLED"] - - # TODO: Add job metrics parsing test. - - @property - def app(self): - if not self.__app: - self.__app = bunch.Bunch( - config=self.config, - job_metrics=MockJobMetrics(), - application_stack=self.application_stack - ) - return self.__app - - @property - def application_stack(self): - if not self.__application_stack: - self.__application_stack = ApplicationStack() - return self.__application_stack - - @property - def job_config(self): - if not self.__job_configuration: - base_handler_pools = self.__job_configuration_base_pools or JobConfiguration.DEFAULT_BASE_HANDLER_POOLS - mock_uwsgi = mock.Mock() - mock_uwsgi.mule_id = lambda: 1 - with mock.patch('galaxy.web.stack.uwsgi', mock_uwsgi), \ - mock.patch('galaxy.web.stack.uwsgi.opt', self.__uwsgi_opt), \ - mock.patch('galaxy.jobs.JobConfiguration.DEFAULT_BASE_HANDLER_POOLS', base_handler_pools): - self.__job_configuration = JobConfiguration(self.app) - return self.__job_configuration - - def __with_uwsgi_application_stack(self, **uwsgi_opt): - self.__uwsgi_opt = uwsgi_opt - self.__application_stack = UWSGIApplicationStack() - def __with_advanced_config(self): - self.__write_config_from(ADVANCED_JOB_CONF) - - def __with_handlers_config(self, assign_with=None, default=None, handlers=None, base_pools=None): - handlers = handlers or [] - template = { - 'assign_with': ' assign_with="%s"' % assign_with if assign_with is not None else '', - 'default': ' default="%s"' % default if default is not None else '', - 'handlers': '\n'.join([ - ''.format( - id=x['id'], - tags=' tags="%s"' % x['tags'] if 'tags' in x else '' - ) for x in handlers]), - } - self.__job_configuration_base_pools = base_pools - self.__write_config_from(HANDLER_TEMPLATE_JOB_CONF, template=template) - - def __write_config_from(self, path, template=None): - template = template or {} - self.__write_config(open(path, "r").read().format(**template)) - - def __write_config(self, contents): - with open(os.path.join(self.temp_directory, "job_conf.xml"), "w") as f: - f.write(contents) + self._write_config_from(ADVANCED_JOB_CONF) class MockJobMetrics(object): From f81c41895b30b0b1b0d1a1f94e001b5207a37dea Mon Sep 17 00:00:00 2001 From: John Chilton Date: Wed, 1 May 2019 08:14:37 -0400 Subject: [PATCH 8/9] Allow YAML configuration of job_conf.xml. 
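With this change the job running configuration can be expressed directly in
Galaxy's YAML configuration (a job_config map) or in a standalone YAML file,
instead of job_conf.xml. Both paths now feed the same dictionary into
_configure_from_dict; a minimal sketch of that structure (the runner and
environment ids below are illustrative, not required names):

    job_config = {
        "runners": {
            "local": {"load": "galaxy.jobs.runners.local:LocalJobRunner", "workers": 4},
        },
        "execution": {
            "default": "local_dest",
            "environments": {
                "local_dest": {"runner": "local"},
            },
        },
        "tools": [
            {"id": "upload1", "environment": "local_dest"},
        ],
        "limits": [
            {"type": "registered_user_concurrent_jobs", "value": 4},
        ],
    }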
--- config/galaxy.yml.sample | 5 + doc/source/admin/galaxy_options.rst | 12 + lib/galaxy/dependencies/__init__.py | 54 +- lib/galaxy/jobs/__init__.py | 37 +- lib/galaxy/jobs/runners/pulsar.py | 3 +- .../jobs/runners/state_handlers/resubmit.py | 3 +- lib/galaxy/webapps/galaxy/config_schema.yml | 3 + .../webapps/galaxy/job_config_schema.yml | 116 ++++ test/integration/delay_job_conf.yml | 46 +- .../embedded_pulsar_metadata_job_conf.xml | 20 - .../embedded_pulsar_metadata_job_conf.yml | 19 + test/integration/io_injection_job_conf.xml | 13 - test/integration/io_injection_job_conf.yml | 11 + .../resubmission_default_job_conf.xml | 43 -- .../resubmission_default_job_conf.yml | 34 ++ .../resubmission_dynamic_job_conf.xml | 13 +- test/integration/resubmission_job_conf.xml | 101 --- test/integration/resubmission_job_conf.yml | 114 ++++ ...ubmission_job_resource_parameters_conf.xml | 2 +- test/integration/resubmission_rules/rules.py | 12 +- .../resubmission_small_memory_job_conf.xml | 6 +- test/integration/test_job_environments.py | 2 +- test/integration/test_job_recovery.py | 2 +- test/integration/test_job_resubmission.py | 28 +- .../test_pulsar_embedded_metadata.py | 2 +- test/unit/jobs/job_conf.sample_advanced.yml | 577 ++++++++++++++++++ test/unit/jobs/test_job_configuration.py | 81 ++- 27 files changed, 1063 insertions(+), 296 deletions(-) create mode 100644 lib/galaxy/webapps/galaxy/job_config_schema.yml delete mode 100644 test/integration/embedded_pulsar_metadata_job_conf.xml create mode 100644 test/integration/embedded_pulsar_metadata_job_conf.yml delete mode 100644 test/integration/io_injection_job_conf.xml create mode 100644 test/integration/io_injection_job_conf.yml delete mode 100644 test/integration/resubmission_default_job_conf.xml create mode 100644 test/integration/resubmission_default_job_conf.yml delete mode 100644 test/integration/resubmission_job_conf.xml create mode 100644 test/integration/resubmission_job_conf.yml create mode 100644 test/unit/jobs/job_conf.sample_advanced.yml diff --git a/config/galaxy.yml.sample b/config/galaxy.yml.sample index c911ebe358f0..9fba6eb3b707 100644 --- a/config/galaxy.yml.sample +++ b/config/galaxy.yml.sample @@ -1530,6 +1530,11 @@ galaxy: # configuration file. #job_config_file: config/job_conf.xml + # Description of job running configuration, can be embedded into + # Galaxy configuration or loaded from an additional file with the + # job_config_file option. + #job_config: null + # When jobs fail due to job runner problems, Galaxy can be configured # to retry these or reroute the jobs to new destinations. Very fine # control of this is available with resubmit declarations in diff --git a/doc/source/admin/galaxy_options.rst b/doc/source/admin/galaxy_options.rst index 82791c59d71e..ede50c69ece9 100644 --- a/doc/source/admin/galaxy_options.rst +++ b/doc/source/admin/galaxy_options.rst @@ -3187,6 +3187,18 @@ :Type: str +~~~~~~~~~~~~~~ +``job_config`` +~~~~~~~~~~~~~~ + +:Description: + Description of job running configuration, can be embedded into + Galaxy configuration or loaded from an additional file with the + job_config_file option. 
+:Default: ``None``
+:Type: map
+
+
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 ``default_job_resubmission_condition``
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
diff --git a/lib/galaxy/dependencies/__init__.py b/lib/galaxy/dependencies/__init__.py
index 06162cf7b968..274e1c69a80a 100644
--- a/lib/galaxy/dependencies/__init__.py
+++ b/lib/galaxy/dependencies/__init__.py
@@ -34,20 +34,46 @@ def __init__(self, config_file):
 
     def parse_configs(self):
         self.config = load_app_properties(config_file=self.config_file)
-        job_conf_xml = self.config.get(
-            "job_config_file",
-            join(dirname(self.config_file), 'job_conf.xml'))
-        try:
-            for plugin in ElementTree.parse(job_conf_xml).find('plugins').findall('plugin'):
-                if 'load' in plugin.attrib:
-                    self.job_runners.append(plugin.attrib['load'])
-        except (OSError, IOError):
-            pass
-        try:
-            for plugin in ElementTree.parse(job_conf_xml).findall('.//destination/param[@id="rules_module"]'):
-                self.job_rule_modules.append(plugin.text)
-        except (OSError, IOError):
-            pass
+
+        def load_job_config_dict(job_conf_dict):
+            for runner in job_conf_dict.get("runners", {}).values():
+                if "load" in runner:
+                    self.job_runners.append(runner.get("load"))
+                if "rules_module" in runner:
+                    self.job_rule_modules.append(runner["rules_module"])
+                if "params" in runner:
+                    runner_params = runner["params"]
+                    if "rules_module" in runner_params:
+                        self.job_rule_modules.append(runner_params["rules_module"])
+
+        if "job_config" in self.config:
+            load_job_config_dict(self.config.get("job_config"))
+        else:
+            job_conf_path = self.config.get(
+                "job_config_file",
+                join(dirname(self.config_file), 'job_conf.xml'))
+            if '.xml' in job_conf_path:
+                try:
+                    try:
+                        for plugin in ElementTree.parse(job_conf_path).find('plugins').findall('plugin'):
+                            if 'load' in plugin.attrib:
+                                self.job_runners.append(plugin.attrib['load'])
+                    except (OSError, IOError):
+                        pass
+                    try:
+                        for plugin in ElementTree.parse(job_conf_path).findall('.//destination/param[@id="rules_module"]'):
+                            self.job_rule_modules.append(plugin.text)
+                    except (OSError, IOError):
+                        pass
+                except ElementTree.ParseError:
+                    pass
+            else:
+                try:
+                    with open(job_conf_path) as f:
+                        job_conf_dict = yaml.safe_load(f)
+                    load_job_config_dict(job_conf_dict)
+                except (OSError, IOError):
+                    pass
+
         object_store_conf_xml = self.config.get(
             "object_store_config_file",
             join(dirname(self.config_file), 'object_store_conf.xml'))
diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py
index de3fee93a311..302959436284 100644
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -20,6 +20,7 @@
 from xml.etree import ElementTree
 
 import six
+import yaml
 from pulsar.client.staging import COMMAND_VERSION_FILENAME
 
 import galaxy
@@ -291,7 +292,7 @@ def __init__(self, app):
         default_resubmit_condition = self.app.config.default_job_resubmission_condition
         if default_resubmit_condition:
             default_resubmits.append(dict(
-                destination=None,
+                environment=None,
                 condition=default_resubmit_condition,
                 handler=None,
                 delay=None,
@@ -300,10 +301,26 @@ def __init__(self, app):
         self.__parse_resource_parameters()
         # Initialize the config
-        job_config_file = self.app.config.job_config_file
         try:
-            tree = load(job_config_file)
-            self.__parse_job_conf_xml(tree)
+            if 'job_config' in self.app.config.config_dict:
+                job_config_dict = self.app.config.config_dict["job_config"]
+            else:
+                job_config_file = self.app.config.job_config_file
+                if '.xml' in job_config_file:
+                    tree = load(job_config_file)
+                    job_config_dict = self.__parse_job_conf_xml(tree)
+                else:
+                    with open(job_config_file, "r") as f:
+                        job_config_dict = yaml.safe_load(f)
+
+            # 
diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py
index de3fee93a311..302959436284 100644
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -20,6 +20,7 @@
 from xml.etree import ElementTree
 
 import six
+import yaml
 from pulsar.client.staging import COMMAND_VERSION_FILENAME
 
 import galaxy
@@ -291,7 +292,7 @@ def __init__(self, app):
         default_resubmit_condition = self.app.config.default_job_resubmission_condition
         if default_resubmit_condition:
             default_resubmits.append(dict(
-                destination=None,
+                environment=None,
                 condition=default_resubmit_condition,
                 handler=None,
                 delay=None,
@@ -300,10 +301,26 @@ def __init__(self, app):
         self.__parse_resource_parameters()
         # Initialize the config
-        job_config_file = self.app.config.job_config_file
         try:
-            tree = load(job_config_file)
-            self.__parse_job_conf_xml(tree)
+            if 'job_config' in self.app.config.config_dict:
+                job_config_dict = self.app.config.config_dict["job_config"]
+            else:
+                job_config_file = self.app.config.job_config_file
+                if '.xml' in job_config_file:
+                    tree = load(job_config_file)
+                    job_config_dict = self.__parse_job_conf_xml(tree)
+                else:
+                    with open(job_config_file, "r") as f:
+                        job_config_dict = yaml.safe_load(f)
+
+            # Load tasks if configured
+            if self.app.config.use_tasked_jobs:
+                job_config_dict["runners"]["tasks"] = dict(id='tasks', load='tasks', workers=self.app.config.local_task_queue_workers, kwds={})
+
+            self._configure_from_dict(job_config_dict)
+
+            log.debug('Done loading job configuration')
+
         except IOError:
             log.warning('Job configuration "%s" does not exist, using default job configuration',
                         self.app.config.job_config_file)
@@ -474,13 +491,7 @@ def __parse_job_conf_xml(self, tree):
         log.debug('Loading job configuration from %s' % self.app.config.job_config_file)
 
         job_config_dict = job_config_xml_to_dict(self.app.config, root)
-        # Load tasks if configured
-        if self.app.config.use_tasked_jobs:
-            job_config_dict["runners"]["tasks"] = dict(id='tasks', load='tasks', workers=self.app.config.local_task_queue_workers, kwds={})
-
-        self._configure_from_dict(job_config_dict)
-
-        log.debug('Done loading job configuration')
+        return job_config_dict
 
     def _parse_handler(self, handler_id, process_dict):
         for plugin_id in process_dict.get("plugins") or []:
@@ -599,7 +610,7 @@ def get_resubmits(parent):
         for resubmit in parent.findall('resubmit'):
             rval.append(dict(
                 condition=resubmit.get('condition'),
-                destination=resubmit.get('destination'),
+                environment=resubmit.get('destination'),
                 handler=resubmit.get('handler'),
                 delay=resubmit.get('delay'),
             ))
@@ -753,7 +764,7 @@ def get_job_runner_plugins(self, handler_id):
                 log.warning("Job runner classes must be subclassed from BaseJobRunner, %s has bases: %s" % (id, runner_class.__bases__))
                 continue
             try:
-                rval[id] = runner_class(self.app, runner['workers'], **runner.get('kwds', {}))
+                rval[id] = runner_class(self.app, runner.get('workers', JobConfiguration.DEFAULT_NWORKERS), **runner.get('kwds', {}))
             except TypeError:
                 log.exception("Job runner '%s:%s' has not been converted to a new-style runner or encountered TypeError on load",
                               module_name, class_name)
diff --git a/lib/galaxy/jobs/runners/pulsar.py b/lib/galaxy/jobs/runners/pulsar.py
index 0e0f5c89b80b..38c4634b4c5f 100644
--- a/lib/galaxy/jobs/runners/pulsar.py
+++ b/lib/galaxy/jobs/runners/pulsar.py
@@ -12,6 +12,7 @@
 import packaging.version
 import pulsar.core
+import six
 import yaml
 from pulsar.client import (
     build_client_manager,
@@ -449,7 +450,7 @@ def get_client_from_wrapper(self, job_wrapper):
         user = job_wrapper.get_job().user
         if user:
             for key, value in params.items():
-                if value:
+                if value and isinstance(value, six.string_types):
                     params[key] = model.User.expand_user_properties(user, value)
 
         env = getattr(job_wrapper.job_destination, "env", [])
diff --git a/lib/galaxy/jobs/runners/state_handlers/resubmit.py b/lib/galaxy/jobs/runners/state_handlers/resubmit.py
index f9cbd744f7cf..69f9761e4d65 100644
--- a/lib/galaxy/jobs/runners/state_handlers/resubmit.py
+++ b/lib/galaxy/jobs/runners/state_handlers/resubmit.py
@@ -97,7 +97,8 @@ def _handle_resubmit_definitions(resubmit_definitions, app, job_runner, job_stat
     else:
         job_log_prefix = "(%s)" % (job_state.job_wrapper.job_id)
 
-    destination = resubmit['destination']
+    # Is 'destination' still needed here? These entries might be serialized to the database.
+ destination = resubmit.get('environment') or resubmit.get('destination') log.info("%s Job will be resubmitted to '%s' because %s at " "the '%s' destination", job_log_prefix, diff --git a/lib/galaxy/webapps/galaxy/config_schema.yml b/lib/galaxy/webapps/galaxy/config_schema.yml index 585b2be3c777..165992d241b4 100644 --- a/lib/galaxy/webapps/galaxy/config_schema.yml +++ b/lib/galaxy/webapps/galaxy/config_schema.yml @@ -2360,6 +2360,9 @@ mapping: the system on which Galaxy is started. Advanced job running capabilities can be configured through the job configuration file. + job_config: + !include job_config_schema.yml + default_job_resubmission_condition: type: str required: false diff --git a/lib/galaxy/webapps/galaxy/job_config_schema.yml b/lib/galaxy/webapps/galaxy/job_config_schema.yml new file mode 100644 index 000000000000..f025bc85be1d --- /dev/null +++ b/lib/galaxy/webapps/galaxy/job_config_schema.yml @@ -0,0 +1,116 @@ +desc: Description of job running configuration, can be embedded into Galaxy configuration + or loaded from an additional file with the job_config_file option. +type: map +mapping: + runners: + type: map + mapping: + dynamic: + type: map + mapping: + rules_module: + type: str + regex;(.+): + type: map + allowempty: true + mapping: + workers: + type: int + load: + type: str + enabled: + type: bool + handling: + type: map + allowempty: true + execution: + type: map + allowempty: true + mapping: + default: + type: str + environments: + type: map + mapping: + regex;(.+): + type: map + allowempty: true + mapping: + runner: + type: str + tags: + type: seq + sequence: + - type: str + shell: + type: str + env: + type: seq + sequence: + - type: map + mapping: + name: + type: str + raw: + type: bool + value: + type: str + file: + type: str + execute: + type: str + resubmit: + type: seq + sequence: + - type: map + mapping: + delay: + type: any # str or number + condition: + type: str + environment: + type: str + handler: + type: str + + tools: + type: seq + sequence: + - type: map + allowempty: true + mapping: + id: + type: str + handler: + type: str + environment: + type: str + resources: + type: str + resources: + type: map + mapping: + default: + type: str + groups: + type: map + mapping: + regex;(.+): + type: seq + sequence: + - type: str + limits: + type: seq + sequence: + - type: map + mapping: + type: + type: str + value: + type: any + window: + type: int + id: + type: str + tag: + type: str diff --git a/test/integration/delay_job_conf.yml b/test/integration/delay_job_conf.yml index cb36d4050792..7d3b14879678 100644 --- a/test/integration/delay_job_conf.yml +++ b/test/integration/delay_job_conf.yml @@ -1,29 +1,17 @@ - - - - - - - integration.delay_rules - - - - - - - python - delay - - - - - - - - - - - - +runners: + local: + load: galaxy.jobs.runners.local:LocalJobRunner + workers: 1 + dynamic: + rules_module: integration.delay_rules +execution: + environments: + local_dest: + runner: dynamic + type: python + function: delay + upload_dest: + runner: local +tools: + - id: upload1 + destination: upload_dest diff --git a/test/integration/embedded_pulsar_metadata_job_conf.xml b/test/integration/embedded_pulsar_metadata_job_conf.xml deleted file mode 100644 index 478f19787a0a..000000000000 --- a/test/integration/embedded_pulsar_metadata_job_conf.xml +++ /dev/null @@ -1,20 +0,0 @@ - - - - - - - - - - - - - true - copy - - - - - - diff --git a/test/integration/embedded_pulsar_metadata_job_conf.yml b/test/integration/embedded_pulsar_metadata_job_conf.yml new file mode 100644 
index 000000000000..76520bf4ed5e --- /dev/null +++ b/test/integration/embedded_pulsar_metadata_job_conf.yml @@ -0,0 +1,19 @@ +runners: + local: + load: galaxy.jobs.runners.local:LocalJobRunner + pulsar_embed: + load: galaxy.jobs.runners.pulsar:PulsarEmbeddedJobRunner + +execution: + default: pulsar_embed + environments: + local: + runner: local + pulsar_embed: + runner: pulsar_embed + remote_metadata: true + default_file_action: copy + +tools: +- id: upload1 + environment: local diff --git a/test/integration/io_injection_job_conf.xml b/test/integration/io_injection_job_conf.xml deleted file mode 100644 index 4b8822777a63..000000000000 --- a/test/integration/io_injection_job_conf.xml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - - diff --git a/test/integration/io_injection_job_conf.yml b/test/integration/io_injection_job_conf.yml new file mode 100644 index 000000000000..5dc05647c89a --- /dev/null +++ b/test/integration/io_injection_job_conf.yml @@ -0,0 +1,11 @@ +runners: + local: + load: galaxy.jobs.runners.local:LocalJobRunner + +execution: + environments: + local_dest: + runner: local + env: + - execute: "echo 'moo std cow'" + - execute: "(>&2 echo 'moo err cow')" diff --git a/test/integration/resubmission_default_job_conf.xml b/test/integration/resubmission_default_job_conf.xml deleted file mode 100644 index a09e58e11af2..000000000000 --- a/test/integration/resubmission_default_job_conf.xml +++ /dev/null @@ -1,43 +0,0 @@ - - - - - - - - - integration.resubmission_rules - - - - - - - - - - python - initial_destination - - - - - - - - - - - - - - test_name,failure_state,initial_destination,run_for - - - - - - - diff --git a/test/integration/resubmission_default_job_conf.yml b/test/integration/resubmission_default_job_conf.yml new file mode 100644 index 000000000000..accb17eabd3d --- /dev/null +++ b/test/integration/resubmission_default_job_conf.yml @@ -0,0 +1,34 @@ +# Slimmed down resubmission_job_conf.xml for testing default resubmission rules. 
+runners:
+  local:
+    load: galaxy.jobs.runners.local:LocalJobRunner
+    workers: 1
+  first_failure_runner:
+    load: integration.resubmission_runners:FailOnlyFirstJobRunner
+    workers: 1
+  dynamic:
+    rules_module: integration.resubmission_rules
+
+execution:
+  default: initial_target_environment
+  environments:
+    initial_target_environment:
+      runner: dynamic
+      type: python
+      function: initial_target_environment
+
+    fail_first_try:
+      runner: first_failure_runner
+    local:
+      runner: local
+
+resources:
+  default: test
+  groups:
+    upload: []
+    test: [test_name,failure_state,initial_target_environment,run_for]
+
+tools:
+  - id: upload1
+    environment: local
+    resources: upload
diff --git a/test/integration/resubmission_dynamic_job_conf.xml b/test/integration/resubmission_dynamic_job_conf.xml
index 8a25d200fcae..6221974fdc4d 100644
--- a/test/integration/resubmission_dynamic_job_conf.xml
+++ b/test/integration/resubmission_dynamic_job_conf.xml
@@ -1,21 +1,18 @@
[markup-stripped hunk; XML tags lost in extraction — visible remnants: integration.resubmission_rules, python]
@@ -29,7 +26,7 @@
[markup-stripped hunk; XML tags lost in extraction]
diff --git a/test/integration/resubmission_job_conf.xml b/test/integration/resubmission_job_conf.xml
deleted file mode 100644
index 40f73acbc519..000000000000
--- a/test/integration/resubmission_job_conf.xml
+++ /dev/null
@@ -1,101 +0,0 @@
[markup-stripped deletion; XML tags lost in extraction — visible remnants: integration.resubmission_rules, python, initial_destination, retry_test_more_mem, retry_test_more_walltime, retry_unknown_error, retry_after_delay, retry_test_generic, test_name,failure_state,initial_destination,run_for]
diff --git a/test/integration/resubmission_job_conf.yml b/test/integration/resubmission_job_conf.yml
new file mode 100644
index 000000000000..e55c236c718d
--- /dev/null
+++ b/test/integration/resubmission_job_conf.yml
@@ -0,0 +1,114 @@
+runners:
+  local:
+    load: galaxy.jobs.runners.local:LocalJobRunner
+    workers: 1
+  failure_runner:
+    load: integration.resubmission_runners:FailsJobRunner
+    workers: 1
+  assertion_runner:
+    load: integration.resubmission_runners:AssertionJobRunner
+    workers: 1
+  dynamic:
+    rules_module: integration.resubmission_rules
+
+execution:
+  default: initial_target_environment
+  environments:
+    initial_target_environment:
+      runner: dynamic
+      type: python
+      function: initial_target_environment
+
+    fail_first_try:
+      runner: failure_runner
+      resubmit:
+      - condition: walltime_reached
+        environment: retry_test_more_walltime
+      - condition: memory_limit_reached
+        environment: retry_test_more_mem
+      - condition: unknown_error
+        environment: retry_unknown_error
+
+    fail_first_if_memory_or_walltime:
+      runner: failure_runner
+      resubmit:
+      - condition: 'walltime_reached or memory_limit_reached'
+        environment: retry_test_generic
+
+    fail_first_any_failure:
+      runner: failure_runner
+      resubmit:
+      - condition: any_failure
+        environment: retry_test_generic
+
+    # This will fail twice and then succeed if walltime is reached, or fail twice and then fail hard otherwise.
+    fail_two_attempts:
+      runner: failure_runner
+      resubmit:
+      - condition: 'attempt < 3'
+      - condition: 'attempt == 3 and walltime_reached'
+        environment: retry_test_generic
+
+    # Resubmit only jobs that ran for less than 5 seconds.
+    resubmit_if_short:
+      runner: failure_runner
+      resubmit:
+      - condition: 'seconds_running < 5'
+        environment: retry_test_generic
+      - condition: any_failure
+        environment: fails_without_resubmission
+
+    # Resubmit after a delay.
+    resubmit_after_delay:
+      runner: failure_runner
+      resubmit:
+      - condition: any_failure
+        delay: 5
+        environment: retry_after_delay
+
+    # Resubmit after a couple of delays.
+    resubmit_after_two_delays:
+      runner: failure_runner
+      resubmit:
+      # Delay 1.5 seconds and then 3 seconds, then finally send off to another runner.
+      - condition: 'attempt < 3'
+        delay: 'attempt * 1.5'
+      - condition: any_failure
+        environment: retry_after_delay
+
+    fails_without_resubmission:
+      runner: failure_runner
+
+    retry_test_more_mem:
+      runner: assertion_runner
+      dest_name: retry_test_more_mem
+
+    retry_test_more_walltime:
+      runner: assertion_runner
+      dest_name: retry_test_more_walltime
+
+    retry_unknown_error:
+      runner: assertion_runner
+      dest_name: retry_unknown_error
+
+    retry_after_delay:
+      runner: assertion_runner
+      dest_name: retry_after_delay
+
+    retry_test_generic:
+      runner: assertion_runner
+      dest_name: retry_test_generic
+
+    local:
+      runner: local
+
+resources:
+  default: test
+  groups:
+    upload: []
+    test: [test_name,failure_state,initial_target_environment,run_for]
+
+tools:
+  - id: upload1
+    environment: local
+    resources: upload
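The resubmit lists above drive the integration tests that follow. Roughly, each condition is a small expression evaluated against the state of the failed job, and the first matching entry wins. A sketch of those semantics (illustrative only; the real logic lives in lib/galaxy/jobs/runners/state_handlers/resubmit.py):

    def first_matching_resubmit(resubmits, attempt, seconds_running, failure_state):
        # Variables available to condition (and delay) expressions such as
        # 'attempt < 3' or 'seconds_running < 5' above.
        variables = {
            "attempt": attempt,
            "seconds_running": seconds_running,
            "walltime_reached": failure_state == "walltime_reached",
            "memory_limit_reached": failure_state == "memory_limit_reached",
            "unknown_error": failure_state == "unknown_error",
            "any_failure": True,
        }
        for resubmit in resubmits:
            if eval(resubmit["condition"], {"__builtins__": {}}, variables):
                return resubmit  # may also specify 'environment', 'delay', and 'handler'
        return None

    # e.g. for the resubmit_if_short environment above:
    # first_matching_resubmit([{"condition": "seconds_running < 5", "environment": "retry_test_generic"}],
    #                         attempt=1, seconds_running=2, failure_state="walltime_reached")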
diff --git a/test/integration/resubmission_job_resource_parameters_conf.xml b/test/integration/resubmission_job_resource_parameters_conf.xml
index 35f987781331..724c450d39e3 100644
--- a/test/integration/resubmission_job_resource_parameters_conf.xml
+++ b/test/integration/resubmission_job_resource_parameters_conf.xml
@@ -1,6 +1,6 @@
[markup-stripped hunk; XML tags lost in extraction]
diff --git a/test/integration/resubmission_rules/rules.py b/test/integration/resubmission_rules/rules.py
index 4d82e1d2d087..ec98b0e56d51 100644
--- a/test/integration/resubmission_rules/rules.py
+++ b/test/integration/resubmission_rules/rules.py
@@ -1,20 +1,20 @@
 from galaxy.jobs import JobDestination
 
-DEFAULT_INITIAL_DESTINATION = "fail_first_try"
+DEFAULT_INITIAL_ENVIRONMENT = "fail_first_try"
 
-def initial_destination(resource_params):
-    return resource_params.get("initial_destination", None) or DEFAULT_INITIAL_DESTINATION
+def initial_target_environment(resource_params):
+    return resource_params.get("initial_target_environment", None) or DEFAULT_INITIAL_ENVIRONMENT
 
 def dynamic_resubmit_once(resource_params):
-    """Build destination that always fails first time and always re-routes to passing destination."""
+    """Build environment that always fails the first time and always re-routes to a passing environment."""
     job_destination = JobDestination()
     # Always fail on the first attempt.
     job_destination['runner'] = "failure_runner"
-    # Resubmit to a valid destination.
+    # Resubmit to a valid environment.
     job_destination['resubmit'] = [dict(
         condition="any_failure",
-        destination="local",
+        environment="local",
     )]
     return job_destination
diff --git a/test/integration/resubmission_small_memory_job_conf.xml b/test/integration/resubmission_small_memory_job_conf.xml
index a919ca586285..0c068bbef763 100644
--- a/test/integration/resubmission_small_memory_job_conf.xml
+++ b/test/integration/resubmission_small_memory_job_conf.xml
@@ -1,13 +1,9 @@
[markup-stripped hunk; XML tags lost in extraction — visible remnant: 4]
diff --git a/test/integration/test_job_environments.py b/test/integration/test_job_environments.py
index 6622ada24a28..b950627600b9 100644
--- a/test/integration/test_job_environments.py
+++ b/test/integration/test_job_environments.py
@@ -12,7 +12,7 @@
 SCRIPT_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
 SIMPLE_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "simple_job_conf.xml")
-IO_INJECTION_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "io_injection_job_conf.xml")
+IO_INJECTION_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "io_injection_job_conf.yml")
 SETS_TMP_DIR_TO_TRUE_JOB_CONFIG = os.path.join(SCRIPT_DIRECTORY, "sets_tmp_dir_to_true_job_conf.xml")
 SETS_TMP_DIR_AS_EXPRESSION_JOB_CONFIG = os.path.join(SCRIPT_DIRECTORY, "sets_tmp_dir_expression_job_conf.xml")
diff --git a/test/integration/test_job_recovery.py b/test/integration/test_job_recovery.py
index 3bb54a540558..6d0d0db1641c 100644
--- a/test/integration/test_job_recovery.py
+++ b/test/integration/test_job_recovery.py
@@ -8,7 +8,7 @@
 )
 SCRIPT_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
-DELAY_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "delay_job_conf.xml")
+DELAY_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "delay_job_conf.yml")
 SIMPLE_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "simple_job_conf.xml")
diff --git a/test/integration/test_job_resubmission.py b/test/integration/test_job_resubmission.py
index 39b4bfe8c70d..0ec1ab7975f4 100644
--- a/test/integration/test_job_resubmission.py
+++ b/test/integration/test_job_resubmission.py
@@ -5,7 +5,7 @@
 from base import integration_util
 
 SCRIPT_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
-JOB_RESUBMISSION_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "resubmission_job_conf.xml")
-JOB_RESUBMISSION_DEFAULT_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "resubmission_default_job_conf.xml")
+JOB_RESUBMISSION_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "resubmission_job_conf.yml")
+JOB_RESUBMISSION_DEFAULT_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "resubmission_default_job_conf.yml")
 JOB_RESUBMISSION_DYNAMIC_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "resubmission_dynamic_job_conf.xml")
 JOB_RESUBMISSION_SMALL_MEMORY_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "resubmission_small_memory_job_conf.xml")
@@ -48,12 +48,12 @@ def test_retry_tools_have_resource_params(self):
         assert "__job_resource" in input_names
 
     def test_job_resources(self):
-        """Test initial destination dynamic rule used by remaining re-submission test case works."""
-        self._assert_job_passes(resource_parameters={"test_name": "test_job_resources", "initial_destination": "local"})
+        """Test that the initial environment dynamic rule used by the remaining re-submission test cases works."""
+        self._assert_job_passes(resource_parameters={"test_name": "test_job_resources", "initial_target_environment": "local"})
 
     def test_failure_runner(self):
         """Test FailsJobRunner used by remaining re-submission test cases."""
-        self._assert_job_fails(resource_parameters={"test_name": "test_failure_runner", "initial_destination": "fails_without_resubmission"})
+        self._assert_job_fails(resource_parameters={"test_name": "test_failure_runner",
"initial_target_environment": "fails_without_resubmission"}) def test_walltime_resubmission(self): self._assert_job_passes(resource_parameters={"test_name": "test_walltime_resubmission", "failure_state": "walltime_reached"}) @@ -66,46 +66,46 @@ def test_unknown_error(self): def test_condition_expressions(self): self._assert_job_passes(resource_parameters={"test_name": "test_condition_expressions_0", - "initial_destination": "fail_first_if_memory_or_walltime", + "initial_target_environment": "fail_first_if_memory_or_walltime", "failure_state": "memory_limit_reached"}) self._assert_job_passes(resource_parameters={"test_name": "test_condition_expressions_1", - "initial_destination": "fail_first_if_memory_or_walltime", + "initial_target_environment": "fail_first_if_memory_or_walltime", "failure_state": "walltime_reached"}) self._assert_job_fails(resource_parameters={"test_name": "test_condition_expressions_2", - "initial_destination": "fail_first_if_memory_or_walltime", + "initial_target_environment": "fail_first_if_memory_or_walltime", "failure_state": "unknown_error"}) def test_condition_any_failure(self): self._assert_job_passes(resource_parameters={"test_name": "test_condition_any_failure", - "initial_destination": "fail_first_any_failure", + "initial_target_environment": "fail_first_any_failure", "failure_state": "unknown_error"}) def test_condition_attempt(self): self._assert_job_fails(resource_parameters={"test_name": "test_condition_attempt", - "initial_destination": "fail_two_attempts", + "initial_target_environment": "fail_two_attempts", "failure_state": "unknown_error"}) self._assert_job_passes(resource_parameters={"test_name": "test_condition_attempt", - "initial_destination": "fail_two_attempts", + "initial_target_environment": "fail_two_attempts", "failure_state": "walltime_reached"}) def test_condition_seconds_running(self): self._assert_job_passes(resource_parameters={"test_name": "test_condition_seconds_running", - "initial_destination": "resubmit_if_short", + "initial_target_environment": "resubmit_if_short", "failure_state": "walltime_reached", "run_for": "1"}) self._assert_job_fails(resource_parameters={"test_name": "test_condition_seconds_running", - "initial_destination": "resubmit_if_short", + "initial_target_environment": "resubmit_if_short", "failure_state": "walltime_reached", "run_for": "15"}) def test_resubmission_after_delay(self): self._assert_job_passes(resource_parameters={"test_name": "test_resubmission_after_delay", - "initial_destination": "resubmit_after_delay", + "initial_target_environment": "resubmit_after_delay", "failure_state": "unknown_error"}) def test_resubmission_after_delay_expression(self): self._assert_job_passes(resource_parameters={"test_name": "test_resubmission_after_delay_expression", - "initial_destination": "resubmit_after_two_delays", + "initial_target_environment": "resubmit_after_two_delays", "failure_state": "unknown_error"}) diff --git a/test/integration/test_pulsar_embedded_metadata.py b/test/integration/test_pulsar_embedded_metadata.py index d5b5582dbcbd..5fcafe73dd9f 100644 --- a/test/integration/test_pulsar_embedded_metadata.py +++ b/test/integration/test_pulsar_embedded_metadata.py @@ -5,7 +5,7 @@ from base import integration_util SCRIPT_DIRECTORY = os.path.abspath(os.path.dirname(__file__)) -EMBEDDED_PULSAR_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "embedded_pulsar_metadata_job_conf.xml") +EMBEDDED_PULSAR_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "embedded_pulsar_metadata_job_conf.yml") class 
EmbeddedPulsarIntegrationInstance(integration_util.IntegrationInstance):
diff --git a/test/unit/jobs/job_conf.sample_advanced.yml b/test/unit/jobs/job_conf.sample_advanced.yml
new file mode 100644
index 000000000000..216aba9ffcae
--- /dev/null
+++ b/test/unit/jobs/job_conf.sample_advanced.yml
@@ -0,0 +1,577 @@
+# TODO: translocate docs for all pulsar embedded
+# TODO: translocate docs for all kubernetes
+# TODO: implement and demo embedded job_metrics.
+# TODO: translocate docker option docs into docker_local environment description
+# TODO: translocate singularity option docs into singularity_local environment description
+# TODO: all destinations from condor on
+runners:
+  local:
+    load: galaxy.jobs.runners.local:LocalJobRunner
+    workers: 4
+  drmaa:
+    load: galaxy.jobs.runners.drmaa:DRMAAJobRunner
+
+    # Different DRMs handle successfully completed jobs differently;
+    # these options can be changed to handle such differences and
+    # are explained in detail on the Galaxy wiki. Defaults are shown.
+    invalidjobexception_state: ok
+    invalidjobexception_retries: 0
+    internalexception_state: ok
+    internalexception_retries: 0
+  sge:
+    load: galaxy.jobs.runners.drmaa:DRMAAJobRunner
+    # Override the $DRMAA_LIBRARY_PATH environment variable
+    drmaa_library_path: /sge/lib/libdrmaa.so
+  cli:
+    load: galaxy.jobs.runners.cli:ShellJobRunner
+  condor:
+    load: galaxy.jobs.runners.condor:CondorJobRunner
+  slurm:
+    load: galaxy.jobs.runners.slurm:SlurmJobRunner
+  dynamic:
+    # The dynamic runner is not a real job running plugin and is
+    # always loaded, so it does not need to be explicitly stated in
+    # runners. However, if you wish to change the base module
+    # containing your dynamic rules, you can do so.
+    # The `load` attribute is not required (and ignored if
+    # included).
+    rules_module: galaxy.jobs.rules
+
+  godocker:
+    # Go-Docker is a batch computing/cluster management tool using Docker.
+    # See https://bitbucket.org/osallou/go-docker for more details.
+    load: galaxy.jobs.runners.godocker:GodockerJobRunner
+    # Specify the instance of GoDocker
+    godocker_master: GODOCKER_URL
+    # GoDocker username
+    user: USERNAME
+    # GoDocker API key
+    key: APIKEY
+    # Specify the project present in the GoDocker setup
+    godocker_project: galaxy
+
+  chronos:
+    # Chronos is a framework for Apache Mesos, software which manages
+    # computer clusters. Specifically, Chronos runs on top of Mesos and is used
+    # for job orchestration.
+    #
+    # This runner requires a shared file system where the directories of the
+    # `job_working_directory`, `file_path` and `new_file_path` settings defined in
+    # the `galaxy.ini` file are shared amongst the Mesos agents (i.e. the nodes which
+    # actually run the jobs).
+    load: galaxy.jobs.runners.chronos:ChronosJobRunner
+    # Hostname which runs the Chronos instance.
+    chronos: '`chronos_host`'
+    # The email address of the person responsible for the job.
+    owner: foo@bar.com
+    # Username to access the Mesos cluster.
+    username: username
+    # Password to access the Mesos cluster.
+    password: password
+    # True to communicate with Chronos over HTTPS; false otherwise.
+    insecure: true
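Each entry under runners: above ultimately becomes a plugin instantiation. A minimal sketch of that mapping (mirroring the get_job_runner_plugins change earlier in this series; `app` stands in for the Galaxy application object and the entry values are sample data, not from this branch):

    import importlib

    def load_runner_plugin(app, runner_info):
        # 'load' holds a module:class path; kwds become plugin constructor arguments.
        module_name, class_name = runner_info["load"].split(":")
        runner_class = getattr(importlib.import_module(module_name), class_name)
        return runner_class(app, runner_info.get("workers", 4), **runner_info.get("kwds", {}))

    # e.g. load_runner_plugin(app, {"id": "sge",
    #                               "load": "galaxy.jobs.runners.drmaa:DRMAAJobRunner",
    #                               "workers": 4,
    #                               "kwds": {"drmaa_library_path": "/sge/lib/libdrmaa.so"}})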
+
+  # Pulsar runners (see more at https://pulsar.readthedocs.io/)
+  pulsar_rest:
+    load: galaxy.jobs.runners.pulsar:PulsarRESTJobRunner
+    # Allow optimized HTTP calls with libcurl (defaults to urllib)
+    transport: curl
+    # Experimental Caching*: Undocumented, don't use.
+    #cache: false
+
+  pulsar_mq:
+    load: galaxy.jobs.runners.pulsar:PulsarMQJobRunner
+    # AMQP URL to connect to.
+    amqp_url: amqp://guest:guest@localhost:5672//
+    # URL that remote Pulsar apps should use to transfer files to/from this
+    # Galaxy instance. This can be unspecified/empty if
+    # `galaxy_infrastructure_url` is set in galaxy.yml.
+    galaxy_url: http://localhost:8080
+
+    # AMQP does not guarantee that a published message is received by
+    # the AMQP server, so Galaxy/Pulsar can request that the consumer
+    # acknowledge messages and will resend them if acknowledgement is
+    # not received after a configurable timeout.
+    #amqp_acknowledge: false
+
+    # Galaxy reuses Pulsar's persistence_directory parameter (via the
+    # Pulsar client lib) to store a record of received
+    # acknowledgements, and to keep track of messages which have not
+    # been acknowledged.
+    #persistence_directory: /path/to/dir
+
+    # Number of seconds to wait for an acknowledgement before
+    # republishing a message.
+    #amqp_republish_time: 30
+
+    # Pulsar job manager to communicate with (see the Pulsar
+    # docs for information on job managers).
+    #manager: _default_
+
+    # The AMQP client can provide an SSL client certificate (e.g. for
+    # validation); the following options configure that certificate
+    # (see for reference:
+    # https://kombu.readthedocs.io/en/latest/reference/kombu.connection.html
+    # ). If you simply want to use SSL but not use/validate a client
+    # cert, just use the ?ssl=1 query on the amqp URL instead.
+    #amqp_connect_ssl_ca_certs: /path/to/cacert.pem
+    #amqp_connect_ssl_keyfile: /path/to/key.pem
+    #amqp_connect_ssl_certfile: /path/to/cert.pem
+    #amqp_connect_ssl_cert_reqs: cert_required
+    # By default, the AMQP consumer uses a nonblocking connection with
+    # a 0.2 second timeout. In testing, this works fine for
+    # unencrypted AMQP connections, but with SSL it will cause the
+    # client to reconnect to the server after each timeout. Set to a
+    # higher value (in seconds), or to `None` to use blocking connections.
+    #amqp_consumer_timeout: None
+
+  pulsar_legacy:
+    # Pulsar job runner with default parameters matching those
+    # of the old LWR job runner. If your Pulsar server is running on a
+    # Windows machine, for instance, this runner should still be used.
+
+    # These destinations still need to target a Pulsar server;
+    # older LWR plugins and destinations that still work in Galaxy can
+    # target LWR servers, but this support should be considered
+    # deprecated and will disappear with a future release of Galaxy.
+    load: galaxy.jobs.runners.pulsar:PulsarLegacyJobRunner
+
+handling:
+  processes:
+    handler0:
+    handler1:
+    sge_handler:
+      # Restrict a handler to load specific runners; by default they will load all.
+      plugins: ['sge']
+    special_handler0:
+      tags: [special_handlers]
+    special_handler1:
+      tags: [special_handlers]
+
+
+execution:
+  environments:
+    local:
+      runner: local
+
+    multicore_local:
+      runner: local
+      # Warning: the local slot count doesn't tie up additional worker threads; to
+      # prevent over-allocating the machine, define a second local runner with a
+      # different name and fewer workers to run this destination.
+      local_slots: 4
+      # Embed metadata collection in the local job script (defaults to true for most runners).
+      embed_metadata_in_job: true
+
+    docker_local:
+      runner: local
+      docker_enabled: true
+
+    singularity_local:
+      runner: local
+      singularity_enabled: true
+
+    # The above Docker and Singularity examples describe how to specify
+    # default and override containers, but fuller descriptions can be used
+    # also to tweak extra options. Like in the above examples, "container_override"
+    # will override the tool-centric container resolution specified by the container
+    # resolvers configuration and "containers" will provide a default if no such
+    # container is found during resolution.
+
+    # resolve_dependencies defaults to false, but can be set to true to use
+    # dependency resolution inside the container (you'll likely want to ensure
+    # Galaxy's tool dependency directory and/or Conda prefix is mounted in the
+    # container if this is set). shell (defaults to /bin/sh) can be used to tell
+    # Galaxy to use bash, for instance, in the target container.
+
+    # If using these options, docker_enabled and/or singularity_enabled should
+    # also be set to true to enable the desired container technology. If multiple
+    # such containers are defined (as in the example below), the first one matching
+    # the enabled container types for this destination will be used.
+    customized_container:
+      runner: local
+      container:
+      - type: docker
+        shell: '/bin/sh'
+        resolve_dependencies: false
+        identifier: 'busybox:ubuntu-14.04'
+      - type: singularity
+        shell: '/bin/sh'
+        resolve_dependencies: false
+        identifier: '/path/to/default/container'
+      container_override:
+      - type: docker
+        shell: '/bin/sh'
+        resolve_dependencies: false
+        identifier: 'busybox:ubuntu-14.04'
+      - type: singularity
+        shell: '/bin/sh'
+        resolve_dependencies: false
+        identifier: '/path/to/default/container'
+    pbs:
+      runner: pbs
+      tags: [mycluster]
+    pbs_longjobs:
+      runner: pbs
+      tags: [mycluster, longjobs]
+      Resource_List: 'walltime=72:00:00'
+    remote_cluster:
+      runner: drmaa
+      tags: [longjobs]
+      # Set to false if cluster nodes don't share the Galaxy library;
+      # metadata calculation will then be performed locally after the job finishes.
+      embed_metadata_in_job: true
+      # If jobs are configured to run as the real user, this option allows
+      # users that are not mapped to any real users to run jobs
+      # as the Galaxy user (a fallback). Default is false.
+      allow_guests: true
+    java_cluster:
+      runner: drmaa
+      env:
+      # Set arbitrary environment variables at runtime. General
+      # dependencies for tools should be configured via
+      # tool_dependency_dir and package options, and these
+      # options should be reserved for defining cluster-
+      # specific options.
+      - name: '_JAVA_OPTIONS'
+        value: '-Xmx6G'
+      - name: ANOTHER_OPTION
+        raw: true  # disable auto-quoting.
+        value: "'5'"
+      - file: /mnt/java_cluster/environment_setup.sh  # script will be sourced
+      - execute: 'module load javastuff/2.10'
+      # Files to source and exec statements will be handled on remote
+      # clusters. These don't need to be available on the Galaxy server
+      # itself.
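As a rough illustration (not Galaxy's exact code), the entries of an env: list such as java_cluster's above translate into shell statements in the generated job script along these lines:

    def render_env(entries):
        # Illustrative translation of an env: list into job script lines;
        # the real implementation also handles escaping and remote staging.
        lines = []
        for entry in entries:
            if "name" in entry:
                value = entry["value"] if entry.get("raw") else '"%s"' % entry["value"]
                lines.append("%s=%s; export %s" % (entry["name"], value, entry["name"]))
            elif "file" in entry:
                lines.append(". %s" % entry["file"])  # sourced on the worker node
            elif "execute" in entry:
                lines.append(entry["execute"])
        return "\n".join(lines)

    # For the java_cluster example above this yields roughly:
    #   _JAVA_OPTIONS="-Xmx6G"; export _JAVA_OPTIONS
    #   ANOTHER_OPTION='5'; export ANOTHER_OPTION
    #   . /mnt/java_cluster/environment_setup.sh
    #   module load javastuff/2.10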
+
+    # The following three environments demonstrate setting up per-job temp directory handling.
+    # In these cases TEMP, TMP, and TMPDIR will be set for each job dispatched to these
+    # environments.
+
+    # The first simply tells Galaxy to create a temp directory in the job directory; the
+    # other forms can be used to issue shell commands before the job runs on the worker node
+    # to allocate a temp directory. In these other cases, Galaxy will not clean up these
+    # directories, so either use directories managed by the job resource manager or set up
+    # tooling to clean old temp directories up outside of Galaxy.
+    clean_tmp_by_job:
+      runner: drmaa
+      tmp_dir: true
+    clean_tmp_drm:
+      runner: drmaa
+      tmp_dir: $DRM_SET_VARIABLES_FOR_THIS_JOB
+    clean_tmp_fast_scratch:
+      runner: drmaa
+      tmp_dir: '$(mktemp -d /mnt/scratch/fastest/gxyjobXXXXXXXXXXX)'
+
+    real_user_cluster:
+      # The drmaa runner can be used to run jobs as the submitting user;
+      # make sure to set up the 3 real user parameters in galaxy.yml.
+      runner: drmaa
+
+    dynamic:
+      runner: dynamic
+      # A destination that represents a method in the dynamic runner.
+      # foo should be a Python function defined in any file in
+      # lib/galaxy/jobs/rules.
+      function: foo
+    dtd_destination:
+      runner: dynamic
+      # DTD is a special dynamic job destination type that builds up
+      # rules given a YAML-based DSL (see config/tool_destinations.yml.sample
+      # for the syntax).
+      type: dtd
+    load_balance:
+      runner: dynamic
+      # Randomly assign jobs to various static destination ids
+      type: choose_one
+      destination_ids: cluster1,cluster2,cluster3
+    load_balance_with_data_locality:
+      runner: dynamic
+      # Randomly assign jobs to various static destination ids,
+      # but keep jobs in the same workflow invocation together and,
+      # for those jobs run outside of workflows, keep jobs in the same
+      # history together.
+      type: choose_one
+      destination_ids: cluster1,cluster2,cluster3
+      hash_by: workflow_invocation,history
+    burst_out:
+      runner: dynamic
+      # Burst out from static destination local_cluster_8_core to
+      # static destination shared_cluster_8_core when there are about
+      # 50 Galaxy jobs assigned to any of the local_cluster_XXX
+      # destinations (either running or queued). If there are fewer
+      # than 50 jobs, just use the local_cluster_8_core destination.
+
+      # Uncomment the job_states parameter to make this bursting happen when
+      # roughly 50 jobs are queued instead.
+      type: burst
+      from_destination_ids: [local_cluster_8_core, local_cluster_1_core, local_cluster_16_core]
+      to_destination_id: shared_cluster_8_core
+      num_jobs: 50
+      # job_states: queued
+    burst_if_queued:
+      runner: dynamic
+      # Dynamic destinations can be chained together to create more
+      # complex rules. In this example, the built-in burst stock rule
+      # determines whether to burst, and if so, directs to burst_if_size,
+      # a user-defined dynamic destination. This destination in turn will
+      # conditionally route the job to a remote pulsar node if the input size
+      # is below a certain threshold, or route it to local if not.
+      type: burst
+      from_destination_ids: local
+      to_destination_id: burst_if_size
+      num_jobs: 2
+      job_states: queued
+    burst_if_size:
+      runner: dynamic
+      type: python
+      function: to_destination_if_size
+      # Also demonstrates a destination-level override of the
+      # rules_module. This rules_module will take precedence over the
+      # plugin-level rules module when resolving the dynamic function.
+      rules_module: galaxycloudrunner.rules
+      max_size: 1g
+      to_destination_id: galaxycloudrunner
+      fallback_destination_id: local
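Rules such as to_destination_if_size referenced above are plain Python functions in the configured rules_module; Galaxy matches arguments by parameter name and accepts either an environment id or a JobDestination as the return value. A hypothetical sketch (function name suffixed and threshold invented for illustration; the real rule ships with galaxycloudrunner.rules):

    def to_destination_if_size_sketch(job):
        # Route small inputs to the remote cloud destination, big ones to local;
        # cf. the max_size / to_destination_id / fallback_destination_id params above.
        input_size = sum(assoc.dataset.get_size() for assoc in job.input_datasets if assoc.dataset)
        if input_size <= 1024 ** 3:  # 1 GiB, mirroring max_size: 1g
            return "galaxycloudrunner"
        return "local"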
For detailed information on how to use the galaxycloudrunner, + # consult the documentation at: https://galaxycloudrunner.readthedocs.io/ + type: python + function: cloudlaunch_pulsar_burst + rules_module: galaxycloudrunner.rules + cloudlaunch_api_endpoint: https://launch.usegalaxy.org/cloudlaunch/api/v1 + # Obtain your CloudLaunch token by visiting: https://launch.usegalaxy.org/profile + cloudlaunch_api_token: 37c46c89bcbea797bc7cd76fee10932d2c6a2389 + # id of the PulsarRESTJobRunner plugin. Defaults to "pulsar" + pulsar_runner_id: pulsar + # Destination to fallback to if no nodes are available + fallback_destination: local + # Pick next available server and resubmit if an unknown error occurs + # resubmit: {condition: 'unknown_error and attempt <= 3', environment: galaxycloudrunner} + + docker_dispatch: + runner: dynamic + # Follow dynamic destination type will send all tool's that + # support docker to static destination defined by + # docker_destination_id (docker_cluster in this example) and all + # other tools to default_destination_id (normal_cluster in this + # example). + type: docker_dispatch + docker_destination_id: docker_cluster + default_destination_id: normal_cluster + + # Pulsar enviornment examples + secure_pulsar_rest_dest: + runner: pulsar_rest + # URL of Pulsar server. + url: https://examle.com:8913/ + + # If set, private_token must match token in remote Pulsar's + # configuration. + private_token: 123456789changeme + + # Uncomment the following statement to disable file staging (e.g. + # if there is a shared file system between Galaxy and the Pulsar + # server). Alternatively action can be set to 'copy' - to replace + # http transfers with file system copies, 'remote_transfer' to cause + # the Pulsar to initiate HTTP transfers instead of Galaxy, or + # 'remote_copy' to cause Pulsar to initiate file system copies. + # If setting this to 'remote_transfer' be sure to specify a + # 'galaxy_url' attribute on the runner plugin above. --> + default_file_action: none + + # The above option is just the default, the transfer behavior + # none|copy|http|remote_transfer|remote_copy can be configured on a per + # path basis via the following file or dictionary. See Pulsar documentation + # for more details and examples. + #file_action_config: file_actions.yaml + #file_actions: {} + + # The non-legacy Pulsar runners will attempt to resolve Galaxy + # dependencies remotely - to enable this set a tool_dependency_dir + # in Pulsar's configuration (can work with all the same dependency + # resolutions mechanisms as Galaxy - tool Shed installs, Galaxy + # packages, etc...). To disable this behavior, set the follow parameter + # to none. To generate the dependency resolution command locally + # set the following parameter local. + #dependency_resolution: none + + # Uncomment following option to enable setting metadata on remote + # Pulsar server. The 'use_remote_datatypes' option is available for + # determining whether to use remotely configured datatypes or local + # ones (both alternatives are a little brittle). + #remote_metadata: true + #use_remote_datatypes: false + #remote_property_galaxy_home: /path/to/remote/galaxy-central + + # If remote Pulsar server is configured to run jobs as the real user, + # uncomment the following line to pass the current Galaxy user + # along. + #submit_user: $__user_name__ + + # Various other submission parameters can be passed along to the Pulsar + # whose use will depend on the remote Pulsar's configured job manager. 
+      # For instance:
+      #submit_native_specification: -P bignodes -R y -pe threads 8
+
+      # Disable parameter rewriting and rewrite generated commands
+      # instead. This may be required if the remote host is a Windows machine,
+      # but probably not otherwise.
+      #rewrite_parameters: false
+
+    pulsar_mq_dest:
+      runner: pulsar_mq
+      # The RESTful Pulsar client sends a request to Pulsar
+      # to populate various system properties. This
+      # extra step can be disabled and these calculated here
+      # on the client by uncommenting jobs_directory and
+      # specifying any additional remote_property_ of
+      # interest; this is not optional when using message
+      # queues.
+      jobs_directory: /path/to/remote/pulsar/files/staging/
+      # Otherwise, MQ and Legacy Pulsar destinations can be supplied
+      # all the same destination parameters as the RESTful client documented
+      # above (though url and private_token are ignored when using a MQ).
+
+    # Example CLI runners.
+    ssh_torque:
+      runner: cli
+      shell_plugin: SecureShell
+      job_plugin: Torque
+      shell_username: foo
+      shell_hostname: foo.example.org
+      job_Resource_List: walltime=24:00:00,ncpus=4
+
+    ssh_slurm:
+      runner: cli
+      shell_plugin: SecureShell
+      job_plugin: Slurm
+      shell_username: foo
+      shell_hostname: my_host
+      job_time: 2:00:00
+      job_ncpus: 4
+      job_partition: my_partition
+
+    local_lsf_8cpu_16GbRam:
+      runner: cli
+      shell_plugin: LocalShell
+      job_plugin: LSF
+      job_memory: 16000
+      job_cores: 8
+      job_project: BigMem
+
+# Tools can be configured to use specific destinations or handlers,
+# identified by either the "id" or "tags" attribute. If assigned to
+# a tag, a handler or destination that matches that tag will be
+# chosen at random.
+tools:
+- id: bwa
+  handler: handler0
+- id: bowtie
+  handler: handler1
+- id: bar
+  environment: dynamic
+-
+  # The next example defines a resource group to insert into the tool interface
+  # and pass to the dynamic destination (as the resource_params argument).
+  id: longbar
+  environment: dynamic
+  resources: all
+-
+  # Pick a handler randomly from those declaring this tag.
+  id: baz
+  handler: special_handlers
+  environment: bigmem
+-
+  # legacy trackster parameter for tool mapping
+  id: foo
+  handler: handler0
+  source: trackster
+
+resources:
+  default: default
+  groups:
+    # Group different parameters defined in job_resource_params_conf.xml
+    # together and assign these groups ids. The tools section above can map
+    # tools to different groups.
+    default: []
+    memoryonly: [memory]
+    all: [processors, memory, time, project]
+
+# Certain limits can be defined. The 'concurrent_jobs' limits all
+# control the number of jobs that can be "active" at a time, that
+# is, dispatched to a runner and in the 'queued' or 'running'
+# states.
+
+# A race condition exists that will allow environment_* concurrency
+# limits to be surpassed when multiple handlers are allowed to
+# handle jobs for the same environment. To prevent this, assign all
+# jobs for a specific environment to a single handler.
+limits:
+-
+  # Limit on the number of jobs a user with a registered Galaxy
+  # account can have active across all environments.
+  type: registered_user_concurrent_jobs
+  value: 2
+
+-
+  # Likewise, but for unregistered/anonymous users.
+  type: anonymous_user_concurrent_jobs
+  value: 1
+
+-
+  # The number of jobs a user can have active in the specified
+  # environment, or across all environments identified by the
+  # specified tag. 
(formerly: concurrent_jobs) + type: environment_user_concurrent_jobs + id: local + value: 1 + +- + type: environment_user_concurrent_jobs + tag: mycluster + value: 2 + +- + type: environment_user_concurrent_jobs + tag: longjobs + value: 1 + +- + # The number of jobs that can be active in the specified + # environment (or across all environments identified by the + # specified tag) by any/all users. + type: environment_total_concurrent_jobs + id: local + value: 16 + +- + type: environment_total_concurrent_jobs + tag: longjobs + value: 100 + +- + # Amount of time a job can run (in any environment) before it + # will be terminated by Galaxy. + type: walltime + value: '24:00:00' + +- + # Total walltime that jobs may not exceed during a set period. + # If total walltime of finished jobs exceeds this value, any + # new jobs are paused. `window` is a number in days, + # representing the period. + type: total_walltime + window: 30 + value: '24:00:00' + +- + # Size that any defined tool output can grow to before the job + # will be terminated. This does not include temporary files + # created by the job. Format is flexible, e.g.: + # '10GB' = '10g' = '10240 Mb' = '10737418240' + type: output_size + value: '10GB' diff --git a/test/unit/jobs/test_job_configuration.py b/test/unit/jobs/test_job_configuration.py index 33f6e716a6ad..b1597edf274f 100644 --- a/test/unit/jobs/test_job_configuration.py +++ b/test/unit/jobs/test_job_configuration.py @@ -5,6 +5,7 @@ import unittest import mock +from pykwalify.core import Core from galaxy.jobs import JobConfiguration from galaxy.util import bunch @@ -14,16 +15,18 @@ # there are advantages to testing the documentation/examples. SIMPLE_JOB_CONF = os.path.join(os.path.dirname(__file__), "..", "..", "..", "config", "job_conf.xml.sample_basic") ADVANCED_JOB_CONF = os.path.join(os.path.dirname(__file__), "..", "..", "..", "config", "job_conf.xml.sample_advanced") +ADVANCED_JOB_CONF_YAML = os.path.join(os.path.dirname(__file__), "job_conf.sample_advanced.yml") CONDITIONAL_RUNNER_JOB_CONF = os.path.join(os.path.dirname(__file__), "conditional_runners_job_conf.xml") HANDLER_TEMPLATE_JOB_CONF = os.path.join(os.path.dirname(__file__), "handler_template_job_conf.xml") class BaseJobConfXmlParserTestCase(unittest.TestCase): + extension = "xml" def setUp(self): self.temp_directory = tempfile.mkdtemp() self.config = bunch.Bunch( - job_config_file=os.path.join(self.temp_directory, "job_conf.xml"), + job_config_file=os.path.join(self.temp_directory, "job_conf.%s" % self.extension), use_tasked_jobs=False, job_resource_params_file="/tmp/fake_absent_path", config_dict={}, @@ -91,14 +94,24 @@ def _with_handlers_config(self, assign_with=None, default=None, handlers=None, b def _write_config_from(self, path, template=None): template = template or {} - self._write_config(open(path, "r").read().format(**template)) + contents = open(path, "r").read() + if template: + contents = contents.format(**template) + self._write_config(contents) def _write_config(self, contents): with open(os.path.join(self.temp_directory, "job_conf.%s" % self.extension), "w") as f: f.write(contents) + def _with_advanced_config(self): + if self.extension == "xml": + self._write_config_from(ADVANCED_JOB_CONF) + else: + self._write_config_from(ADVANCED_JOB_CONF_YAML) + class SimpleJobConfXmlParserTestCase(BaseJobConfXmlParserTestCase): + extension = "xml" def test_load_simple_runner(self): runner_plugin = self.job_config.runner_plugins[0] @@ -260,38 +273,43 @@ def test_conditional_runners_from_environ(self): del 
os.environ["LOCAL2_ENABLED"] del os.environ["LOCAL3_ENABLED"] + def test_macro_expansion(self): + self._with_advanced_config() + for name in ["foo_small", "foo_medium", "foo_large", "foo_longrunning"]: + assert self.job_config.destinations[name] + class AdvancedJobConfXmlParserTestCase(BaseJobConfXmlParserTestCase): def test_load_destination_params(self): - self.__with_advanced_config() + self._with_advanced_config() pbs_dest = self.job_config.destinations["pbs_longjobs"][0] - assert pbs_dest.id == "pbs_longjobs" + assert pbs_dest.id == "pbs_longjobs", pbs_dest assert pbs_dest.runner == "pbs" dest_params = pbs_dest.params assert dest_params["Resource_List"] == "walltime=72:00:00" def test_destination_tags(self): - self.__with_advanced_config() - longjob_dests = self.job_config.destinations["longjobs"] - assert len(longjob_dests) == 2 - assert longjob_dests[0].id == "pbs_longjobs" - assert longjob_dests[1].id == "remote_cluster" + self._with_advanced_config() + longjob_dests_ids = sorted([j.id for j in self.job_config.destinations["longjobs"]]) + assert len(longjob_dests_ids) == 2 + assert longjob_dests_ids[0] == "pbs_longjobs" + assert longjob_dests_ids[1] == "remote_cluster" def test_load_tool(self): - self.__with_advanced_config() + self._with_advanced_config() baz_tool = self.job_config.tools["baz"][0] assert baz_tool.id == "baz" assert baz_tool.handler == "special_handlers" assert baz_tool.destination == "bigmem" def test_load_tool_params(self): - self.__with_advanced_config() + self._with_advanced_config() foo_tool = self.job_config.tools["foo"][0] assert foo_tool.params["source"] == "trackster" def test_limit_overrides(self): - self.__with_advanced_config() + self._with_advanced_config() limits = self.job_config.limits assert limits.registered_user_concurrent_jobs == 2 assert limits.anonymous_user_concurrent_jobs == 1 @@ -303,7 +321,7 @@ def test_limit_overrides(self): assert limits.total_walltime["window"] == 30 def test_env_parsing(self): - self.__with_advanced_config() + self._with_advanced_config() env_dest = self.job_config.destinations["java_cluster"][0] assert len(env_dest.env) == 4, len(env_dest.env) assert env_dest.env[0]["name"] == "_JAVA_OPTIONS" @@ -317,7 +335,7 @@ def test_env_parsing(self): assert env_dest.env[3]["execute"] == "module load javastuff/2.10" def test_runners_kwds(self): - self.__with_advanced_config() + self._with_advanced_config() sge_runner = [r for r in self.job_config.runner_plugins if r["id"] == "sge"][0] assert sge_runner["kwds"]["drmaa_library_path"] == "/sge/lib/libdrmaa.so" @@ -327,7 +345,7 @@ def test_runners_kwds(self): assert self.job_config.dynamic_params["rules_module"] == "galaxy.jobs.rules" def test_container_tag_in_destination(self): - self.__with_advanced_config() + self._with_advanced_config() container_dest = self.job_config.destinations["customized_container"][0] assert "container" in container_dest.params assert "container_override" in container_dest.params @@ -346,28 +364,43 @@ def test_container_tag_in_destination(self): assert not container_override1["resolve_dependencies"] def test_tool_mapping_parameters(self): - self.__with_advanced_config() + self._with_advanced_config() assert self.job_config.tools["foo"][-1].params["source"] == "trackster" assert self.job_config.tools["longbar"][-1].destination == "dynamic" assert self.job_config.tools["longbar"][-1].resources == "all" def test_handler_runner_plugins(self): - self.__with_advanced_config() + self._with_advanced_config() assert 
self.job_config.handler_runner_plugins["sge_handler"] == ["sge"]
         assert "special_handler1" not in self.job_config.handler_runner_plugins
 
     def test_resource_groups(self):
-        self.__with_advanced_config()
+        self._with_advanced_config()
         assert self.job_config.default_resource_group == "default"
         assert self.job_config.resource_groups["memoryonly"] == ["memory"]
 
-    def test_macro_expansion(self):
-        self.__with_advanced_config()
-        for name in ["foo_small", "foo_medium", "foo_large", "foo_longrunning"]:
-            assert self.job_config.destinations[name]
-
-    def __with_advanced_config(self):
-        self._write_config_from(ADVANCED_JOB_CONF)
+
+class AdvancedJobConfYamlParserTestCase(AdvancedJobConfXmlParserTestCase):
+    extension = "yml"
+
+
+def test_yaml_advanced_validation():
+    schema = os.path.join(os.path.dirname(__file__), "..", "..", "..", "lib", "galaxy", "webapps", "galaxy", "job_config_schema.yml")
+    integration_tests_dir = os.path.join(os.path.dirname(__file__), "..", "..", "..", "test", "integration")
+    valid_files = [
+        ADVANCED_JOB_CONF_YAML,
+        os.path.join(integration_tests_dir, "delay_job_conf.yml"),
+        os.path.join(integration_tests_dir, "embedded_pulsar_metadata_job_conf.yml"),
+        os.path.join(integration_tests_dir, "io_injection_job_conf.yml"),
+        os.path.join(integration_tests_dir, "resubmission_job_conf.yml"),
+        os.path.join(integration_tests_dir, "resubmission_default_job_conf.yml"),
+    ]
+    for valid_file in valid_files:
+        c = Core(
+            source_file=valid_file,
+            schema_files=[schema],
+        )
+        c.validate()
 
 
 class MockJobMetrics(object):

From 18d182c07b5271b02c818e3c834f5a05e3421399 Mon Sep 17 00:00:00 2001
From: John Chilton
Date: Sat, 4 May 2019 11:37:18 -0400
Subject: [PATCH 9/9] Don't rely on dictionary ordering for
 test_job_configuration unit tests.

Makes the tests more robust, and is needed for YAML loading of job
configuration files.
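In brief: the XML parser collected handlers in document order, while handlers defined in a YAML mapping arrive in whatever order the loader yields them, so comparing against an ordered list is too strict. A minimal illustration (values invented):

    handlers = {"handler1": {}, "handler0": {}}           # mapping order depends on the loader
    assert sorted(handlers) == ["handler0", "handler1"]   # order-independent, robust
    # assert list(handlers) == ["handler0", "handler1"]   # brittle: fails here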
--- test/unit/jobs/test_job_configuration.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/unit/jobs/test_job_configuration.py b/test/unit/jobs/test_job_configuration.py index b1597edf274f..6884c4cfd7ce 100644 --- a/test/unit/jobs/test_job_configuration.py +++ b/test/unit/jobs/test_job_configuration.py @@ -153,7 +153,7 @@ def test_implicit_db_assign_handler_assign_with_explicit_handlers(self): self._with_handlers_config(handlers=[{'id': 'handler0'}, {'id': 'handler1'}]) assert self.job_config.handler_assignment_methods == ['db-preassign'] assert self.job_config.default_handler_id is None - assert self.job_config.handlers['_default_'] == ['handler0', 'handler1'] + assert sorted(self.job_config.handlers['_default_']) == ['handler0', 'handler1'] def test_implict_uwsgi_mule_message_handler_assign(self): self._with_uwsgi_application_stack(mule='lib/galaxy/main.py', farm='job-handlers:1') @@ -166,7 +166,7 @@ def test_implict_uwsgi_mule_message_handler_assign_with_explicit_handlers(self): self._with_uwsgi_application_stack(mule='lib/galaxy/main.py', farm='job-handlers:1') assert self.job_config.handler_assignment_methods == ['uwsgi-mule-message', 'db-preassign'], self.job_config.handler_assignment_methods assert self.job_config.default_handler_id is None - assert self.job_config.handlers['_default_'] == ['handler0', 'handler1', 'main.job-handlers.1'] + assert sorted(self.job_config.handlers['_default_']) == ['handler0', 'handler1', 'main.job-handlers.1'] def test_explicit_mem_self_handler_assign(self): self._with_handlers_config(assign_with='mem-self') @@ -186,7 +186,7 @@ def test_explicit_db_preassign_handler_assign_with_uwsgi(self): self._with_uwsgi_application_stack(mule='lib/galaxy/main.py', farm='job-handlers:1') assert self.job_config.handler_assignment_methods == ['db-preassign'] assert self.job_config.default_handler_id is None - assert self.job_config.handlers['_default_'] == ['handler0', 'main.job-handlers.1'] + assert sorted(self.job_config.handlers['_default_']) == ['handler0', 'main.job-handlers.1'] def test_explicit_db_transaction_isolation_handler_assign(self): self._with_handlers_config(assign_with='db-transaction-isolation') @@ -199,7 +199,7 @@ def test_explicit_db_transaction_isolation_handler_assign_with_uwsgi(self): self._with_uwsgi_application_stack(mule='lib/galaxy/main.py', farm='job-handlers:1') assert self.job_config.handler_assignment_methods == ['db-transaction-isolation'] assert self.job_config.default_handler_id is None - assert self.job_config.handlers['_default_'] == ['handler0', 'main.job-handlers.1'] + assert sorted(self.job_config.handlers['_default_']) == ['handler0', 'main.job-handlers.1'] def test_explicit_db_skip_locked_handler_assign(self): self._with_handlers_config(assign_with='db-skip-locked') @@ -212,7 +212,7 @@ def test_explicit_db_skip_locked_handler_assign_with_uwsgi(self): self._with_uwsgi_application_stack(mule='lib/galaxy/main.py', farm='job-handlers:1') assert self.job_config.handler_assignment_methods == ['db-skip-locked'] assert self.job_config.default_handler_id is None - assert self.job_config.handlers['_default_'] == ['handler0', 'main.job-handlers.1'] + assert sorted(self.job_config.handlers['_default_']) == ['handler0', 'main.job-handlers.1'] def test_uwsgi_farms_as_handler_tags(self): self._with_uwsgi_application_stack(