From 1088396fea9722392ee4b444536df6af23a08c82 Mon Sep 17 00:00:00 2001
From: John Chilton
Date: Thu, 4 Jan 2024 13:28:32 -0500
Subject: [PATCH] Allow job files to consume TUS uploads

---
 lib/galaxy/webapps/galaxy/api/job_files.py | 35 ++++++++++++++++++++++
 lib/galaxy/webapps/galaxy/buildapp.py      | 23 ++++++++++++++
 test/integration/test_job_files.py         | 35 ++++++++++++++++++++++
 3 files changed, 93 insertions(+)

diff --git a/lib/galaxy/webapps/galaxy/api/job_files.py b/lib/galaxy/webapps/galaxy/api/job_files.py
index 397205b9a382..33ac29e445d5 100644
--- a/lib/galaxy/webapps/galaxy/api/job_files.py
+++ b/lib/galaxy/webapps/galaxy/api/job_files.py
@@ -3,6 +3,7 @@
 """
 import logging
 import os
+import re
 import shutil
 
 from galaxy import (
@@ -99,6 +100,14 @@ def create(self, trans, job_id, payload, **kwargs):
                 upload_store
             ), f"Filename provided by nginx ({file_path}) is not in correct directory ({upload_store})"
             input_file = open(file_path)
+        elif "session_id" in payload:
+            # code duplicated from basic.py
+            session_id = payload["session_id"]
+            upload_store = trans.app.config.tus_upload_store or trans.app.config.new_file_path
+            if re.match(r"^[\w-]+$", session_id) is None:
+                raise ValueError("Invalid session id format.")
+            local_filename = os.path.abspath(os.path.join(upload_store, session_id))
+            input_file = open(local_filename)
         else:
             input_file = payload.get("file", payload.get("__file", None)).file
         target_dir = os.path.dirname(path)
@@ -114,6 +123,32 @@ def create(self, trans, job_id, payload, **kwargs):
             pass
         return {"message": "ok"}
 
+    @expose_api_raw_anonymous_and_sessionless
+    def tus_hook_patch(self, trans, **kwds):
+        """
+        Exposed as PATCH /api/job_files/resumable_upload and POST /api/job_files/hooks.
+
+        Based on the tusd docs, a separate tusd server is needed for job files if one
+        is also being hosted for user facing uploads.
+
+        Setting up tusd for job files should just look like (I think):
+
+        tusd -host localhost -port 1080 -upload-dir=/database/tmp -hooks-http=/api/job_files/hooks
+
+        It shouldn't need the API key and session handling the upload tusd server uses.
+
+        I would love to check the job state with __authorize_job_access on the first
+        POST but it seems like TusMiddleware doesn't default to coming in here for that
+        initial POST the way it does for the subsequent PATCHes. Ultimately, the upload
+        is still authorized before the write done with POST /api/jobs/{job_id}/files,
+        so I think there is no route here to mess with user data - the worst security
+        issue that could be caused is filling up the server with needless files that are
+        never acted on. This endpoint is not meant for public consumption anyway - all
+        the job files endpoints and this TUS server should be blocked from public IPs
+        and restricted to your Pulsar servers.
+ """ + return None + def __authorize_job_access(self, trans, encoded_job_id, **kwargs): for key in ["path", "job_key"]: if key not in kwargs: diff --git a/lib/galaxy/webapps/galaxy/buildapp.py b/lib/galaxy/webapps/galaxy/buildapp.py index a17a122d47da..42a04bee0213 100644 --- a/lib/galaxy/webapps/galaxy/buildapp.py +++ b/lib/galaxy/webapps/galaxy/buildapp.py @@ -327,6 +327,18 @@ def populate_api_routes(webapp, app): ) webapp.mapper.connect("/api/upload/resumable_upload", controller="uploads", action="hooks") webapp.mapper.connect("/api/upload/hooks", controller="uploads", action="hooks", conditions=dict(method=["POST"])) + + webapp.mapper.connect( + "/api/job_files/resumable_upload/{session_id}", + controller="job_files", + action="tus_hook_patch", + conditions=dict(method=["PATCH"]), + ) + webapp.mapper.connect("/api/job_files/resumable_upload", controller="job_files", action="tus_hook_post") + webapp.mapper.connect( + "/api/job_files/hooks", controller="job_files", action="tus_hook_post", conditions=dict(method=["POST"]) + ) + webapp.mapper.resource( "revision", "revisions", @@ -1103,6 +1115,17 @@ def wrap_in_middleware(app, global_conf, application_stack, **local_conf): "max_size": application_stack.config.maximum_upload_file_size, }, ) + # TUS upload middleware for job files.... + app = wrap_if_allowed( + app, + stack, + TusMiddleware, + kwargs={ + "upload_path": urljoin(f"{application_stack.config.galaxy_url_prefix}/", "api/job_files/resumable_upload"), + "tmp_dir": application_stack.config.tus_upload_store or application_stack.config.new_file_path, + "max_size": application_stack.config.maximum_upload_file_size, + }, + ) # X-Forwarded-Host handling app = wrap_if_allowed(app, stack, XForwardedHostMiddleware) # Request ID middleware diff --git a/test/integration/test_job_files.py b/test/integration/test_job_files.py index 4ed02b350dd3..1c14ce81f284 100644 --- a/test/integration/test_job_files.py +++ b/test/integration/test_job_files.py @@ -17,9 +17,11 @@ import io import os import tempfile +from typing import Dict import requests from sqlalchemy import select +from tusclient import client from galaxy import model from galaxy.model.base import ( @@ -35,6 +37,7 @@ TEST_INPUT_TEXT = "test input content\n" TEST_FILE_IO = io.StringIO("some initial text data") +TEST_TUS_CHUNK_SIZE = 1024 class TestJobFilesIntegration(integration_util.IntegrationTestCase): @@ -110,6 +113,38 @@ def files(): response = requests.post(post_url, data=data, files=files()) _assert_insufficient_permissions(response) + def test_write_with_tus(self): + # shared setup with above test + job, output_hda, _ = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + + upload_url = self._api_url(f"job_files/resumable_upload?job_key={job_key}", use_key=False) + headers: Dict[str, str] = {} + my_client = client.TusClient(upload_url, headers=headers) + + storage = None + metadata: Dict[str, str] = {} + t_file = tempfile.NamedTemporaryFile("w") + t_file.write("some initial text data") + t_file.flush() + + input_path = t_file.name + + uploader = my_client.uploader(input_path, metadata=metadata, url_storage=storage) + uploader.chunk_size = TEST_TUS_CHUNK_SIZE + uploader.upload() + upload_session_url = uploader.url + assert upload_session_url + tus_session_id = upload_session_url.rsplit("/", 1)[1] + + data = {"path": path, "job_key": job_key, "session_id": tus_session_id} + post_url = 
self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.post(post_url, data=data) + api_asserts.assert_status_code_is_ok(response) + assert open(path).read() == "some initial text data" + def test_write_protection(self): job, _, _ = self.create_static_job_with_state("running") job_id, job_key = self._api_job_keys(job)