Skip to content

Commit

Permalink
Allow job files to consume TUS uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Jan 4, 2024
1 parent fe971b2 commit 6e5995b
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 0 deletions.
30 changes: 30 additions & 0 deletions lib/galaxy/webapps/galaxy/api/job_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""
import logging
import os
import re
import shutil

from galaxy import (
Expand Down Expand Up @@ -99,6 +100,14 @@ def create(self, trans, job_id, payload, **kwargs):
upload_store
), f"Filename provided by nginx ({file_path}) is not in correct directory ({upload_store})"
input_file = open(file_path)
elif "session_id" in payload:
# code stolen from basic.py
session_id = payload["session_id"]
upload_store = trans.app.config.tus_upload_store or trans.app.config.new_file_path
if re.match(r"^[\w-]+$", session_id) is None:
raise ValueError("Invalid session id format.")
local_filename = os.path.abspath(os.path.join(upload_store, session_id))
input_file = open(local_filename)
else:
input_file = payload.get("file", payload.get("__file", None)).file
target_dir = os.path.dirname(path)
Expand All @@ -114,6 +123,27 @@ def create(self, trans, job_id, payload, **kwargs):
pass
return {"message": "ok"}

@expose_api_raw_anonymous_and_sessionless
def tus_hooks(self, trans, **kwds):
    """
    Accept TUS hook / resumable upload requests for job files; currently a no-op.

    Exposed as POST /api/job_files/hooks and /api/job_files/resumable_upload.

    Deployment notes (unverified, based on a reading of the tusd docs):
    a separate tusd server is probably needed for job files. Starting it
    should look roughly like:

    tusd -host localhost -port 1080 -upload-dir=<galaxy_root>/database/tmp -hooks-http=<galaxy_url>/api/job_files/hooks

    The API key and session handling used by the regular upload tusd server
    should not be needed here. If the job files server is not exposed
    externally, there is probably no reason to do any hook-based
    authorization/authentication checks at all - the job keys checked during
    the final job files write calls will be sufficient.

    Always returns None; ``trans`` and ``kwds`` are not inspected.
    """
    # The upload/hooks endpoint cannot be reused because it requires either an
    # API key or a session; this route is meant to authorize access based on
    # active job keys instead.

    # TODO: actually check authorization (if coming on hooks path)
    return None

def __authorize_job_access(self, trans, encoded_job_id, **kwargs):
for key in ["path", "job_key"]:
if key not in kwargs:
Expand Down
23 changes: 23 additions & 0 deletions lib/galaxy/webapps/galaxy/buildapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,18 @@ def populate_api_routes(webapp, app):
)
webapp.mapper.connect("/api/upload/resumable_upload", controller="uploads", action="hooks")
webapp.mapper.connect("/api/upload/hooks", controller="uploads", action="hooks", conditions=dict(method=["POST"]))

webapp.mapper.connect(
"/api/job_files/resumable_upload/{session_id}",
controller="job_files",
action="tus_hooks",
conditions=dict(method=["PATCH"]),
)
webapp.mapper.connect("/api/job_files/resumable_upload", controller="job_files", action="tus_hooks")
webapp.mapper.connect(
"/api/job_files/hooks", controller="job_files", action="tus_hooks", conditions=dict(method=["POST"])
)

webapp.mapper.resource(
"revision",
"revisions",
Expand Down Expand Up @@ -1103,6 +1115,17 @@ def wrap_in_middleware(app, global_conf, application_stack, **local_conf):
"max_size": application_stack.config.maximum_upload_file_size,
},
)
# TUS upload middleware for job files....
app = wrap_if_allowed(
app,
stack,
TusMiddleware,
kwargs={
"upload_path": urljoin(f"{application_stack.config.galaxy_url_prefix}/", "api/job_files/resumable_upload"),
"tmp_dir": application_stack.config.tus_upload_store or application_stack.config.new_file_path,
"max_size": application_stack.config.maximum_upload_file_size,
},
)
# X-Forwarded-Host handling
app = wrap_if_allowed(app, stack, XForwardedHostMiddleware)
# Request ID middleware
Expand Down
35 changes: 35 additions & 0 deletions test/integration/test_job_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import io
import os
import tempfile
from typing import Dict

import requests
from sqlalchemy import select
from tusclient import client

from galaxy import model
from galaxy.model.base import (
Expand All @@ -35,6 +37,7 @@

TEST_INPUT_TEXT = "test input content\n"
TEST_FILE_IO = io.StringIO("some initial text data")
TEST_TUS_CHUNK_SIZE = 1024


class TestJobFilesIntegration(integration_util.IntegrationTestCase):
Expand Down Expand Up @@ -110,6 +113,38 @@ def files():
response = requests.post(post_url, data=data, files=files())
_assert_insufficient_permissions(response)

def test_write_with_tus(self):
    """Upload content through the job files TUS endpoint and write it to a job output.

    The test uploads a small temporary file to ``job_files/resumable_upload``
    with a TUS client, then posts the resulting session id (together with the
    job key and target path) to ``jobs/{job_id}/files`` and checks that the
    output dataset file contains the uploaded text.
    """
    # shared setup with above test
    job, output_hda, _ = self.create_static_job_with_state("running")
    job_id, job_key = self._api_job_keys(job)
    path = self._app.object_store.get_filename(output_hda.dataset)
    assert path

    upload_url = self._api_url("job_files/resumable_upload", use_key=False)
    headers: Dict[str, str] = {}
    my_client = client.TusClient(upload_url, headers=headers)

    storage = None
    metadata: Dict[str, str] = {}
    expected_content = "some initial text data"

    # The temporary file is deleted when the ``with`` block exits, so the
    # whole upload has to complete inside it. Using a context manager also
    # guarantees the handle is closed even if the upload raises.
    with tempfile.NamedTemporaryFile("w") as t_file:
        t_file.write(expected_content)
        # Flush so the uploader, which reads the file by path, sees the data.
        t_file.flush()

        input_path = t_file.name

        uploader = my_client.uploader(input_path, metadata=metadata, url_storage=storage)
        uploader.chunk_size = TEST_TUS_CHUNK_SIZE
        uploader.upload()
        upload_session_url = uploader.url

    assert upload_session_url
    # The session id is the last path component of the upload URL.
    tus_session_id = upload_session_url.rsplit("/", 1)[1]

    data = {"path": path, "job_key": job_key, "session_id": tus_session_id}
    post_url = self._api_url(f"jobs/{job_id}/files", use_key=False)
    response = requests.post(post_url, data=data)
    api_asserts.assert_status_code_is_ok(response)
    # Read through a context manager so the output file handle is not leaked.
    with open(path) as output_file:
        assert output_file.read() == expected_content

def test_write_protection(self):
job, _, _ = self.create_static_job_with_state("running")
job_id, job_key = self._api_job_keys(job)
Expand Down

0 comments on commit 6e5995b

Please sign in to comment.