Skip to content

Commit

Permalink
Merge pull request galaxyproject#17367 from nsoranzo/lxml-stubs
Browse files Browse the repository at this point in the history
Fix type annotation of code using XML etree
  • Loading branch information
mvdbeek authored Jan 29, 2024
2 parents f81cc1b + 9a682d4 commit 5c6dd8e
Show file tree
Hide file tree
Showing 31 changed files with 192 additions and 132 deletions.
6 changes: 4 additions & 2 deletions lib/galaxy/datatypes/dataproviders/hierarchy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
"""
import logging

from galaxy.util import etree
from galaxy.util import (
Element,
etree,
)
from . import line

_TODO = """
Expand Down Expand Up @@ -65,7 +68,6 @@ def matches_selector(self, element, selector=None):
# TODO: add more flexibility here w/o re-implementing xpath
# TODO: fails with '#' - browser thinks it's an anchor - use urlencode
# TODO: need removal/replacement of etree namespacing here - then move to string match
Element = getattr(etree, "_Element", etree.Element)
return bool((selector is None) or (isinstance(element, Element) and selector in element.tag))

def element_as_dict(self, element):
Expand Down
17 changes: 7 additions & 10 deletions lib/galaxy/dependencies/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,15 @@ def load_job_config_dict(job_conf_dict):
job_conf_path = join(dirname(self.config_file), job_conf_path)
if ".xml" in job_conf_path:
try:
try:
for plugin in parse_xml(job_conf_path).find("plugins").findall("plugin"):
job_conf_tree = parse_xml(job_conf_path)
plugins_elem = job_conf_tree.find("plugins")
if plugins_elem:
for plugin in plugins_elem.findall("plugin"):
if "load" in plugin.attrib:
self.job_runners.append(plugin.attrib["load"])
except OSError:
pass
try:
for plugin in parse_xml(job_conf_path).findall('.//destination/param[@id="rules_module"]'):
self.job_rule_modules.append(plugin.text)
except OSError:
pass
except etree.ParseError:
for plugin in job_conf_tree.findall('.//destination/param[@id="rules_module"]'):
self.job_rule_modules.append(plugin.text)
except (OSError, etree.ParseError):
pass
else:
try:
Expand Down
1 change: 1 addition & 0 deletions lib/galaxy/dependencies/pinned-typecheck-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
annotated-types==0.6.0 ; python_version >= "3.8" and python_version < "3.13"
cffi==1.16.0 ; python_version >= "3.8" and python_version < "3.13"
cryptography==41.0.7 ; python_version >= "3.8" and python_version < "3.13"
lxml-stubs==0.5.1 ; python_version >= "3.8" and python_version < "3.13"
mypy-extensions==1.0.0 ; python_version >= "3.8" and python_version < "3.13"
mypy==1.8.0 ; python_version >= "3.8" and python_version < "3.13"
pycparser==2.21 ; python_version >= "3.8" and python_version < "3.13"
Expand Down
3 changes: 1 addition & 2 deletions lib/galaxy/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
ToolEvaluator,
)
from galaxy.util import (
etree,
parse_xml_string,
RWXRWXRWX,
safe_makedirs,
Expand Down Expand Up @@ -643,7 +642,7 @@ def get_tool_resource_xml(self, tool_id, tool_type):
field_name, self.resource_parameters
)
raise KeyError(message)
fields.append(etree.fromstring(self.resource_parameters[field_name]))
fields.append(parse_xml_string(self.resource_parameters[field_name]))

if fields:
conditional_element = parse_xml_string(self.JOB_RESOURCE_CONDITIONAL_XML)
Expand Down
8 changes: 4 additions & 4 deletions lib/galaxy/jobs/dynamic_tool_destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -1647,14 +1647,14 @@ def get_destination_list_from_job_config(job_config_location) -> set:

# Add all destination IDs from the job configuration xml file
for destination in job_conf.getroot().iter("destination"):
if isinstance(destination.get("id"), str):
destination_list.add(destination.get("id"))

destination_id = destination.get("id")
if destination_id:
destination_list.add(destination_id)
else:
error = f"Destination ID '{str(destination)}"
error += "' in job configuration file cannot be"
error += " parsed. Things may not work as expected!"
log.debug(error)
log.warning(error)

return destination_list

Expand Down
6 changes: 5 additions & 1 deletion lib/galaxy/jobs/runners/util/cli/job/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
abstractmethod,
)
from enum import Enum
from typing import (
Dict,
List,
)

try:
from galaxy.model import Job
Expand Down Expand Up @@ -60,7 +64,7 @@ def get_single_status(self, job_id):
"""

@abstractmethod
def parse_status(self, status, job_ids):
def parse_status(self, status: str, job_ids: List[str]) -> Dict[str, str]:
"""
Parse the statuses of output from get_status command.
"""
Expand Down
4 changes: 2 additions & 2 deletions lib/galaxy/jobs/runners/util/cli/job/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def parse_failure_reason(self, reason, job_id):
return runner_states.WALLTIME_REACHED
return None

def _get_job_state(self, state):
def _get_job_state(self, state: str) -> str:
# based on:
# https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.3/lsf_admin/job_state_lsf.html
# https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.2/lsf_command_ref/bjobs.1.html
Expand All @@ -115,7 +115,7 @@ def _get_job_state(self, state):
"UNKWN": job_states.ERROR,
"WAIT": job_states.QUEUED,
"ZOMBI": job_states.ERROR,
}.get(state)
}[state]
except KeyError:
raise KeyError(f"Failed to map LSF status code [{state}] to job state.")

Expand Down
4 changes: 2 additions & 2 deletions lib/galaxy/jobs/runners/util/cli/job/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ def parse_single_status(self, status, job_id):
# else line like "slurm_load_jobs error: Invalid job id specified"
return job_states.OK

def _get_job_state(self, state):
def _get_job_state(self, state: str) -> str:
try:
return {
"F": job_states.ERROR,
"R": job_states.RUNNING,
"CG": job_states.RUNNING,
"PD": job_states.QUEUED,
"CD": job_states.OK,
}.get(state)
}[state]
except KeyError:
raise KeyError(f"Failed to map slurm status code [{state}] to job state.")

Expand Down
21 changes: 12 additions & 9 deletions lib/galaxy/jobs/runners/util/cli/job/torque.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,19 @@ def parse_status(self, status, job_ids):
tree = None
if tree is None:
log.warning(f"No valid qstat XML return from `qstat -x`, got the following: {status}")
return None
return {}
else:
for job in tree.findall("Job"):
id = job.find("Job_Id").text
if id in job_ids:
state = job.find("job_state").text
job_id_elem = job.find("Job_Id")
assert job_id_elem is not None
id_ = job_id_elem.text
if id_ in job_ids:
job_state_elem = job.find("job_state")
assert job_state_elem is not None
state = job_state_elem.text
assert state
# map PBS job states to Galaxy job states.
rval[id] = self._get_job_state(state)
rval[id_] = self._get_job_state(state)
return rval

def parse_single_status(self, status, job_id):
Expand All @@ -95,11 +100,9 @@ def parse_single_status(self, status, job_id):
# no state found, job has exited
return job_states.OK

def _get_job_state(self, state):
def _get_job_state(self, state: str) -> str:
try:
return {"E": job_states.RUNNING, "R": job_states.RUNNING, "Q": job_states.QUEUED, "C": job_states.OK}.get(
state
)
return {"E": job_states.RUNNING, "R": job_states.RUNNING, "Q": job_states.QUEUED, "C": job_states.OK}[state]
except KeyError:
raise KeyError(f"Failed to map torque status code [{state}] to job state.")

Expand Down
6 changes: 3 additions & 3 deletions lib/galaxy/tool_shed/galaxy_install/tools/data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ def install_data_managers(
data_manager_id = elem.get("id", None)
if data_manager_id is None:
log.error(
"A data manager was defined that does not have an id and will not be installed:\n%s"
% xml_to_string(elem)
"A data manager was defined that does not have an id and will not be installed:\n%s",
xml_to_string(elem),
)
continue
data_manager_dict = (
Expand Down Expand Up @@ -170,7 +170,7 @@ def install_data_managers(
)
if data_manager:
rval.append(data_manager)
elif elem.tag is etree.Comment:
elif elem.tag is etree.Comment: # type: ignore[comparison-overlap]
pass
else:
log.warning(f"Encountered unexpected element '{elem.tag}':\n{xml_to_string(elem)}")
Expand Down
38 changes: 18 additions & 20 deletions lib/galaxy/tool_shed/galaxy_install/tools/tool_panel_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
from galaxy.tool_shed.util.repository_util import get_repository_owner
from galaxy.tool_shed.util.shed_util_common import get_tool_panel_config_tool_path_install_dir
from galaxy.util import (
etree,
Element,
parse_xml_string,
SubElement,
xml_to_string,
)
from galaxy.util.renamed_temporary_file import RenamedTemporaryFile
Expand Down Expand Up @@ -154,27 +155,27 @@ def config_elems_to_xml_file(self, config_elems, config_filename, tool_path, too

def generate_tool_elem(
self, tool_shed, repository_name, changeset_revision, owner, tool_file_path, tool, tool_section
):
) -> Element:
"""Create and return an ElementTree tool Element."""
if tool_section is not None:
tool_elem = etree.SubElement(tool_section, "tool")
tool_elem = SubElement(tool_section, "tool")
else:
tool_elem = etree.Element("tool")
tool_elem = Element("tool")
tool_elem.attrib["file"] = tool_file_path
if not tool.guid:
raise ValueError("tool has no guid")
tool_elem.attrib["guid"] = tool.guid
tool_shed_elem = etree.SubElement(tool_elem, "tool_shed")
tool_shed_elem = SubElement(tool_elem, "tool_shed")
tool_shed_elem.text = tool_shed
repository_name_elem = etree.SubElement(tool_elem, "repository_name")
repository_name_elem = SubElement(tool_elem, "repository_name")
repository_name_elem.text = repository_name
repository_owner_elem = etree.SubElement(tool_elem, "repository_owner")
repository_owner_elem = SubElement(tool_elem, "repository_owner")
repository_owner_elem.text = owner
changeset_revision_elem = etree.SubElement(tool_elem, "installed_changeset_revision")
changeset_revision_elem = SubElement(tool_elem, "installed_changeset_revision")
changeset_revision_elem.text = changeset_revision
id_elem = etree.SubElement(tool_elem, "id")
id_elem = SubElement(tool_elem, "id")
id_elem.text = tool.id
version_elem = etree.SubElement(tool_elem, "version")
version_elem = SubElement(tool_elem, "version")
version_elem.text = tool.version
return tool_elem

Expand Down Expand Up @@ -297,7 +298,7 @@ def generate_tool_panel_elem_list(
owner="",
):
"""Generate a list of ElementTree Element objects for each section or tool."""
elem_list: List[etree.Element] = []
elem_list: List[Element] = []
tool_elem = None
cleaned_repository_clone_url = remove_protocol_and_user_from_clone_url(repository_clone_url)
if not owner:
Expand Down Expand Up @@ -335,6 +336,7 @@ def generate_tool_panel_elem_list(
tool_section if inside_section else None,
)
if inside_section:
assert tool_section is not None
if section_in_elem_list is not None:
elem_list[section_in_elem_list] = tool_section
else:
Expand Down Expand Up @@ -366,17 +368,13 @@ def generate_tool_section_dicts(self, tool_config=None, tool_sections=None) -> L
tool_section_dicts.append(dict(tool_config=tool_config, id="", version="", name=""))
return tool_section_dicts

def generate_tool_section_element_from_dict(self, tool_section_dict):
def generate_tool_section_element_from_dict(self, tool_section_dict: Dict[str, str]) -> Element:
# The value of tool_section_dict looks like the following.
# { id: <ToolSection id>, version : <ToolSection version>, name : <TooSection name>}
if tool_section_dict["id"]:
# Create a new tool section.
tool_section = etree.Element("section")
tool_section.attrib["id"] = tool_section_dict["id"]
tool_section.attrib["name"] = tool_section_dict["name"]
tool_section.attrib["version"] = tool_section_dict["version"]
else:
tool_section = None
tool_section = Element("section")
tool_section.attrib["id"] = tool_section_dict["id"]
tool_section.attrib["name"] = tool_section_dict["name"]
tool_section.attrib["version"] = tool_section_dict["version"]
return tool_section

def get_or_create_tool_section(self, toolbox, tool_panel_section_id, new_tool_panel_section_label=None):
Expand Down
4 changes: 2 additions & 2 deletions lib/galaxy/tool_shed/metadata/metadata_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,9 +492,9 @@ def generate_repository_dependency_metadata(self, repository_dependencies_config
root = tree.getroot()
xml_is_valid = root.tag == "repositories"
if xml_is_valid:
invalid_repository_dependencies_dict = dict(description=root.get("description"))
invalid_repository_dependencies_dict: Dict[str, Any] = dict(description=root.get("description"))
invalid_repository_dependency_tups = []
valid_repository_dependencies_dict = dict(description=root.get("description"))
valid_repository_dependencies_dict: Dict[str, Any] = dict(description=root.get("description"))
valid_repository_dependency_tups = []
for repository_elem in root.findall("repository"):
repository_dependency_tup, repository_dependency_is_valid, err_msg = self.handle_repository_elem(
Expand Down
21 changes: 12 additions & 9 deletions lib/galaxy/tool_shed/tools/data_table_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@

from galaxy.tool_shed.galaxy_install.client import InstallationTarget
from galaxy.tool_shed.util import hg_util
from galaxy.util import etree
from galaxy.util import (
Element,
SubElement,
)
from galaxy.util.tool_shed import xml_util

if TYPE_CHECKING:
Expand All @@ -29,24 +32,24 @@ def __init__(self, app: RequiredAppT):

def generate_repository_info_elem(
self, tool_shed: str, repository_name: str, changeset_revision: str, owner: str, parent_elem=None, **kwd
) -> etree.Element:
) -> Element:
"""Create and return an ElementTree repository info Element."""
if parent_elem is None:
elem = etree.Element("tool_shed_repository")
elem = Element("tool_shed_repository")
else:
elem = etree.SubElement(parent_elem, "tool_shed_repository")
tool_shed_elem = etree.SubElement(elem, "tool_shed")
elem = SubElement(parent_elem, "tool_shed_repository")
tool_shed_elem = SubElement(elem, "tool_shed")
tool_shed_elem.text = tool_shed
repository_name_elem = etree.SubElement(elem, "repository_name")
repository_name_elem = SubElement(elem, "repository_name")
repository_name_elem.text = repository_name
repository_owner_elem = etree.SubElement(elem, "repository_owner")
repository_owner_elem = SubElement(elem, "repository_owner")
repository_owner_elem.text = owner
changeset_revision_elem = etree.SubElement(elem, "installed_changeset_revision")
changeset_revision_elem = SubElement(elem, "installed_changeset_revision")
changeset_revision_elem.text = changeset_revision
# add additional values
# TODO: enhance additional values to allow e.g. use of dict values that will recurse
for key, value in kwd.items():
new_elem = etree.SubElement(elem, key)
new_elem = SubElement(elem, key)
new_elem.text = value
return elem

Expand Down
Loading

0 comments on commit 5c6dd8e

Please sign in to comment.