Skip to content

Commit

Permalink
Tool Request API.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Sep 2, 2024
1 parent bfbca37 commit 24508e7
Show file tree
Hide file tree
Showing 30 changed files with 1,164 additions and 130 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/framework_tools.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ jobs:
strategy:
matrix:
python-version: ['3.8']
use-legacy-api: ['if_needed', 'always']
services:
postgres:
image: postgres:13
Expand Down Expand Up @@ -66,7 +67,7 @@ jobs:
path: 'galaxy root/.venv'
key: gxy-venv-${{ runner.os }}-${{ steps.full-python-version.outputs.version }}-${{ hashFiles('galaxy root/requirements.txt') }}-framework-tools
- name: Run tests
run: ./run_tests.sh --coverage --framework-tools
run: GALAXY_TEST_USE_LEGACY_TOOL_API="${{ matrix.use-legacy-api }}" ./run_tests.sh --coverage --framework-tools
working-directory: 'galaxy root'
- uses: codecov/codecov-action@v3
with:
Expand Down
9 changes: 4 additions & 5 deletions lib/galaxy/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,10 @@ def __init__(self, configure_logging=True, use_converters=True, use_display_appl
self._register_singleton(Registry, self.datatypes_registry)
galaxy.model.set_datatypes_registry(self.datatypes_registry)
self.configure_sentry_client()
# Load dbkey / genome build manager
self._configure_genome_builds(data_table_name="__dbkeys__", load_old_style=True)
# Tool Data Tables
self._configure_tool_data_tables(from_shed_config=False)

self._configure_tool_shed_registry()
self._register_singleton(tool_shed_registry.Registry, self.tool_shed_registry)
Expand Down Expand Up @@ -750,11 +754,6 @@ def __init__(self, **kwargs) -> None:
)
self.api_keys_manager = self._register_singleton(ApiKeyManager)

# Tool Data Tables
self._configure_tool_data_tables(from_shed_config=False)
# Load dbkey / genome build manager
self._configure_genome_builds(data_table_name="__dbkeys__", load_old_style=True)

# Genomes
self.genomes = self._register_singleton(Genomes)
# Data providers registry.
Expand Down
17 changes: 15 additions & 2 deletions lib/galaxy/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
DatasetManager,
)
from galaxy.managers.hdas import HDAManager
from galaxy.managers.jobs import JobSubmitter
from galaxy.managers.lddas import LDDAManager
from galaxy.managers.markdown_util import generate_branded_pdf
from galaxy.managers.model_stores import ModelStoreManager
Expand All @@ -54,6 +55,7 @@
MaterializeDatasetInstanceTaskRequest,
PrepareDatasetCollectionDownload,
PurgeDatasetsTaskRequest,
QueueJobs,
SetupHistoryExportJob,
WriteHistoryContentTo,
WriteHistoryTo,
Expand All @@ -75,9 +77,9 @@ def setup_data_table_manager(app):


@lru_cache
def cached_create_tool_from_representation(app, raw_tool_source):
def cached_create_tool_from_representation(app, raw_tool_source, tool_dir=""):
return create_tool_from_representation(
app=app, raw_tool_source=raw_tool_source, tool_dir="", tool_source_class="XmlToolSource"
app=app, raw_tool_source=raw_tool_source, tool_dir=tool_dir, tool_source_class="XmlToolSource"
)


Expand Down Expand Up @@ -335,6 +337,17 @@ def fetch_data(
return abort_when_job_stops(_fetch_data, session=sa_session, job_id=job_id, setup_return=setup_return)


@galaxy_task(action="queuing up submitted jobs")
def queue_jobs(request: QueueJobs, app: MinimalManagerApp, job_submitter: JobSubmitter):
tool = cached_create_tool_from_representation(
app, request.tool_source.raw_tool_source, tool_dir=request.tool_source.tool_dir
)
job_submitter.queue_jobs(
tool,
request,
)


@galaxy_task(ignore_result=True, action="setting up export history job")
def export_history(
model_store_manager: ModelStoreManager,
Expand Down
63 changes: 62 additions & 1 deletion lib/galaxy/managers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,15 @@
)
from galaxy.managers.datasets import DatasetManager
from galaxy.managers.hdas import HDAManager
from galaxy.managers.histories import HistoryManager
from galaxy.managers.lddas import LDDAManager
from galaxy.managers.users import UserManager
from galaxy.model import (
ImplicitCollectionJobs,
ImplicitCollectionJobsJobAssociation,
Job,
JobParameter,
ToolRequest,
User,
Workflow,
WorkflowInvocation,
Expand All @@ -70,8 +73,13 @@
JobIndexQueryPayload,
JobIndexSortByEnum,
)
from galaxy.schema.tasks import QueueJobs
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.structured_app import StructuredApp
from galaxy.structured_app import (
MinimalManagerApp,
StructuredApp,
)
from galaxy.tools import Tool
from galaxy.tools._types import (
ToolStateDumpedToJsonInternalT,
ToolStateJobInstancePopulatedT,
Expand All @@ -86,6 +94,7 @@
parse_filters_structured,
RawTextTerm,
)
from galaxy.work.context import WorkRequestContext

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -134,6 +143,8 @@ def index_query(self, trans: ProvidesUserContext, payload: JobIndexQueryPayload)
workflow_id = payload.workflow_id
invocation_id = payload.invocation_id
implicit_collection_jobs_id = payload.implicit_collection_jobs_id
tool_request_id = payload.tool_request_id

search = payload.search
order_by = payload.order_by

Expand All @@ -150,6 +161,7 @@ def build_and_apply_filters(stmt, objects, filter_func):

def add_workflow_jobs():
wfi_step = select(WorkflowInvocationStep)

if workflow_id is not None:
wfi_step = (
wfi_step.join(WorkflowInvocation).join(Workflow).where(Workflow.stored_workflow_id == workflow_id)
Expand All @@ -164,6 +176,7 @@ def add_workflow_jobs():
ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id
== wfi_step_sq.c.implicit_collection_jobs_id,
)

# Ensure the result is models, not tuples
sq = stmt1.union(stmt2).subquery()
# SQLite won't recognize Job.foo as a valid column for the ORDER BY clause due to the UNION clause, so we'll use the subquery `columns` collection (`sq.c`).
Expand Down Expand Up @@ -241,6 +254,9 @@ def add_search_criteria(stmt):
if history_id is not None:
stmt = stmt.where(Job.history_id == history_id)

if tool_request_id is not None:
stmt = stmt.filter(model.Job.tool_request_id == tool_request_id)

order_by_columns = Job
if workflow_id or invocation_id:
stmt, order_by_columns = add_workflow_jobs()
Expand Down Expand Up @@ -1150,3 +1166,48 @@ def get_jobs_to_check_at_startup(session: galaxy_scoped_session, track_jobs_in_d
def get_job(session, *where_clauses):
stmt = select(Job).where(*where_clauses).limit(1)
return session.scalars(stmt).first()


class JobSubmitter:
def __init__(
self,
history_manager: HistoryManager,
user_manager: UserManager,
app: MinimalManagerApp,
):
self.history_manager = history_manager
self.user_manager = user_manager
self.app = app

def queue_jobs(self, tool: Tool, request: QueueJobs) -> None:
user = self.user_manager.by_id(request.user.user_id)
sa_session = self.app.model.context
tool_request: ToolRequest = cast(ToolRequest, sa_session.query(ToolRequest).get(request.tool_request_id))
if tool_request is None:
raise Exception(f"Problem fetching request with ID {request.tool_request_id}")
try:
target_history = tool_request.history
use_cached_jobs = request.use_cached_jobs
rerun_remap_job_id = request.rerun_remap_job_id
trans = WorkRequestContext(
self.app,
user,
history=target_history,
)
tool.handle_input_async(
trans,
tool_request,
history=target_history,
use_cached_job=use_cached_jobs,
rerun_remap_job_id=rerun_remap_job_id,
)
tool_request.state = ToolRequest.states.SUBMITTED
sa_session.add(tool_request)
with transaction(sa_session):
sa_session.commit()
except Exception as e:
tool_request.state = ToolRequest.states.FAILED
tool_request.state_message = str(e)
sa_session.add(tool_request)
with transaction(sa_session):
sa_session.commit()
28 changes: 28 additions & 0 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@
DatasetValidatedState,
InvocationsStateCounts,
JobState,
ToolRequestState,
)
from galaxy.schema.workflow.comments import WorkflowCommentModel
from galaxy.security import get_permitted_actions
Expand Down Expand Up @@ -1336,6 +1337,30 @@ def __init__(self, user, token=None):
self.expiration_time = now() + timedelta(hours=24)


class ToolSource(Base, Dictifiable, RepresentById):
__tablename__ = "tool_source"

id: Mapped[int] = mapped_column(primary_key=True)
hash: Mapped[Optional[str]] = mapped_column(Unicode(255))
source: Mapped[dict] = mapped_column(JSONType)


class ToolRequest(Base, Dictifiable, RepresentById):
__tablename__ = "tool_request"

states: TypeAlias = ToolRequestState

id: Mapped[int] = mapped_column(primary_key=True)
tool_source_id: Mapped[int] = mapped_column(ForeignKey("tool_source.id"), index=True)
history_id: Mapped[Optional[int]] = mapped_column(ForeignKey("history.id"), index=True)
request: Mapped[dict] = mapped_column(JSONType)
state: Mapped[Optional[str]] = mapped_column(TrimmedString(32), index=True)
state_message: Mapped[Optional[str]] = mapped_column(JSONType, index=True)

tool_source: Mapped["ToolSource"] = relationship()
history: Mapped[Optional["History"]] = relationship(back_populates="tool_requests")


class DynamicTool(Base, Dictifiable, RepresentById):
__tablename__ = "dynamic_tool"

Expand Down Expand Up @@ -1462,7 +1487,9 @@ class Job(Base, JobLike, UsesCreateAndUpdateTime, Dictifiable, Serializable):
handler: Mapped[Optional[str]] = mapped_column(TrimmedString(255), index=True)
preferred_object_store_id: Mapped[Optional[str]] = mapped_column(String(255))
object_store_id_overrides: Mapped[Optional[STR_TO_STR_DICT]] = mapped_column(JSONType)
tool_request_id: Mapped[Optional[int]] = mapped_column(ForeignKey("tool_request.id"), index=True)

tool_request: Mapped[Optional["ToolRequest"]] = relationship()
user: Mapped[Optional["User"]] = relationship()
galaxy_session: Mapped[Optional["GalaxySession"]] = relationship()
history: Mapped[Optional["History"]] = relationship(back_populates="jobs")
Expand Down Expand Up @@ -3185,6 +3212,7 @@ class History(Base, HasTags, Dictifiable, UsesAnnotations, HasName, Serializable
)
user: Mapped[Optional["User"]] = relationship(back_populates="histories")
jobs: Mapped[List["Job"]] = relationship(back_populates="history", cascade_backrefs=False)
tool_requests: Mapped[List["ToolRequest"]] = relationship(back_populates="history")

update_time = column_property(
select(func.max(HistoryAudit.update_time)).where(HistoryAudit.history_id == id).scalar_subquery(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""implement structured tool state
Revision ID: 7ffd33d5d144
Revises: eee9229a9765
Create Date: 2022-11-09 15:53:11.451185
"""

from sqlalchemy import (
Column,
ForeignKey,
Integer,
String,
)

from galaxy.model.custom_types import JSONType
from galaxy.model.database_object_names import build_index_name
from galaxy.model.migrations.util import (
_is_sqlite,
add_column,
create_table,
drop_column,
drop_index,
drop_table,
transaction,
)

# revision identifiers, used by Alembic.
revision = "7ffd33d5d144"
down_revision = "eee9229a9765"
branch_labels = None
depends_on = None

job_table_name = "job"
request_column_name = "tool_request_id"
job_request_index_name = build_index_name(job_table_name, request_column_name)


def upgrade():
with transaction():
create_table(
"tool_source",
Column("id", Integer, primary_key=True),
Column("hash", String(255), index=True),
Column("source", JSONType),
)
create_table(
"tool_request",
Column("id", Integer, primary_key=True),
Column("request", JSONType),
Column("state", String(32)),
Column("state_message", JSONType),
Column("tool_source_id", Integer, ForeignKey("tool_source.id"), index=True),
Column("history_id", Integer, ForeignKey("history.id"), index=True),
)
index = not _is_sqlite()
add_column(
job_table_name,
Column(request_column_name, Integer, ForeignKey("tool_request.id"), default=None, index=index),
)


def downgrade():
with transaction():
if not _is_sqlite():
drop_index(job_request_index_name, job_table_name)
drop_column(job_table_name, request_column_name)
drop_table("tool_request")
drop_table("tool_source")
13 changes: 13 additions & 0 deletions lib/galaxy/schema/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,19 @@ class JobOutputAssociation(JobAssociation):
)


class JobOutputCollectionAssociation(Model):
name: str = Field(
default=...,
title="name",
description="Name of the job parameter.",
)
dataset_collection_instance: EncodedDataItemSourceId = Field(
default=...,
title="dataset_collection_instance",
description="Reference to the associated item.",
)


class ReportJobErrorPayload(Model):
dataset_id: DecodedDatabaseIdField = Field(
default=...,
Expand Down
17 changes: 17 additions & 0 deletions lib/galaxy/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1533,6 +1533,7 @@ class JobIndexQueryPayload(Model):
workflow_id: Optional[DecodedDatabaseIdField] = None
invocation_id: Optional[DecodedDatabaseIdField] = None
implicit_collection_jobs_id: Optional[DecodedDatabaseIdField] = None
tool_request_id: Optional[DecodedDatabaseIdField] = None
order_by: JobIndexSortByEnum = JobIndexSortByEnum.update_time
search: Optional[str] = None
limit: int = 500
Expand Down Expand Up @@ -3732,6 +3733,22 @@ class AsyncTaskResultSummary(Model):
)


ToolRequestIdField = Field(title="ID", description="Encoded ID of the role")


class ToolRequestState(str, Enum):
NEW = "new"
SUBMITTED = "submitted"
FAILED = "failed"


class ToolRequestModel(Model):
id: DecodedDatabaseIdField = ToolRequestIdField
request: Dict[str, Any]
state: ToolRequestState
state_message: Optional[str]


class AsyncFile(Model):
storage_request_id: UUID
task: AsyncTaskResultSummary
Expand Down
Loading

0 comments on commit 24508e7

Please sign in to comment.