diff --git a/lib/galaxy/dependencies/pinned-requirements.txt b/lib/galaxy/dependencies/pinned-requirements.txt
index 16b857d312f8..18cf6fcde70c 100644
--- a/lib/galaxy/dependencies/pinned-requirements.txt
+++ b/lib/galaxy/dependencies/pinned-requirements.txt
@@ -127,7 +127,7 @@ promise==2.3 ; python_version >= "3.8" and python_version < "3.12"
 prompt-toolkit==3.0.43 ; python_version >= "3.8" and python_version < "3.12"
 prov==1.5.1 ; python_version >= "3.8" and python_version < "3.12"
 psutil==5.9.7 ; python_version >= "3.8" and python_version < "3.12"
-pulsar-galaxy-lib==0.15.5 ; python_version >= "3.8" and python_version < "3.12"
+pulsar-galaxy-lib==0.15.6 ; python_version >= "3.8" and python_version < "3.12"
 pyasn1==0.5.1 ; python_version >= "3.8" and python_version < "3.12"
 pycparser==2.21 ; python_version >= "3.8" and python_version < "3.12"
 pycryptodome==3.19.1 ; python_version >= "3.8" and python_version < "3.12"
diff --git a/lib/galaxy/webapps/galaxy/api/job_files.py b/lib/galaxy/webapps/galaxy/api/job_files.py
index 397205b9a382..80945e4693c2 100644
--- a/lib/galaxy/webapps/galaxy/api/job_files.py
+++ b/lib/galaxy/webapps/galaxy/api/job_files.py
@@ -3,6 +3,7 @@
 """
 import logging
 import os
+import re
 import shutil
 
 from galaxy import (
@@ -99,6 +100,14 @@ def create(self, trans, job_id, payload, **kwargs):
                 upload_store
             ), f"Filename provided by nginx ({file_path}) is not in correct directory ({upload_store})"
             input_file = open(file_path)
+        elif "session_id" in payload:
+            # code borrowed from basic.py
+            session_id = payload["session_id"]
+            upload_store = trans.app.config.tus_upload_store or trans.app.config.new_file_path
+            if re.match(r"^[\w-]+$", session_id) is None:
+                raise ValueError("Invalid session id format.")
+            local_filename = os.path.abspath(os.path.join(upload_store, session_id))
+            input_file = open(local_filename)
         else:
             input_file = payload.get("file", payload.get("__file", None)).file
         target_dir = os.path.dirname(path)
@@ -114,6 +123,47 @@ def create(self, trans, job_id, payload, **kwargs):
                 pass
         return {"message": "ok"}
 
+    @expose_api_raw_anonymous_and_sessionless
+    def tus_patch(self, trans, **kwds):
+        """
+        Exposed as PATCH /api/job_files/resumable_upload.
+
+        I think, based on the tusd docs, a separate tusd server is needed for job files
+        if one is also hosted for user-facing uploads.
+
+        Setting up tusd for job files should look something like (I think):
+
+        tusd -host localhost -port 1080 -upload-dir=/database/tmp
+
+        There is more discussion elsewhere about checking upload access, but we shouldn't need
+        the API key and session handling that the user-facing upload tusd server is configured with.
+
+        For the same reason we shouldn't need a hooks endpoint, but if you want to add one
+        the target CLI entry would be -hooks-http=/api/job_files/tus_hooks
+        and the corresponding action is defined below.
+
+        I would love to check the job state with __authorize_job_access on the first
+        POST, but it seems like TusMiddleware doesn't default to coming in here for that
+        initial POST the way it does for the subsequent PATCHes. Ultimately, the upload
+        is still authorized before the write is done with POST /api/jobs/<job_id>/files,
+        so I think there is no route here to mess with user data - the worst of the
+        security issues that could be caused is filling up the server with needless
+        files that are never acted on. This endpoint is not meant for public consumption
+        anyway - all the job files endpoints and this TUS server should be blocked from
+        public IPs and restricted to your Pulsar servers - and the same sort of abuse
+        could be accomplished with a user account and the user-facing upload endpoints.
+        """
+        return None
+
+    @expose_api_raw_anonymous_and_sessionless
+    def tus_hooks(self, trans, **kwds):
+        """No-op, but if a hook were specified the way we do for user uploads it would hit this action.
+
+        Exposed as POST /api/job_files/tus_hooks and documented in the docstring for
+        tus_patch.
+        """
+        pass
+
     def __authorize_job_access(self, trans, encoded_job_id, **kwargs):
         for key in ["path", "job_key"]:
             if key not in kwargs:
diff --git a/lib/galaxy/webapps/galaxy/buildapp.py b/lib/galaxy/webapps/galaxy/buildapp.py
index a17a122d47da..fa89c2d68774 100644
--- a/lib/galaxy/webapps/galaxy/buildapp.py
+++ b/lib/galaxy/webapps/galaxy/buildapp.py
@@ -327,6 +327,21 @@ def populate_api_routes(webapp, app):
     )
     webapp.mapper.connect("/api/upload/resumable_upload", controller="uploads", action="hooks")
     webapp.mapper.connect("/api/upload/hooks", controller="uploads", action="hooks", conditions=dict(method=["POST"]))
+
+    webapp.mapper.connect(
+        "/api/job_files/resumable_upload/{session_id}",
+        controller="job_files",
+        action="tus_patch",
+        conditions=dict(method=["PATCH"]),
+    )
+    # The user-facing upload has this endpoint enabled, but the middleware completely masks it and
+    # the controller action is never used. It probably isn't needed there either, but keep this note
+    # here until we remove both routes.
+    # webapp.mapper.connect("/api/job_files/resumable_upload", controller="job_files", action="tus_post")
+    webapp.mapper.connect(
+        "/api/job_files/tus_hooks", controller="job_files", action="tus_hooks", conditions=dict(method=["POST"])
+    )
+
     webapp.mapper.resource(
         "revision",
         "revisions",
@@ -1103,6 +1118,17 @@ def wrap_in_middleware(app, global_conf, application_stack, **local_conf):
             "max_size": application_stack.config.maximum_upload_file_size,
         },
     )
+    # TUS upload middleware for job files
+    app = wrap_if_allowed(
+        app,
+        stack,
+        TusMiddleware,
+        kwargs={
+            "upload_path": urljoin(f"{application_stack.config.galaxy_url_prefix}/", "api/job_files/resumable_upload"),
+            "tmp_dir": application_stack.config.tus_upload_store or application_stack.config.new_file_path,
+            "max_size": application_stack.config.maximum_upload_file_size,
+        },
+    )
     # X-Forwarded-Host handling
     app = wrap_if_allowed(app, stack, XForwardedHostMiddleware)
     # Request ID middleware
diff --git a/test/integration/test_job_files.py b/test/integration/test_job_files.py
index 4ed02b350dd3..1c14ce81f284 100644
--- a/test/integration/test_job_files.py
+++ b/test/integration/test_job_files.py
@@ -17,9 +17,11 @@
 import io
 import os
 import tempfile
+from typing import Dict
 
 import requests
 from sqlalchemy import select
+from tusclient import client
 
 from galaxy import model
 from galaxy.model.base import (
@@ -35,6 +37,7 @@
 
 TEST_INPUT_TEXT = "test input content\n"
 TEST_FILE_IO = io.StringIO("some initial text data")
+TEST_TUS_CHUNK_SIZE = 1024
 
 
 class TestJobFilesIntegration(integration_util.IntegrationTestCase):
@@ -110,6 +113,38 @@ def files():
         response = requests.post(post_url, data=data, files=files())
         _assert_insufficient_permissions(response)
 
+    def test_write_with_tus(self):
+        # shared setup with above test
+        job, output_hda, _ = self.create_static_job_with_state("running")
+        job_id, job_key = self._api_job_keys(job)
+        path = self._app.object_store.get_filename(output_hda.dataset)
+        assert path
+
+        upload_url = self._api_url(f"job_files/resumable_upload?job_key={job_key}", use_key=False)
+        headers: Dict[str, str] = {}
+        my_client = client.TusClient(upload_url, headers=headers)
+
+        storage = None
+        metadata: Dict[str, str] = {}
+        t_file = tempfile.NamedTemporaryFile("w")
+        t_file.write("some initial text data")
+        t_file.flush()
+
+        input_path = t_file.name
+
+        uploader = my_client.uploader(input_path, metadata=metadata, url_storage=storage)
+        uploader.chunk_size = TEST_TUS_CHUNK_SIZE
+        uploader.upload()
+        upload_session_url = uploader.url
+        assert upload_session_url
+        tus_session_id = upload_session_url.rsplit("/", 1)[1]
+
+        data = {"path": path, "job_key": job_key, "session_id": tus_session_id}
+        post_url = self._api_url(f"jobs/{job_id}/files", use_key=False)
+        response = requests.post(post_url, data=data)
+        api_asserts.assert_status_code_is_ok(response)
+        assert open(path).read() == "some initial text data"
+
     def test_write_protection(self):
         job, _, _ = self.create_static_job_with_state("running")
         job_id, job_key = self._api_job_keys(job)
diff --git a/test/integration/test_job_files_tus.py b/test/integration/test_job_files_tus.py
new file mode 100644
index 000000000000..94874813075c
--- /dev/null
+++ b/test/integration/test_job_files_tus.py
@@ -0,0 +1,53 @@
+"""Integration tests for the Pulsar embedded runner with outputs written back to Galaxy via TUS."""
+
+import os
+import tempfile
+
+from galaxy.util import safe_makedirs
+from galaxy_test.driver import integration_util
+
+JOB_CONF_TEMPLATE = """
+runners:
+  local:
+    load: galaxy.jobs.runners.local:LocalJobRunner
+  pulsar_embed:
+    load: galaxy.jobs.runners.pulsar:PulsarEmbeddedJobRunner
+    pulsar_app_config:
+      tool_dependency_dir: none
+      conda_auto_init: false
+      conda_auto_install: false
+
+execution:
+  default: pulsar_embed
+  environments:
+    local:
+      runner: local
+    pulsar_embed:
+      runner: pulsar_embed
+      default_file_action: remote_transfer_tus
+
+tools:
+- class: local
+  environment: local
+"""
+
+
+class EmbeddedPulsarTargetingTusIntegrationInstance(integration_util.IntegrationInstance):
+    """Describe a Galaxy test instance with an embedded Pulsar configured to target the job files TUS endpoint."""
+
+    framework_tool_and_types = True
+
+    @classmethod
+    def handle_galaxy_config_kwds(cls, config):
+        jobs_directory = os.path.join(cls._test_driver.mkdtemp(), "pulsar_staging")
+        safe_makedirs(jobs_directory)
+        with tempfile.NamedTemporaryFile(suffix="_tus_job_conf.yml", mode="w", delete=False) as job_conf:
+            job_conf.write(JOB_CONF_TEMPLATE)
+        config["job_config_file"] = job_conf.name
+        infrastructure_url = "http://localhost:$GALAXY_WEB_PORT"
+        config["galaxy_infrastructure_url"] = infrastructure_url
+
+
+instance = integration_util.integration_module_instance(EmbeddedPulsarTargetingTusIntegrationInstance)
+
+test_tools = integration_util.integration_tool_runner(["simple_constructs", "composite_output_tests"])
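
For reference, the client-side flow that the new test_write_with_tus test exercises looks roughly like the sketch below. This is not part of the patch, just a minimal illustration of how a remote (e.g. Pulsar) client might push a job output back to Galaxy. It assumes the tuspy package (imported as tusclient) and requests are available; the write_job_file_via_tus helper is hypothetical, and galaxy_url, job_id, job_key, remote_path, and local_path are placeholder inputs rather than values taken from the diff.

import requests
from tusclient import client


def write_job_file_via_tus(galaxy_url, job_id, job_key, remote_path, local_path):
    # Stream the local file to the job files TUS endpoint; the job_key authorizes the upload.
    upload_url = f"{galaxy_url}/api/job_files/resumable_upload?job_key={job_key}"
    uploader = client.TusClient(upload_url).uploader(local_path)
    uploader.chunk_size = 1024 * 1024
    uploader.upload()

    # The last path segment of the created upload URL is the TUS session id.
    session_id = uploader.url.rsplit("/", 1)[1]

    # Ask Galaxy to move the uploaded session file into place at the job output path;
    # this hits the new session_id branch added to create() in job_files.py above.
    response = requests.post(
        f"{galaxy_url}/api/jobs/{job_id}/files",
        data={"path": remote_path, "job_key": job_key, "session_id": session_id},
    )
    response.raise_for_status()

On the Galaxy side, the session_id branch added to create() resolves the session file under tus_upload_store (or new_file_path) and moves it to the requested path, which is what the integration test asserts.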