From 2ae6ff7dfd4154c9bcdffabbcff0513507f658df Mon Sep 17 00:00:00 2001 From: John Chilton Date: Thu, 4 Jan 2024 13:28:32 -0500 Subject: [PATCH] Allow job files to consume TUS uploads --- lib/galaxy/webapps/galaxy/api/job_files.py | 20 +++++++++++++ lib/galaxy/webapps/galaxy/buildapp.py | 23 ++++++++++++++ test/integration/test_job_files.py | 35 ++++++++++++++++++++++ 3 files changed, 78 insertions(+) diff --git a/lib/galaxy/webapps/galaxy/api/job_files.py b/lib/galaxy/webapps/galaxy/api/job_files.py index 397205b9a382..604baaa1bdd4 100644 --- a/lib/galaxy/webapps/galaxy/api/job_files.py +++ b/lib/galaxy/webapps/galaxy/api/job_files.py @@ -3,6 +3,7 @@ """ import logging import os +import re import shutil from galaxy import ( @@ -99,6 +100,14 @@ def create(self, trans, job_id, payload, **kwargs): upload_store ), f"Filename provided by nginx ({file_path}) is not in correct directory ({upload_store})" input_file = open(file_path) + elif "session_id" in payload: + # code stolen from basic.py + session_id = payload["session_id"] + upload_store = trans.app.config.tus_upload_store or trans.app.config.new_file_path + if re.match(r"^[\w-]+$", session_id) is None: + raise ValueError("Invalid session id format.") + local_filename = os.path.abspath(os.path.join(upload_store, session_id)) + input_file = open(local_filename) else: input_file = payload.get("file", payload.get("__file", None)).file target_dir = os.path.dirname(path) @@ -114,6 +123,17 @@ def create(self, trans, job_id, payload, **kwargs): pass return {"message": "ok"} + @expose_api_raw_anonymous_and_sessionless + def tus_hooks(self, trans, **kwds): + """ + Exposed as POST /api/job_files/hooks and /api/job_files/resumable_upload + """ + # Cannot reuse upload/hooks since it requires either an API key or a session, this + # route will authorize access on active job keys. + + # TODO: actually check authorization + return None + def __authorize_job_access(self, trans, encoded_job_id, **kwargs): for key in ["path", "job_key"]: if key not in kwargs: diff --git a/lib/galaxy/webapps/galaxy/buildapp.py b/lib/galaxy/webapps/galaxy/buildapp.py index a17a122d47da..e7071425bc3f 100644 --- a/lib/galaxy/webapps/galaxy/buildapp.py +++ b/lib/galaxy/webapps/galaxy/buildapp.py @@ -327,6 +327,18 @@ def populate_api_routes(webapp, app): ) webapp.mapper.connect("/api/upload/resumable_upload", controller="uploads", action="hooks") webapp.mapper.connect("/api/upload/hooks", controller="uploads", action="hooks", conditions=dict(method=["POST"])) + + webapp.mapper.connect( + "/api/job_files/resumable_upload/{session_id}", + controller="job_files", + action="tus_hooks", + conditions=dict(method=["PATCH"]), + ) + webapp.mapper.connect("/api/job_files/resumable_upload", controller="job_files", action="tus_hooks") + webapp.mapper.connect( + "/api/job_files/hooks", controller="job_files", action="tus_hooks", conditions=dict(method=["POST"]) + ) + webapp.mapper.resource( "revision", "revisions", @@ -1103,6 +1115,17 @@ def wrap_in_middleware(app, global_conf, application_stack, **local_conf): "max_size": application_stack.config.maximum_upload_file_size, }, ) + # TUS upload middleware for job files.... + app = wrap_if_allowed( + app, + stack, + TusMiddleware, + kwargs={ + "upload_path": urljoin(f"{application_stack.config.galaxy_url_prefix}/", "api/job_files/resumable_upload"), + "tmp_dir": application_stack.config.tus_upload_store or application_stack.config.new_file_path, + "max_size": application_stack.config.maximum_upload_file_size, + }, + ) # X-Forwarded-Host handling app = wrap_if_allowed(app, stack, XForwardedHostMiddleware) # Request ID middleware diff --git a/test/integration/test_job_files.py b/test/integration/test_job_files.py index 4ed02b350dd3..8b4dfc94eb7f 100644 --- a/test/integration/test_job_files.py +++ b/test/integration/test_job_files.py @@ -17,9 +17,11 @@ import io import os import tempfile +from typing import Dict import requests from sqlalchemy import select +from tusclient import client from galaxy import model from galaxy.model.base import ( @@ -35,6 +37,7 @@ TEST_INPUT_TEXT = "test input content\n" TEST_FILE_IO = io.StringIO("some initial text data") +TEST_TUS_CHUNK_SIZE = 1024 class TestJobFilesIntegration(integration_util.IntegrationTestCase): @@ -110,6 +113,38 @@ def files(): response = requests.post(post_url, data=data, files=files()) _assert_insufficient_permissions(response) + def test_write_with_tus(self): + # shared setup with above test + job, output_hda, _ = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + + upload_url = self._api_url("job_files/resumable_upload", use_key=False) + headers: Dict[str, str] = {} + my_client = client.TusClient(upload_url, headers=headers) + + storage = None + metadata: Dict[str, str] = {} + t_file = tempfile.NamedTemporaryFile("w") + t_file.write("some initial text data") + t_file.flush() + + input_path = t_file.name + + uploader = my_client.uploader(input_path, metadata=metadata, url_storage=storage) + uploader.chunk_size = TEST_TUS_CHUNK_SIZE + uploader.upload() + upload_session_url = uploader.url + assert upload_session_url + tus_session_id = upload_session_url.rsplit("/", 1)[1] + + data = {"path": path, "job_key": job_key, "session_id": tus_session_id} + post_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.post(post_url, data=data) + api_asserts.assert_status_code_is_ok(response) + assert open(path).read() == "some initial text data" + def test_write_protection(self): job, _, _ = self.create_static_job_with_state("running") job_id, job_key = self._api_job_keys(job)