diff --git a/lib/galaxy/webapps/galaxy/api/job_files.py b/lib/galaxy/webapps/galaxy/api/job_files.py
index 397205b9a382..604baaa1bdd4 100644
--- a/lib/galaxy/webapps/galaxy/api/job_files.py
+++ b/lib/galaxy/webapps/galaxy/api/job_files.py
@@ -3,6 +3,7 @@
 """
 import logging
 import os
+import re
 import shutil
 
 from galaxy import (
@@ -99,6 +100,14 @@ def create(self, trans, job_id, payload, **kwargs):
                 upload_store
             ), f"Filename provided by nginx ({file_path}) is not in correct directory ({upload_store})"
             input_file = open(file_path)
+        elif "session_id" in payload:
+            # Adapted from basic.py: the session id names a file inside the tus upload store.
+            session_id = payload["session_id"]
+            upload_store = trans.app.config.tus_upload_store or trans.app.config.new_file_path
+            if re.fullmatch(r"[\w-]+", session_id) is None:
+                raise ValueError("Invalid session id format.")
+            local_filename = os.path.abspath(os.path.join(upload_store, session_id))
+            input_file = open(local_filename)
         else:
             input_file = payload.get("file", payload.get("__file", None)).file
         target_dir = os.path.dirname(path)
@@ -114,6 +123,17 @@ def create(self, trans, job_id, payload, **kwargs):
                 pass
         return {"message": "ok"}
 
+    @expose_api_raw_anonymous_and_sessionless
+    def tus_hooks(self, trans, **kwds):
+        """
+        Exposed as POST /api/job_files/hooks and /api/job_files/resumable_upload
+        """
+        # Cannot reuse upload/hooks since it requires either an API key or a session, this
+        # route will authorize access on active job keys.
+
+        # TODO: actually check authorization
+        return None
+
     def __authorize_job_access(self, trans, encoded_job_id, **kwargs):
         for key in ["path", "job_key"]:
             if key not in kwargs:
diff --git a/lib/galaxy/webapps/galaxy/buildapp.py b/lib/galaxy/webapps/galaxy/buildapp.py
index a17a122d47da..4b1da362c39e 100644
--- a/lib/galaxy/webapps/galaxy/buildapp.py
+++ b/lib/galaxy/webapps/galaxy/buildapp.py
@@ -327,6 +327,18 @@ def populate_api_routes(webapp, app):
     )
     webapp.mapper.connect("/api/upload/resumable_upload", controller="uploads", action="hooks")
     webapp.mapper.connect("/api/upload/hooks", controller="uploads", action="hooks", conditions=dict(method=["POST"]))
+
+    webapp.mapper.connect(
+        "/api/job_files/resumable_upload/{session_id}",
+        controller="job_files",
+        action="tus_hooks",
+        conditions=dict(method=["PATCH"]),
+    )
+    webapp.mapper.connect("/api/job_files/resumable_upload", controller="job_files", action="tus_hooks")
+    webapp.mapper.connect(
+        "/api/job_files/hooks", controller="job_files", action="tus_hooks", conditions=dict(method=["POST"])
+    )
+
     webapp.mapper.resource(
         "revision",
         "revisions",
diff --git a/test/integration/test_job_files.py b/test/integration/test_job_files.py
index 4ed02b350dd3..8de749aee227 100644
--- a/test/integration/test_job_files.py
+++ b/test/integration/test_job_files.py
@@ -20,6 +20,7 @@
 
 import requests
 from sqlalchemy import select
+from tusclient import client
 
 from galaxy import model
 from galaxy.model.base import (
@@ -35,6 +36,7 @@
 
 TEST_INPUT_TEXT = "test input content\n"
 TEST_FILE_IO = io.StringIO("some initial text data")
+TEST_TUS_CHUNK_SIZE = 1024
 
 
 class TestJobFilesIntegration(integration_util.IntegrationTestCase):
@@ -110,6 +112,37 @@ def files():
         response = requests.post(post_url, data=data, files=files())
         _assert_insufficient_permissions(response)
 
+    def test_write_with_tus(self):
+        # shared setup with above test
+        job, output_hda, _ = self.create_static_job_with_state("running")
+        job_id, job_key = self._api_job_keys(job)
+        path = self._app.object_store.get_filename(output_hda.dataset)
+        assert path
+
+        upload_url = self._api_url("job_files/resumable_upload", use_key=False)
+        headers = {}
+        my_client = client.TusClient(upload_url, headers=headers)
+
+        storage = None
+        metadata = {}
+        with tempfile.NamedTemporaryFile("w") as t_file:
+            t_file.write("some initial text data")
+            t_file.flush()
+
+            input_path = t_file.name
+
+            uploader = my_client.uploader(input_path, metadata=metadata, url_storage=storage)
+            uploader.chunk_size = TEST_TUS_CHUNK_SIZE
+            uploader.upload()
+            tus_session_id = uploader.url.rsplit("/", 1)[1]
+
+        data = {"path": path, "job_key": job_key, "session_id": tus_session_id}
+        post_url = self._api_url(f"jobs/{job_id}/files", use_key=False)
+        response = requests.post(post_url, data=data)
+        api_asserts.assert_status_code_is_ok(response)
+        with open(path) as written:
+            assert written.read() == "some initial text data"
+
     def test_write_protection(self):
         job, _, _ = self.create_static_job_with_state("running")
         job_id, job_key = self._api_job_keys(job)