Skip to content

Commit

Permalink
Allow job files to consume TUS uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Jan 12, 2024
1 parent fb5199b commit 1088396
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 0 deletions.
35 changes: 35 additions & 0 deletions lib/galaxy/webapps/galaxy/api/job_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""
import logging
import os
import re
import shutil

from galaxy import (
Expand Down Expand Up @@ -99,6 +100,14 @@ def create(self, trans, job_id, payload, **kwargs):
upload_store
), f"Filename provided by nginx ({file_path}) is not in correct directory ({upload_store})"
input_file = open(file_path)
elif "session_id" in payload:
# code stolen from basic.py
session_id = payload["session_id"]
upload_store = trans.app.config.tus_upload_store or trans.app.config.new_file_path
if re.match(r"^[\w-]+$", session_id) is None:
raise ValueError("Invalid session id format.")
local_filename = os.path.abspath(os.path.join(upload_store, session_id))
input_file = open(local_filename)
else:
input_file = payload.get("file", payload.get("__file", None)).file
target_dir = os.path.dirname(path)
Expand All @@ -114,6 +123,32 @@ def create(self, trans, job_id, payload, **kwargs):
pass
return {"message": "ok"}

@expose_api_raw_anonymous_and_sessionless
def tus_hook_patch(self, trans, **kwds):
"""
Exposed as PATCH /api/job_files/resumable_upload and POST /api/job_files/hooks.
I think based on the docs, a separate tusd server is needed for job files if
also hosting one for use facing uploads.
Setting up tusd for job files should just look like (I think):
tusd -host localhost -port 1080 -upload-dir=<galaxy_root>/database/tmp -hooks-http=<galaxy_url>/api/job_files/hooks
Shouldn't need the API key and session stuff the upload tusd server uses.
I would love to check the job state with __authorize_job_access on the first
POST but it seems like TusMiddleware doesn't default to coming in here for that
initial POST the way it does for the subsequent PATCHes. Ultimately, the upload
is still authorized before the write done with POST /api/jobs/<job_id>/files
so I think there is no route here to mess with user data - the worst of the security
issue that can be caused is filling up the sever with needless files that aren't
acted on. Since this endpoint is not meant for public consumption - all the job
files stuff and the TUS server should be blocked to public IPs anyway and restricted
to your Pulsar servers.
"""
return None

def __authorize_job_access(self, trans, encoded_job_id, **kwargs):
for key in ["path", "job_key"]:
if key not in kwargs:
Expand Down
23 changes: 23 additions & 0 deletions lib/galaxy/webapps/galaxy/buildapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,18 @@ def populate_api_routes(webapp, app):
)
webapp.mapper.connect("/api/upload/resumable_upload", controller="uploads", action="hooks")
webapp.mapper.connect("/api/upload/hooks", controller="uploads", action="hooks", conditions=dict(method=["POST"]))

webapp.mapper.connect(
"/api/job_files/resumable_upload/{session_id}",
controller="job_files",
action="tus_hook_patch",
conditions=dict(method=["PATCH"]),
)
webapp.mapper.connect("/api/job_files/resumable_upload", controller="job_files", action="tus_hook_post")
webapp.mapper.connect(
"/api/job_files/hooks", controller="job_files", action="tus_hook_post", conditions=dict(method=["POST"])
)

webapp.mapper.resource(
"revision",
"revisions",
Expand Down Expand Up @@ -1103,6 +1115,17 @@ def wrap_in_middleware(app, global_conf, application_stack, **local_conf):
"max_size": application_stack.config.maximum_upload_file_size,
},
)
# TUS upload middleware for job files....
app = wrap_if_allowed(
app,
stack,
TusMiddleware,
kwargs={
"upload_path": urljoin(f"{application_stack.config.galaxy_url_prefix}/", "api/job_files/resumable_upload"),
"tmp_dir": application_stack.config.tus_upload_store or application_stack.config.new_file_path,
"max_size": application_stack.config.maximum_upload_file_size,
},
)
# X-Forwarded-Host handling
app = wrap_if_allowed(app, stack, XForwardedHostMiddleware)
# Request ID middleware
Expand Down
35 changes: 35 additions & 0 deletions test/integration/test_job_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import io
import os
import tempfile
from typing import Dict

import requests
from sqlalchemy import select
from tusclient import client

from galaxy import model
from galaxy.model.base import (
Expand All @@ -35,6 +37,7 @@

TEST_INPUT_TEXT = "test input content\n"
TEST_FILE_IO = io.StringIO("some initial text data")
TEST_TUS_CHUNK_SIZE = 1024


class TestJobFilesIntegration(integration_util.IntegrationTestCase):
Expand Down Expand Up @@ -110,6 +113,38 @@ def files():
response = requests.post(post_url, data=data, files=files())
_assert_insufficient_permissions(response)

def test_write_with_tus(self):
# shared setup with above test
job, output_hda, _ = self.create_static_job_with_state("running")
job_id, job_key = self._api_job_keys(job)
path = self._app.object_store.get_filename(output_hda.dataset)
assert path

upload_url = self._api_url(f"job_files/resumable_upload?job_key={job_key}", use_key=False)
headers: Dict[str, str] = {}
my_client = client.TusClient(upload_url, headers=headers)

storage = None
metadata: Dict[str, str] = {}
t_file = tempfile.NamedTemporaryFile("w")
t_file.write("some initial text data")
t_file.flush()

input_path = t_file.name

uploader = my_client.uploader(input_path, metadata=metadata, url_storage=storage)
uploader.chunk_size = TEST_TUS_CHUNK_SIZE
uploader.upload()
upload_session_url = uploader.url
assert upload_session_url
tus_session_id = upload_session_url.rsplit("/", 1)[1]

data = {"path": path, "job_key": job_key, "session_id": tus_session_id}
post_url = self._api_url(f"jobs/{job_id}/files", use_key=False)
response = requests.post(post_url, data=data)
api_asserts.assert_status_code_is_ok(response)
assert open(path).read() == "some initial text data"

def test_write_protection(self):
job, _, _ = self.create_static_job_with_state("running")
job_id, job_key = self._api_job_keys(job)
Expand Down

0 comments on commit 1088396

Please sign in to comment.