Skip to content

Commit

Permalink
Allow job files to consume TUS uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Jan 4, 2024
1 parent fe971b2 commit 6ae07d9
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 0 deletions.
20 changes: 20 additions & 0 deletions lib/galaxy/webapps/galaxy/api/job_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""
import logging
import os
import re
import shutil

from galaxy import (
Expand Down Expand Up @@ -99,6 +100,14 @@ def create(self, trans, job_id, payload, **kwargs):
upload_store
), f"Filename provided by nginx ({file_path}) is not in correct directory ({upload_store})"
input_file = open(file_path)
elif "session_id" in payload:
# code stolen from basic.py
session_id = payload["session_id"]
upload_store = trans.app.config.tus_upload_store or trans.app.config.new_file_path
if re.match(r"^[\w-]+$", session_id) is None:
raise ValueError("Invalid session id format.")
local_filename = os.path.abspath(os.path.join(upload_store, session_id))
input_file = open(local_filename)
else:
input_file = payload.get("file", payload.get("__file", None)).file
target_dir = os.path.dirname(path)
Expand All @@ -114,6 +123,17 @@ def create(self, trans, job_id, payload, **kwargs):
pass
return {"message": "ok"}

@expose_api_raw_anonymous_and_sessionless
def tus_hooks(self, trans, **kwds):
"""
Exposed as POST /api/job_files/hooks and /api/job_files/resumable_upload
"""
# Cannot reuse upload/hooks since it requires either an API key or a session, this
# route will authorize access on active job keys.

# TODO: actually check authorization
return None

def __authorize_job_access(self, trans, encoded_job_id, **kwargs):
for key in ["path", "job_key"]:
if key not in kwargs:
Expand Down
12 changes: 12 additions & 0 deletions lib/galaxy/webapps/galaxy/buildapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,18 @@ def populate_api_routes(webapp, app):
)
webapp.mapper.connect("/api/upload/resumable_upload", controller="uploads", action="hooks")
webapp.mapper.connect("/api/upload/hooks", controller="uploads", action="hooks", conditions=dict(method=["POST"]))

webapp.mapper.connect(
"/api/job_files/resumable_upload/{session_id}",
controller="job_files",
action="tus_hooks",
conditions=dict(method=["PATCH"]),
)
webapp.mapper.connect("/api/job_files/resumable_upload", controller="job_files", action="tus_hooks")
webapp.mapper.connect(
"/api/job_files/hooks", controller="job_files", action="tus_hooks", conditions=dict(method=["POST"])
)

webapp.mapper.resource(
"revision",
"revisions",
Expand Down
32 changes: 32 additions & 0 deletions test/integration/test_job_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import requests
from sqlalchemy import select
from tusclient import client

from galaxy import model
from galaxy.model.base import (
Expand All @@ -35,6 +36,7 @@

TEST_INPUT_TEXT = "test input content\n"
TEST_FILE_IO = io.StringIO("some initial text data")
TEST_TUS_CHUNK_SIZE = 1024


class TestJobFilesIntegration(integration_util.IntegrationTestCase):
Expand Down Expand Up @@ -110,6 +112,36 @@ def files():
response = requests.post(post_url, data=data, files=files())
_assert_insufficient_permissions(response)

def test_write_with_tus(self):
# shared setup with above test
job, output_hda, _ = self.create_static_job_with_state("running")
job_id, job_key = self._api_job_keys(job)
path = self._app.object_store.get_filename(output_hda.dataset)
assert path

upload_url = self._api_url("job_files/resumable_upload", use_key=False)
headers = {}
my_client = client.TusClient(upload_url, headers=headers)

storage = None
metadata = {}
t_file = tempfile.NamedTemporaryFile("w")
t_file.write("some initial text data")
t_file.flush()

input_path = t_file.name

uploader = my_client.uploader(input_path, metadata=metadata, url_storage=storage)
uploader.chunk_size = TEST_TUS_CHUNK_SIZE
uploader.upload()
tus_session_id = uploader.url.rsplit("/", 1)[1]

data = {"path": path, "job_key": job_key, "session_id": tus_session_id}
post_url = self._api_url(f"jobs/{job_id}/files", use_key=False)
response = requests.post(post_url, data=data)
api_asserts.assert_status_code_is_ok(response)
assert open(path).read() == "some initial text data"

def test_write_protection(self):
job, _, _ = self.create_static_job_with_state("running")
job_id, job_key = self._api_job_keys(job)
Expand Down

0 comments on commit 6ae07d9

Please sign in to comment.