Skip to content

Commit

Permalink
Implement tool upgrade assistant script.
Browse files Browse the repository at this point in the history
Inspect a tool's XML file and provide advice on upgrading to new tool versions. It is implemented as a library in tool_util for integration with Planemo in the future but I've added a script here to run it on the command-line directly. It can also output in JSON for integration with external tools such as the galaxy language server.
  • Loading branch information
jmchilton committed Aug 22, 2024
1 parent 2cd33e8 commit b697873
Show file tree
Hide file tree
Showing 12 changed files with 619 additions and 3 deletions.
6 changes: 5 additions & 1 deletion lib/galaxy/tool_util/parser/xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,10 @@ def _get_option_value(self, key, default):
def _command_el(self):
return self.root.find("command")

@property
def _outputs_el(self):
return self.root.find("outputs")

def _get_attribute_as_bool(self, attribute, default, elem=None):
if elem is None:
elem = self.root
Expand Down Expand Up @@ -411,7 +415,7 @@ def parse_input_pages(self) -> "XmlPagesSource":

def parse_provided_metadata_style(self):
style = None
out_elem = self.root.find("outputs")
out_elem = self._outputs_el
if out_elem is not None and "provided_metadata_style" in out_elem.attrib:
style = out_elem.attrib["provided_metadata_style"]

Expand Down
253 changes: 253 additions & 0 deletions lib/galaxy/tool_util/upgrade/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
# Profile changes not covered.
# - Allow invalid values. @mvdbeek any clue what to say here. The PR is here:
# - https://github.com/galaxyproject/galaxy/pull/6264
from json import loads
from typing import (
cast,
Dict,
List,
Optional,
Type,
)

from typing_extensions import (
Literal,
NotRequired,
TypedDict,
)

from galaxy.tool_util.parser.factory import get_tool_source
from galaxy.tool_util.parser.xml import XmlToolSource
from galaxy.util import Element
from galaxy.util.resources import resource_string


class AdviceCode(TypedDict):
name: str
level: Literal["must_fix", "consider", "ready", "info"]
message: str
niche: NotRequired[bool]
url: NotRequired[str]


upgrade_codes_json = resource_string(__package__, "upgrade_codes.json")
upgrade_codes_by_name: Dict[str, AdviceCode] = {}

for name, upgrade_object in loads(upgrade_codes_json).items():
upgrade_object["name"] = name
upgrade_codes_by_name[name] = cast(AdviceCode, upgrade_object)


class AdviceCollection:
_advice: List[AdviceCode]

def __init__(self):
self._advice = []

def add(self, code: str):
self._advice.append(upgrade_codes_by_name[code])

def to_list(self) -> List[AdviceCode]:
return self._advice


class ProfileMigration:
"""A class offering advice on upgrading a Galaxy tool between two profile versions."""

from_version: str
to_version: str

@classmethod
def advise(cls, advice_collection: AdviceCollection, xml_file: str) -> None:
return None


class ProfileMigration16_04(ProfileMigration):
from_version = "16.01"
to_version = "16.04"

@classmethod
def advise(cls, advice_collection: AdviceCollection, xml_file: str) -> None:
tool_source = _xml_tool_source(xml_file)
interpreter = tool_source.parse_interpreter()
if interpreter:
advice_collection.add("16_04_fix_interpreter")
else:
advice_collection.add("16_04_ready_interpreter")
advice_collection.add("16_04_consider_implicit_extra_file_collection")

if _has_matching_xpath(tool_source, ".//data[@format = 'input']"):
advice_collection.add("16_04_fix_output_format")

if not _has_matching_xpath(tool_source, ".//stdio") and not _has_matching_xpath(
tool_source, ".//command[@detect_errors]"
):
advice_collection.add("16_04_exit_code")


class ProfileMigration17_09(ProfileMigration):
from_version = "16.04"
to_version = "17.09"

@classmethod
def advise(cls, advice_collection: AdviceCollection, xml_file: str) -> None:
tool_source = _xml_tool_source(xml_file)

outputs_el = tool_source._outputs_el
if outputs_el is not None and outputs_el.get("`provided_metadata_style`", None) is not None:
advice_collection.add("17_09_consider_provided_metadata_style")


class ProfileMigration18_01(ProfileMigration):
from_version = "17.09"
to_version = "18.01"

@classmethod
def advise(cls, advice_collection: AdviceCollection, xml_file: str) -> None:
tool_source = _xml_tool_source(xml_file)
command_el = tool_source._command_el
if command_el is not None and command_el.get("use_shared_home", None) is None:
advice_collection.add("18_01_consider_home_directory")

if _has_matching_xpath(tool_source, ".//outputs/collection[@structured_like]"):
advice_collection.add("18_01_consider_structured_like")


class ProfileMigration18_09(ProfileMigration):
from_version = "18.01"
to_version = "18.09"

@classmethod
def advise(cls, advice_collection: AdviceCollection, xml_file: str) -> None:
tool_source = _xml_tool_source(xml_file)
tool_type = tool_source.parse_tool_type()
if tool_type == "manage_data":
advice_collection.add("18_09_consider_python_environment")


class ProfileMigration20_05(ProfileMigration):
from_version = "18.09"
to_version = "20.05"

def advise(advice_collection: AdviceCollection, xml_file: str):
tool_source = _xml_tool_source(xml_file)

if _has_matching_xpath(tool_source, ".//configfiles/inputs"):
advice_collection.add("20_05_consider_inputs_as_json_changes")


class ProfileMigration20_09(ProfileMigration):
from_version = "20.05"
to_version = "20.09"

@classmethod
def advise(cls, advice_collection: AdviceCollection, xml_file: str) -> None:
tool_source = _xml_tool_source(xml_file)

tests = tool_source.parse_tests_to_dict()
for test in tests["tests"]:
output_collections = test.get("output_collections")
if not output_collections:
continue

for output_collection in output_collections:
if output_collection.get("element_tests"):
advice_collection.add("20_09_consider_output_collection_order")

command_el = tool_source._command_el
if command_el is not None:
strict = command_el.get("strict", None)
if strict is None:
advice_collection.add("20_09_consider_set_e")


class ProfileMigration21_09(ProfileMigration):
from_version = "20.09"
to_version = "21.09"

@classmethod
def advise(cls, advice_collection: AdviceCollection, xml_file: str) -> None:
tool_source = _xml_tool_source(xml_file)
for el in _find_all(tool_source, ".//data[@from_work_dir]"):
from_work_dir = el.get("from_work_dir") or ""
if from_work_dir != from_work_dir.strip():
advice_collection.add("")

tool_type = tool_source.parse_tool_type()
if tool_type == "data_source":
advice_collection.add("21_09_consider_python_environment")


class ProfileMigration23_0(ProfileMigration):
from_version = "21.09"
to_version = "23.0"

@classmethod
def advise(cls, advice_collection: AdviceCollection, xml_file: str) -> None:
tool_source = _xml_tool_source(xml_file)
for text_param in _find_all(tool_source, ".//input[@type='text']"):
optional_tag_set = text_param.get("optional", None) is not None
if not optional_tag_set:
advice_collection.add("23_0_consider_optional_text")


class ProfileMigration24_0(ProfileMigration):
from_version = "23.0"
to_version = "24.0"

@classmethod
def advise(cls, advice_collection: AdviceCollection, xml_file: str) -> None:
tool_source = _xml_tool_source(xml_file)
tool_type = tool_source.parse_tool_type()
if tool_type == "data_source_async":
advice_collection.add("24_0_consider_python_environment")
if tool_type in ["data_source_async", "data_source"]:
advice_collection.add("24_0_request_cleaning")


profile_migrations: List[Type[ProfileMigration]] = [
ProfileMigration16_04,
ProfileMigration17_09,
ProfileMigration18_01,
ProfileMigration18_09,
ProfileMigration20_05,
ProfileMigration20_09,
ProfileMigration21_09,
ProfileMigration23_0,
ProfileMigration24_0,
]

latest_supported_version = "24.0"


def advise_on_upgrade(xml_file: str, to_version: Optional[str] = None) -> List[AdviceCode]:
to_version = to_version or latest_supported_version
tool_source = _xml_tool_source(xml_file)
initial_version = tool_source.parse_profile()
advice_collection = AdviceCollection()

for migration in profile_migrations:
if migration.to_version < initial_version:
# tool started past here... just skip this advice
continue
if migration.to_version > to_version:
# we're not advising on upgrading past this point
break
migration.advise(advice_collection, xml_file)

return advice_collection.to_list()


def _xml_tool_source(xml_file: str) -> XmlToolSource:
tool_source = get_tool_source(xml_file)
if not isinstance(tool_source, XmlToolSource):
raise Exception("Can only provide upgrade advice for XML tools.")
return cast(XmlToolSource, tool_source)


def _has_matching_xpath(tool_source: XmlToolSource, xpath: str) -> bool:
return tool_source.xml_tree.find(xpath) is not None


def _find_all(tool_source: XmlToolSource, xpath: str) -> List[Element]:
return cast(List[Element], tool_source.xml_tree.findall(".//data[@from_work_dir]") or [])
89 changes: 89 additions & 0 deletions lib/galaxy/tool_util/upgrade/script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#!/usr/bin/env python

import argparse
import sys
from json import dumps
from textwrap import wrap
from typing import List

from galaxy.tool_util.upgrade import (
AdviceCode,
advise_on_upgrade,
latest_supported_version,
)

LEVEL_TO_STRING = {
"must_fix": "❌",
"ready": "✅",
"consider": "🤔",
"info": "ℹ️",
}
DESCRIPTION = f"""
A small utility to check for potential problems and provide advice when upgrading a tool's
profile version. This version of the script can provide advice for upgrading tools through
{latest_supported_version}.
"""


def arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description=DESCRIPTION)
parser.add_argument("xml_file")
parser.add_argument(
"-p",
"--profile-version",
dest="profile_version",
default=latest_supported_version,
help="Provide upgrade advice up to this Galaxy tool profile version.",
)
parser.add_argument(
"-j",
"--json",
default=False,
action="store_true",
help="Output aadvice as JSON.",
)
parser.add_argument(
"-n",
"--niche",
default=False,
action="store_true",
help="Include advice about niche features that may not be relevant for most tools - including the use of 'galaxy.json' and writing global state in the $HOME directory.",
)
return parser


def _print_advice(advice: AdviceCode):
message = "\n".join(wrap(advice["message"], initial_indent="", subsequent_indent=" "))
level = advice["level"]
level_str = LEVEL_TO_STRING[level]
url = advice.get("url")
print(f"- {level_str}{message}\n")
if url:
print(f" More information at {url}")


def _print_advice_list(advice_list: List[AdviceCode]):
for advice in advice_list:
_print_advice(advice)


def advise(xml_file: str, version: str, json: bool, niche: bool):
advice_list = advise_on_upgrade(xml_file, version)
if not niche:
advice_list = [a for a in advice_list if not a.get("niche", False)]
if json:
print(dumps(advice_list))
else:
_print_advice_list(advice_list)


def main(argv=None) -> None:
if argv is None:
argv = sys.argv[1:]

args = arg_parser().parse_args(argv)
advise(args.xml_file, args.profile_version, args.json, args.niche)


if __name__ == "__main__":
main()
Loading

0 comments on commit b697873

Please sign in to comment.