Skip to content

Commit

Permalink
Tool Request API.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Sep 8, 2024
1 parent e134910 commit 30ddcdd
Show file tree
Hide file tree
Showing 34 changed files with 1,377 additions and 163 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/framework_tools.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ jobs:
strategy:
matrix:
python-version: ['3.8']
use-legacy-api: ['if_needed', 'always']
services:
postgres:
image: postgres:13
Expand Down Expand Up @@ -66,7 +67,7 @@ jobs:
path: 'galaxy root/.venv'
key: gxy-venv-${{ runner.os }}-${{ steps.full-python-version.outputs.version }}-${{ hashFiles('galaxy root/requirements.txt') }}-framework-tools
- name: Run tests
run: ./run_tests.sh --coverage --framework-tools
run: GALAXY_TEST_USE_LEGACY_TOOL_API="${{ matrix.use-legacy-api }}" ./run_tests.sh --coverage --framework-tools
working-directory: 'galaxy root'
- uses: codecov/codecov-action@v3
with:
Expand Down
9 changes: 4 additions & 5 deletions lib/galaxy/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,10 @@ def __init__(self, configure_logging=True, use_converters=True, use_display_appl
self._register_singleton(Registry, self.datatypes_registry)
galaxy.model.set_datatypes_registry(self.datatypes_registry)
self.configure_sentry_client()
# Load dbkey / genome build manager
self._configure_genome_builds(data_table_name="__dbkeys__", load_old_style=True)
# Tool Data Tables
self._configure_tool_data_tables(from_shed_config=False)

self._configure_tool_shed_registry()
self._register_singleton(tool_shed_registry.Registry, self.tool_shed_registry)
Expand Down Expand Up @@ -750,11 +754,6 @@ def __init__(self, **kwargs) -> None:
)
self.api_keys_manager = self._register_singleton(ApiKeyManager)

# Tool Data Tables
self._configure_tool_data_tables(from_shed_config=False)
# Load dbkey / genome build manager
self._configure_genome_builds(data_table_name="__dbkeys__", load_old_style=True)

# Genomes
self.genomes = self._register_singleton(Genomes)
# Data providers registry.
Expand Down
17 changes: 15 additions & 2 deletions lib/galaxy/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
DatasetManager,
)
from galaxy.managers.hdas import HDAManager
from galaxy.managers.jobs import JobSubmitter
from galaxy.managers.lddas import LDDAManager
from galaxy.managers.markdown_util import generate_branded_pdf
from galaxy.managers.model_stores import ModelStoreManager
Expand All @@ -54,6 +55,7 @@
MaterializeDatasetInstanceTaskRequest,
PrepareDatasetCollectionDownload,
PurgeDatasetsTaskRequest,
QueueJobs,
SetupHistoryExportJob,
WriteHistoryContentTo,
WriteHistoryTo,
Expand All @@ -75,9 +77,9 @@ def setup_data_table_manager(app):


@lru_cache
def cached_create_tool_from_representation(app, raw_tool_source):
def cached_create_tool_from_representation(app, raw_tool_source, tool_dir=""):
return create_tool_from_representation(
app=app, raw_tool_source=raw_tool_source, tool_dir="", tool_source_class="XmlToolSource"
app=app, raw_tool_source=raw_tool_source, tool_dir=tool_dir, tool_source_class="XmlToolSource"
)


Expand Down Expand Up @@ -335,6 +337,17 @@ def fetch_data(
return abort_when_job_stops(_fetch_data, session=sa_session, job_id=job_id, setup_return=setup_return)


@galaxy_task(action="queuing up submitted jobs")
def queue_jobs(request: QueueJobs, app: MinimalManagerApp, job_submitter: JobSubmitter):
tool = cached_create_tool_from_representation(
app, request.tool_source.raw_tool_source, tool_dir=request.tool_source.tool_dir
)
job_submitter.queue_jobs(
tool,
request,
)


@galaxy_task(ignore_result=True, action="setting up export history job")
def export_history(
model_store_manager: ModelStoreManager,
Expand Down
63 changes: 62 additions & 1 deletion lib/galaxy/managers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@
)
from galaxy.managers.datasets import DatasetManager
from galaxy.managers.hdas import HDAManager
from galaxy.managers.histories import HistoryManager
from galaxy.managers.lddas import LDDAManager
from galaxy.managers.users import UserManager
from galaxy.model import (
ImplicitCollectionJobs,
ImplicitCollectionJobsJobAssociation,
Job,
JobParameter,
ToolRequest,
User,
Workflow,
WorkflowInvocation,
Expand All @@ -71,8 +74,13 @@
JobIndexQueryPayload,
JobIndexSortByEnum,
)
from galaxy.schema.tasks import QueueJobs
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.structured_app import StructuredApp
from galaxy.structured_app import (
MinimalManagerApp,
StructuredApp,
)
from galaxy.tools import Tool
from galaxy.tools._types import (
ToolStateDumpedToJsonInternalT,
ToolStateJobInstancePopulatedT,
Expand All @@ -87,6 +95,7 @@
parse_filters_structured,
RawTextTerm,
)
from galaxy.work.context import WorkRequestContext

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -135,6 +144,8 @@ def index_query(self, trans: ProvidesUserContext, payload: JobIndexQueryPayload)
workflow_id = payload.workflow_id
invocation_id = payload.invocation_id
implicit_collection_jobs_id = payload.implicit_collection_jobs_id
tool_request_id = payload.tool_request_id

search = payload.search
order_by = payload.order_by

Expand All @@ -151,6 +162,7 @@ def build_and_apply_filters(stmt, objects, filter_func):

def add_workflow_jobs():
wfi_step = select(WorkflowInvocationStep)

if workflow_id is not None:
wfi_step = (
wfi_step.join(WorkflowInvocation).join(Workflow).where(Workflow.stored_workflow_id == workflow_id)
Expand All @@ -165,6 +177,7 @@ def add_workflow_jobs():
ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id
== wfi_step_sq.c.implicit_collection_jobs_id,
)

# Ensure the result is models, not tuples
sq = stmt1.union(stmt2).subquery()
# SQLite won't recognize Job.foo as a valid column for the ORDER BY clause due to the UNION clause, so we'll use the subquery `columns` collection (`sq.c`).
Expand Down Expand Up @@ -242,6 +255,9 @@ def add_search_criteria(stmt):
if history_id is not None:
stmt = stmt.where(Job.history_id == history_id)

if tool_request_id is not None:
stmt = stmt.filter(model.Job.tool_request_id == tool_request_id)

order_by_columns = Job
if workflow_id or invocation_id:
stmt, order_by_columns = add_workflow_jobs()
Expand Down Expand Up @@ -1152,3 +1168,48 @@ def get_jobs_to_check_at_startup(session: galaxy_scoped_session, track_jobs_in_d
def get_job(session, *where_clauses):
stmt = select(Job).where(*where_clauses).limit(1)
return session.scalars(stmt).first()


class JobSubmitter:
def __init__(
self,
history_manager: HistoryManager,
user_manager: UserManager,
app: MinimalManagerApp,
):
self.history_manager = history_manager
self.user_manager = user_manager
self.app = app

def queue_jobs(self, tool: Tool, request: QueueJobs) -> None:
user = self.user_manager.by_id(request.user.user_id)
sa_session = self.app.model.context
tool_request: ToolRequest = cast(ToolRequest, sa_session.query(ToolRequest).get(request.tool_request_id))
if tool_request is None:
raise Exception(f"Problem fetching request with ID {request.tool_request_id}")
try:
target_history = tool_request.history
use_cached_jobs = request.use_cached_jobs
rerun_remap_job_id = request.rerun_remap_job_id
trans = WorkRequestContext(
self.app,
user,
history=target_history,
)
tool.handle_input_async(
trans,
tool_request,
history=target_history,
use_cached_job=use_cached_jobs,
rerun_remap_job_id=rerun_remap_job_id,
)
tool_request.state = ToolRequest.states.SUBMITTED
sa_session.add(tool_request)
with transaction(sa_session):
sa_session.commit()
except Exception as e:
tool_request.state = ToolRequest.states.FAILED
tool_request.state_message = str(e)
sa_session.add(tool_request)
with transaction(sa_session):
sa_session.commit()
28 changes: 28 additions & 0 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@
DatasetValidatedState,
InvocationsStateCounts,
JobState,
ToolRequestState,
)
from galaxy.schema.workflow.comments import WorkflowCommentModel
from galaxy.security import get_permitted_actions
Expand Down Expand Up @@ -1336,6 +1337,30 @@ def __init__(self, user, token=None):
self.expiration_time = now() + timedelta(hours=24)


class ToolSource(Base, Dictifiable, RepresentById):
__tablename__ = "tool_source"

id: Mapped[int] = mapped_column(primary_key=True)
hash: Mapped[Optional[str]] = mapped_column(Unicode(255))
source: Mapped[dict] = mapped_column(JSONType)


class ToolRequest(Base, Dictifiable, RepresentById):
__tablename__ = "tool_request"

states: TypeAlias = ToolRequestState

id: Mapped[int] = mapped_column(primary_key=True)
tool_source_id: Mapped[int] = mapped_column(ForeignKey("tool_source.id"), index=True)
history_id: Mapped[Optional[int]] = mapped_column(ForeignKey("history.id"), index=True)
request: Mapped[dict] = mapped_column(JSONType)
state: Mapped[Optional[str]] = mapped_column(TrimmedString(32), index=True)
state_message: Mapped[Optional[str]] = mapped_column(JSONType, index=True)

tool_source: Mapped["ToolSource"] = relationship()
history: Mapped[Optional["History"]] = relationship(back_populates="tool_requests")


class DynamicTool(Base, Dictifiable, RepresentById):
__tablename__ = "dynamic_tool"

Expand Down Expand Up @@ -1462,7 +1487,9 @@ class Job(Base, JobLike, UsesCreateAndUpdateTime, Dictifiable, Serializable):
handler: Mapped[Optional[str]] = mapped_column(TrimmedString(255), index=True)
preferred_object_store_id: Mapped[Optional[str]] = mapped_column(String(255))
object_store_id_overrides: Mapped[Optional[STR_TO_STR_DICT]] = mapped_column(JSONType)
tool_request_id: Mapped[Optional[int]] = mapped_column(ForeignKey("tool_request.id"), index=True)

tool_request: Mapped[Optional["ToolRequest"]] = relationship()
user: Mapped[Optional["User"]] = relationship()
galaxy_session: Mapped[Optional["GalaxySession"]] = relationship()
history: Mapped[Optional["History"]] = relationship(back_populates="jobs")
Expand Down Expand Up @@ -3185,6 +3212,7 @@ class History(Base, HasTags, Dictifiable, UsesAnnotations, HasName, Serializable
)
user: Mapped[Optional["User"]] = relationship(back_populates="histories")
jobs: Mapped[List["Job"]] = relationship(back_populates="history", cascade_backrefs=False)
tool_requests: Mapped[List["ToolRequest"]] = relationship(back_populates="history")

update_time = column_property(
select(func.max(HistoryAudit.update_time)).where(HistoryAudit.history_id == id).scalar_subquery(),
Expand Down
13 changes: 13 additions & 0 deletions lib/galaxy/schema/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,19 @@ class JobOutputAssociation(JobAssociation):
)


class JobOutputCollectionAssociation(Model):
name: str = Field(
default=...,
title="name",
description="Name of the job parameter.",
)
dataset_collection_instance: EncodedDataItemSourceId = Field(
default=...,
title="dataset_collection_instance",
description="Reference to the associated item.",
)


class ReportJobErrorPayload(Model):
dataset_id: DecodedDatabaseIdField = Field(
default=...,
Expand Down
17 changes: 17 additions & 0 deletions lib/galaxy/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1533,6 +1533,7 @@ class JobIndexQueryPayload(Model):
workflow_id: Optional[DecodedDatabaseIdField] = None
invocation_id: Optional[DecodedDatabaseIdField] = None
implicit_collection_jobs_id: Optional[DecodedDatabaseIdField] = None
tool_request_id: Optional[DecodedDatabaseIdField] = None
order_by: JobIndexSortByEnum = JobIndexSortByEnum.update_time
search: Optional[str] = None
limit: int = 500
Expand Down Expand Up @@ -3732,6 +3733,22 @@ class AsyncTaskResultSummary(Model):
)


ToolRequestIdField = Field(title="ID", description="Encoded ID of the role")


class ToolRequestState(str, Enum):
NEW = "new"
SUBMITTED = "submitted"
FAILED = "failed"


class ToolRequestModel(Model):
id: EncodedDatabaseIdField = ToolRequestIdField
request: Dict[str, Any]
state: ToolRequestState
state_message: Optional[str]


class AsyncFile(Model):
storage_request_id: UUID
task: AsyncTaskResultSummary
Expand Down
13 changes: 13 additions & 0 deletions lib/galaxy/schema/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,16 @@ class ComputeDatasetHashTaskRequest(Model):

class PurgeDatasetsTaskRequest(Model):
dataset_ids: List[int]


class ToolSource(Model):
raw_tool_source: str
tool_dir: str


class QueueJobs(Model):
tool_source: ToolSource
tool_request_id: int # links to request ("incoming") and history
user: RequestUser # TODO: test anonymous users through this submission path
use_cached_jobs: bool
rerun_remap_job_id: Optional[int] # link to a job to rerun & remap
6 changes: 6 additions & 0 deletions lib/galaxy/tool_util/parameters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from .convert import (
decode,
encode,
encode_test,
)
from .factory import (
from_input_source,
Expand All @@ -26,7 +27,9 @@
CwlStringParameterModel,
CwlUnionParameterModel,
DataCollectionParameterModel,
DataCollectionRequest,
DataParameterModel,
DataRequest,
FloatParameterModel,
HiddenParameterModel,
IntegerParameterModel,
Expand Down Expand Up @@ -75,6 +78,8 @@
"JobInternalToolState",
"ToolParameterBundle",
"ToolParameterBundleModel",
"DataRequest",
"DataCollectionRequest",
"ToolParameterModel",
"IntegerParameterModel",
"BooleanParameterModel",
Expand Down Expand Up @@ -120,6 +125,7 @@
"VISITOR_NO_REPLACEMENT",
"decode",
"encode",
"encode_test",
"WorkflowStepToolState",
"WorkflowStepLinkedToolState",
)
3 changes: 3 additions & 0 deletions lib/galaxy/tool_util/parameters/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,14 @@ def _select_which_when(
conditional: ConditionalParameterModel, state: dict, inputs: ToolSourceTestInputs, prefix: str
) -> ConditionalWhen:
test_parameter = conditional.test_parameter
is_boolean = test_parameter.parameter_type == "gx_boolean"
test_parameter_name = test_parameter.name
test_parameter_flat_path = flat_state_path(test_parameter_name, prefix)

test_input = _input_for(test_parameter_flat_path, inputs)
explicit_test_value = test_input["value"] if test_input else None
if is_boolean and isinstance(explicit_test_value, str):
explicit_test_value = asbool(explicit_test_value)
test_value = validate_explicit_conditional_test_value(test_parameter_name, explicit_test_value)
for when in conditional.whens:
if test_value is None and when.is_default_when:
Expand Down
Loading

0 comments on commit 30ddcdd

Please sign in to comment.