Skip to content

Commit

Permalink
Implement running tests against exisiting invocation
Browse files Browse the repository at this point in the history
Addresses #1290
  • Loading branch information
mvdbeek committed Oct 30, 2023
1 parent ed3f4b0 commit 18ce0b1
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 44 deletions.
1 change: 1 addition & 0 deletions docs/commands.rst
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,6 @@ documentation describes these commands.
.. include:: commands/workflow_edit.rst
.. include:: commands/workflow_job_init.rst
.. include:: commands/workflow_lint.rst
.. include:: commands/workflow_test_check.rst
.. include:: commands/workflow_test_init.rst
.. include:: commands/workflow_upload.rst
8 changes: 8 additions & 0 deletions docs/planemo.commands.rst
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,14 @@ planemo.commands.cmd\_workflow\_lint module
:undoc-members:
:show-inheritance:

planemo.commands.cmd\_workflow\_test\_check module
--------------------------------------------------

.. automodule:: planemo.commands.cmd_workflow_test_check
:members:
:undoc-members:
:show-inheritance:

planemo.commands.cmd\_workflow\_test\_init module
-------------------------------------------------

Expand Down
93 changes: 59 additions & 34 deletions planemo/galaxy/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ def _execute( # noqa C901
) -> "GalaxyBaseRunResponse":
user_gi = config.user_gi
admin_gi = config.gi
run_response = None

start_datetime = datetime.now()
try:
Expand Down Expand Up @@ -205,46 +206,31 @@ def _execute( # noqa C901
allow_tool_state_corrections=True,
inputs_by="name",
)
invocation_id = invocation["id"]

ctx.vlog("Waiting for invocation [%s]" % invocation_id)
polling_backoff = kwds.get("polling_backoff", 0)
no_wait = kwds.get("no_wait", False)

final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
run_response = invocation_to_run_response(
ctx,
invocation_id=invocation_id,
history_id=history_id,
user_gi=user_gi,
no_wait=no_wait,
polling_backoff=polling_backoff,
config.user_gi,
runnable,
invocation,
polling_backoff=kwds.get("polling_backoff", 0),
no_wait=kwds.get("no_wait", False),
start_datetime=start_datetime,
log=log_contents_str(config),
)
if final_invocation_state not in ("ok", "skipped"):
msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state."
ctx.vlog(msg)
summarize_history(ctx, user_gi, history_id)

response_kwds = {
"workflow_id": workflow_id,
"invocation_id": invocation_id,
"history_state": job_state if not no_wait else None,
"invocation_state": final_invocation_state,
"error_message": error_message,
}

else:
raise NotImplementedError()

run_response = response_class(
ctx=ctx,
runnable=runnable,
user_gi=user_gi,
history_id=history_id,
log=log_contents_str(config),
start_datetime=start_datetime,
end_datetime=datetime.now(),
**response_kwds,
)
if not run_response:
run_response = response_class(
ctx=ctx,
runnable=runnable,
user_gi=user_gi,
history_id=history_id,
log=log_contents_str(config),
start_datetime=start_datetime,
end_datetime=datetime.now(),
**response_kwds,
)
if kwds.get("download_outputs"):
output_directory = kwds.get("output_directory", None)
ctx.vlog("collecting outputs from run...")
Expand All @@ -253,6 +239,45 @@ def _execute( # noqa C901
return run_response


def invocation_to_run_response(
ctx, user_gi, runnable, invocation, polling_backoff=0, no_wait=False, start_datetime=None, log=None
):
start_datetime = start_datetime or datetime.now()
invocation_id = invocation["id"]
history_id = invocation["history_id"]
workflow_id = invocation["workflow_id"]

ctx.vlog("Waiting for invocation [%s]" % invocation_id)

final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
ctx,
invocation_id=invocation_id,
history_id=history_id,
user_gi=user_gi,
no_wait=no_wait,
polling_backoff=polling_backoff,
)
if final_invocation_state not in ("ok", "skipped"):
msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state."
ctx.vlog(msg)
summarize_history(ctx, user_gi, history_id)

return GalaxyWorkflowRunResponse(
ctx,
runnable=runnable,
user_gi=user_gi,
history_id=history_id,
workflow_id=workflow_id,
invocation_id=invocation_id,
history_state=job_state if not no_wait else None,
invocation_state=final_invocation_state,
error_message=error_message,
log=log,
start_datetime=start_datetime,
end_datetime=datetime.now(),
)


def stage_in(
ctx: "PlanemoCliContext", runnable: Runnable, config: "BaseGalaxyConfig", job_path: str, **kwds
) -> Tuple[Dict[str, Any], str]:
Expand Down
15 changes: 10 additions & 5 deletions planemo/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,12 +467,13 @@ def docker_extra_volume_option():
)


def galaxy_url_option():
def galaxy_url_option(required: bool = False):
return planemo_option(
"--galaxy_url",
use_global_config=True,
extra_global_config_vars=["galaxy_url"],
use_env_var=True,
required=required,
type=str,
help="Remote Galaxy URL to use with external Galaxy engine.",
)
Expand All @@ -489,12 +490,13 @@ def galaxy_admin_key_option():
)


def galaxy_user_key_option():
def galaxy_user_key_option(required: bool = False):
return planemo_option(
"--galaxy_user_key",
use_global_config=True,
extra_global_config_vars=["admin_key"],
use_env_var=True,
required=required,
type=str,
help="User key to use with external Galaxy engine.",
)
Expand Down Expand Up @@ -824,7 +826,7 @@ def convert(self, value, param, ctx):
return super().convert(value, param, ctx)


def optional_tools_arg(multiple=False, allow_uris=False):
def optional_tools_arg(multiple=False, allow_uris=False, metavar="TOOL_PATH"):
"""Decorate click method as optionally taking in the path to a tool
or directory of tools. If no such argument is given the current working
directory will be treated as a directory of tools.
Expand All @@ -844,7 +846,7 @@ def optional_tools_arg(multiple=False, allow_uris=False):
nargs = -1 if multiple else 1
return click.argument(
name,
metavar="TOOL_PATH",
metavar=metavar,
type=arg_type,
nargs=nargs,
callback=_optional_tools_default,
Expand Down Expand Up @@ -1599,6 +1601,10 @@ def profile_database_options():
)


def test_index_option():
return planemo_option("--test_index", default=1, type=int, help="Select which test to run. Counting starts at 1")


def test_options():
return _compose(
planemo_option(
Expand All @@ -1607,7 +1613,6 @@ def test_options():
help="Update test-data directory with job outputs (normally"
" written to directory --job_output_files if specified.)",
),
paste_test_data_paths_option(),
test_report_options(),
planemo_option(
"--test_output_json",
Expand Down
13 changes: 8 additions & 5 deletions planemo/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,12 @@ def cases(runnable: Runnable) -> List["AbstractTestCase"]:
cases.append(ExternalGalaxyToolTestCase(runnable, tool_id, tool_version, i, test_dict))
return cases

return definition_to_test_case(tests_path=tests_path, runnable=runnable)


def definition_to_test_case(tests_path, runnable):
with open(tests_path) as f:
tests_def = yaml.safe_load(f)
tests_directory = os.path.abspath(os.path.dirname(tests_path))

def normalize_to_tests_path(path: str) -> str:
Expand All @@ -274,13 +280,11 @@ def normalize_to_tests_path(path: str) -> str:
absolute_path = path
return os.path.normpath(absolute_path)

with open(tests_path) as f:
tests_def = yaml.safe_load(f)

if not isinstance(tests_def, list):
message = TEST_FILE_NOT_LIST_MESSAGE % tests_path
raise Exception(message)

cases = []
for i, test_def in enumerate(tests_def):
if "job" not in test_def:
message = TEST_FIELD_MISSING_MESSAGE % (i + 1, tests_path, "job")
Expand All @@ -305,7 +309,6 @@ def normalize_to_tests_path(path: str) -> str:
doc=doc,
)
cases.append(case)

return cases


Expand Down Expand Up @@ -434,7 +437,7 @@ def _test_id(self) -> str:
]:
return get_tool_source(self.runnable.path).parse_id()
else:
return os.path.basename(self.runnable.path)
return os.path.basename(self.runnable.uri)


class ExternalGalaxyToolTestCase(AbstractTestCase):
Expand Down
15 changes: 15 additions & 0 deletions tests/test_cmds_with_workflow_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,18 @@ def test_serve_workflow(self):
with open(output_json_path) as f:
output = json.load(f)
assert "tests" in output
test_index = 1
invocation_id = output["tests"][test_index]["data"]["invocation_details"]["details"]["invocation_id"]
test_path = os.path.join(TEST_DATA_DIR, "wf11-remote.gxwf-test.yml")
workflow_test_check_command = [
"workflow_test_check",
"--galaxy_url",
f"http://localhost:{self._port}",
"--galaxy_user_key",
api.DEFAULT_ADMIN_API_KEY,
"--test_index",
str(test_index),
test_path,
invocation_id,
]
self._check_exit_code(workflow_test_check_command, exit_code=0)

0 comments on commit 18ce0b1

Please sign in to comment.