Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite tool linters defined in planemo #1472

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 2 additions & 19 deletions planemo/commands/cmd_lint.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,7 @@
@options.fail_level_option()
@options.skip_options()
@options.recursive_option()
@click.option(
"--urls",
is_flag=True,
default=False,
help="Check validity of URLs in XML files",
)
@click.option(
"--doi",
is_flag=True,
default=False,
help="Check validity of DOIs in XML files",
)
@click.option(
"--conda_requirements",
is_flag=True,
default=False,
help="Check tool requirements for availability in best practice Conda channels.",
)
@options.lint_biocontainers_option()
@options.lint_planemo_defined_tool_linters_options()
# @click.option(
# "--verify",
# is_flag=True,
Expand All @@ -48,6 +30,7 @@
@command_function
def cli(ctx: PlanemoCliContext, uris, **kwds):
"""Check for common errors and best practices."""
print("LINT")
lint_args = build_tool_lint_args(ctx, **kwds)
exit_code = lint_tools_on_path(ctx, uris, lint_args, recursive=kwds["recursive"])

Expand Down
8 changes: 1 addition & 7 deletions planemo/commands/cmd_shed_lint.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,7 @@
"to allow automated creation and/or updates."
),
)
@click.option(
"--urls",
is_flag=True,
default=False,
help="Check validity of URLs in XML files",
)
@options.lint_biocontainers_option()
@options.lint_planemo_defined_tool_linters_options()
# @click.option(
# "--verify",
# is_flag=True,
Expand Down
90 changes: 0 additions & 90 deletions planemo/lint.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@
Dict,
TYPE_CHECKING,
)
from urllib.request import urlopen

import requests
from galaxy.tool_util.lint import (
LintContext,
Linter,
)

from planemo.io import error
from planemo.shed import find_urls_for_xml
from planemo.xml import validation

if TYPE_CHECKING:
Expand Down Expand Up @@ -71,46 +68,6 @@ def handle_lint_complete(lint_ctx, lint_args, failed=False):
return 1 if failed else 0


def lint_dois(tool_xml, lint_ctx):
    """Find referenced DOIs and check each one against https://doi.org.

    Every DOI citation returned by ``find_dois_for_xml`` is handed to
    ``is_doi``, which records its verdict on ``lint_ctx``.
    """
    for doi in find_dois_for_xml(tool_xml):
        is_doi(doi, lint_ctx)


def find_dois_for_xml(tool_xml):
    """Return the text of every DOI citation in a parsed tool document.

    Only ``citation`` elements with ``type="doi"`` that are direct children of
    a ``citations`` element directly below the root are collected.

    :param tool_xml: parsed document exposing ``getroot()`` (an ElementTree).
    :returns: list of raw element texts in document order; an element without
        text contributes ``None``.
    """
    return [
        citation.text
        for citations in tool_xml.getroot().findall("citations")
        for citation in citations
        if citation.tag == "citation" and citation.attrib.get("type", "") == "doi"
    ]


def is_doi(publication_id, lint_ctx, timeout=30):
    """Check whether https://doi.org resolves ``publication_id``.

    :param publication_id: DOI text taken from a citation; may be ``None`` and
        may carry a ``doi:`` prefix.
    :param lint_ctx: lint context receiving ``info``/``warn``/``error`` messages.
    :param timeout: seconds to wait for doi.org before giving up, so a stalled
        connection cannot hang the lint run.

    Outcomes reported on ``lint_ctx``:

    * missing, blank or prefix-only DOI -> error
    * HTTP 200 -> info, or error if the ``doi:`` prefix was used
    * HTTP 404 -> error
    * any other status, or a network failure -> warn
    """
    base_url = "https://doi.org"
    if publication_id is None:
        lint_ctx.error("Empty DOI citation")
        return
    publication_id = publication_id.strip()
    # Galaxy expects the bare DOI; tolerate the prefix for lookup, complain later.
    doiless_publication_id = publication_id.split("doi:", 1)[-1]
    if not doiless_publication_id:
        lint_ctx.error("Empty DOI citation")
        return
    url = f"{base_url}/{doiless_publication_id}"
    try:
        r = requests.get(url, timeout=timeout)
    except requests.RequestException as e:
        # A network problem says nothing about the DOI itself, so only warn.
        lint_ctx.warn(f"Error '{e}' accessing {url}")
        return
    if r.status_code == 200:
        if publication_id != doiless_publication_id:
            lint_ctx.error("%s is valid, but Galaxy expects DOI without 'doi:' prefix" % publication_id)
        else:
            lint_ctx.info("%s is a valid DOI" % publication_id)
    elif r.status_code == 404:
        lint_ctx.error("%s is not a valid DOI" % publication_id)
    else:
        lint_ctx.warn("dx.doi returned unexpected status code %d" % r.status_code)


def lint_xsd(lint_ctx, schema_path, path):
"""Lint XML at specified path with supplied schema."""
name = lint_ctx.object_name or os.path.basename(path)
Expand All @@ -124,55 +81,8 @@ def lint_xsd(lint_ctx, schema_path, path):
lint_ctx.info("File validates against XML schema.")


def lint_urls(root, lint_ctx):
    """Find referenced URLs and verify they are valid.

    ``find_urls_for_xml(root)`` supplies two collections: ``urls`` are fetched
    with the default client headers, ``docs`` are fetched with a browser
    User-Agent. Each URL yields either an ``info`` ("URL OK ...") or an
    ``error`` message on ``lint_ctx``.

    HTTP 429 responses and CloudFlare 403/503 block pages are not treated as
    failures, because they indicate throttling or bot protection rather than
    a broken link.
    """
    urls, docs = find_urls_for_xml(root)

    # This is from Google Chrome on macOS, current at time of writing:
    BROWSER_USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36"
    # Seconds to wait per request so one unresponsive server cannot stall linting.
    REQUEST_TIMEOUT = 30

    def validate_url(url, lint_ctx, user_agent=None):
        is_valid = True
        if url.startswith(("http://", "https://")):
            headers = {"User-Agent": user_agent, "Accept": "*/*"} if user_agent else None
            r = None
            try:
                r = requests.get(url, headers=headers, stream=True, timeout=REQUEST_TIMEOUT)
                r.raise_for_status()
                # Read one chunk to prove the body is retrievable; the default
                # keeps an empty body from raising StopIteration.
                next(r.iter_content(1000), None)
            except Exception as e:
                if r is not None and r.status_code == 429:
                    # too many requests
                    pass
                elif r is not None and r.status_code in [403, 503] and "cloudflare" in r.text:
                    # CloudFlare protection block
                    pass
                else:
                    is_valid = False
                    lint_ctx.error(f"Error '{e}' accessing {url}")
            finally:
                # stream=True keeps the connection open until explicitly released.
                if r is not None:
                    r.close()
        else:
            try:
                with urlopen(url, timeout=REQUEST_TIMEOUT) as handle:
                    handle.read(100)
            except Exception as e:
                is_valid = False
                lint_ctx.error(f"Error '{e}' accessing {url}")
        if is_valid:
            lint_ctx.info("URL OK %s" % url)

    for url in urls:
        validate_url(url, lint_ctx)
    for url in docs:
        validate_url(url, lint_ctx, BROWSER_USER_AGENT)


# Public names of this module, i.e. what ``from <module> import *`` exports.
__all__ = (
"build_lint_args",
"handle_lint_complete",
"lint_dois",
"lint_urls",
"lint_xsd",
)
50 changes: 36 additions & 14 deletions planemo/linters/biocontainer_registered.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
"""Ensure best-practice biocontainer registered for this tool."""

from typing import (
List,
Optional,
TYPE_CHECKING,
)

from galaxy.tool_util.deps.container_resolvers.mulled import targets_to_mulled_name
from galaxy.tool_util.deps.mulled.util import build_target
from galaxy.tool_util.deps.mulled.mulled_build_tool import requirements_to_mulled_targets
from galaxy.tool_util.lint import Linter

from .util import xml_node_from_toolsource

from planemo.conda import tool_source_conda_targets
if TYPE_CHECKING:
from galaxy.tool_util.deps.conda_util import CondaTarget
from galaxy.tool_util.lint import LintContext
from galaxy.tool_util.parser.interface import ToolSource

MESSAGE_WARN_NO_REQUIREMENTS = "No valid package requirement tags found to infer BioContainer from."
MESSAGE_WARN_NO_CONTAINER = "Failed to find a BioContainer registered for these requirements."
Expand All @@ -12,21 +24,31 @@
lint_tool_types = ["*"]


def lint_biocontainer_registered(tool_source, lint_ctx):
conda_targets = tool_source_conda_targets(tool_source)
if not conda_targets:
lint_ctx.warn(MESSAGE_WARN_NO_REQUIREMENTS)
return
class BiocontainerValid(Linter):
    """Report the BioContainer name derived from the tool's requirements."""

    @classmethod
    def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"):
        """Emit an info message when ``mulled_container_name`` yields a name."""
        requirements, *_rest = tool_source.parse_requirements_and_containers()
        mulled_targets = requirements_to_mulled_targets(requirements)
        container = mulled_container_name("biocontainers", mulled_targets)
        if not container:
            return
        # Attach the message to the <requirements> element of the tool.
        node = xml_node_from_toolsource(tool_source, "requirements")
        lint_ctx.info(MESSAGE_INFO_FOUND_BIOCONTAINER % container, linter=cls.name(), node=node)

mulled_targets = [build_target(c.package, c.version) for c in conda_targets]
name = mulled_container_name("biocontainers", mulled_targets)
if name:
lint_ctx.info(MESSAGE_INFO_FOUND_BIOCONTAINER % name)
else:
lint_ctx.warn(MESSAGE_WARN_NO_CONTAINER)

class BiocontainerMissing(Linter):
    """Warn when no BioContainer name can be derived from the tool's requirements."""

    @classmethod
    def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"):
        """Emit a warning when ``mulled_container_name`` yields nothing."""
        requirements, *_rest = tool_source.parse_requirements_and_containers()
        mulled_targets = requirements_to_mulled_targets(requirements)
        if mulled_container_name("biocontainers", mulled_targets):
            return
        # Attach the message to the <requirements> element of the tool.
        node = xml_node_from_toolsource(tool_source, "requirements")
        lint_ctx.warn(MESSAGE_WARN_NO_CONTAINER, linter=cls.name(), node=node)


def mulled_container_name(namespace, targets):
def mulled_container_name(namespace: str, targets: List["CondaTarget"]) -> Optional[str]:
    """Build the quay.io image reference for ``targets`` in ``namespace``.

    Returns ``None`` when ``targets_to_mulled_name`` (v2 hashing) gives no name.
    """
    image = targets_to_mulled_name(targets=targets, hash_func="v2", namespace=namespace)
    return f"quay.io/{namespace}/{image}" if image else None
89 changes: 62 additions & 27 deletions planemo/linters/conda_requirements.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,72 @@
"""Ensure requirements are matched in best practice conda channels."""

from typing import (
Generator,
TYPE_CHECKING,
)

from galaxy.tool_util.deps.conda_util import requirement_to_conda_targets
from galaxy.tool_util.lint import Linter

from planemo.conda import (
BEST_PRACTICE_CHANNELS,
best_practice_search,
tool_source_conda_targets,
)
from .util import xml_node_from_toolsource

if TYPE_CHECKING:
from galaxy.tool_util.deps.conda_util import CondaTarget
from galaxy.tool_util.lint import LintContext
from galaxy.tool_util.parser.interface import ToolSource

lint_tool_types = ["*"]


def lint_requirements_in_conda(tool_source, lint_ctx):
    """Check requirements of tool source against best practice Conda channels.

    Warns once when the tool has no Conda targets. Otherwise each target gets
    one message: info for an exact match, warn for an inexact match, and warn
    when nothing matches.
    """
    targets = tool_source_conda_targets(tool_source)
    if not targets:
        lint_ctx.warn("No valid package requirement tags found to check against Conda.")
        return

    for target in targets:
        hit, is_exact = best_practice_search(target)

        # Human-readable "package@version" label (version part is optional).
        label = target.package
        if target.version:
            label += "@%s" % (target.version)

        if not hit:
            template = "Requirement [%s] doesn't match any recipe in a best practice conda channel [%s]."
            lint_ctx.warn(template % (label, BEST_PRACTICE_CHANNELS))
            continue

        if is_exact:
            template = "Requirement [%s] matches target in best practice Conda channel [%s]."
            lint_ctx.info(template % (label, hit.get("channel")))
            continue

        template = (
            "Requirement [%s] doesn't exactly match available version [%s] in best practice Conda channel [%s]."
        )
        lint_ctx.warn(template % (label, hit["version"], hit.get("channel")))
class CondaRequirementValid(Linter):
    """Report requirements that exactly match a best practice Conda channel."""

    @classmethod
    def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"):
        """Emit one info message per requirement with an exact best-practice hit."""
        for target in _requirements_conda_targets(tool_source):
            hit, is_exact = best_practice_search(target)
            # "package" or "package@version" label used in the message.
            label = target.package + ("@%s" % (target.version) if target.version else "")
            if not (hit and is_exact):
                continue
            message = f"Requirement [{label}] matches target in best practice Conda channel [{hit.get('channel')}]."
            node = xml_node_from_toolsource(tool_source, "requirements")
            lint_ctx.info(message, linter=cls.name(), node=node)


class CondaRequirementInexact(Linter):
    """Warn about requirements whose best practice Conda hit is not exact."""

    @classmethod
    def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"):
        """Emit one warning per requirement with a hit that is not an exact match."""
        for target in _requirements_conda_targets(tool_source):
            hit, is_exact = best_practice_search(target)
            # "package" or "package@version" label used in the message.
            label = target.package + ("@%s" % (target.version) if target.version else "")
            if not hit or is_exact:
                continue
            message = f"Requirement [{label}] doesn't exactly match available version [{hit['version']}] in best practice Conda channel [{hit.get('channel')}]."
            node = xml_node_from_toolsource(tool_source, "requirements")
            lint_ctx.warn(message, linter=cls.name(), node=node)


class CondaRequirementMissing(Linter):
    """Warn about requirements with no recipe in any best practice Conda channel."""

    @classmethod
    def lint(cls, tool_source: "ToolSource", lint_ctx: "LintContext"):
        """Emit one warning per requirement for which the search finds no hit.

        Requirements that do have a hit are skipped here, whether the hit is
        exact or not.
        """
        for conda_target in _requirements_conda_targets(tool_source):
            (best_hit, _exact) = best_practice_search(conda_target)
            if best_hit:
                continue
            conda_target_str = conda_target.package
            if conda_target.version:
                conda_target_str += "@%s" % (conda_target.version)
            message = f"Requirement [{conda_target_str}] doesn't match any recipe in a best practice conda channel [{BEST_PRACTICE_CHANNELS}]."
            requirements_node = xml_node_from_toolsource(tool_source, "requirements")
            lint_ctx.warn(message, linter=cls.name(), node=requirements_node)


def _requirements_conda_targets(tool_source: "ToolSource") -> Generator["CondaTarget", None, None]:
    """Yield the Conda target of each requirement of ``tool_source``.

    Requirements for which ``requirement_to_conda_targets`` returns a falsy
    value are skipped.
    """
    parsed_requirements, *_rest = tool_source.parse_requirements_and_containers()
    candidates = map(requirement_to_conda_targets, parsed_requirements)
    yield from filter(None, candidates)
Loading
Loading