Skip to content

Commit

Permalink
Cwltool client fixes (#36)
Browse files Browse the repository at this point in the history
* Added support for multi part upload, fixed test to account for changes, included new test for checking multipart upload

* Adding development instructions to the readme

* Fix multipart upload for cwl_runner
Muted workflow_log for service. Arvados assumptions are being made yielding
error logs when running cwl-runner.
Minor flake8 changes in the whole infrastructure.

* Flake8 compliance with fixes.
Https request for tool definition in tool descriptor.
Unable to provision local files to temp dir.

* Nit changes.
  • Loading branch information
achave11-ucsc authored and david4096 committed Jul 20, 2018
1 parent 968b558 commit 7d77a6c
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 21 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,12 @@ $ export WES_API_PROTO=http

Then, when you call `wes-client` these defaults will be used in place of the
flags, `--host`, `--auth`, and `proto` respectively.

## Development
If you would like to develop against `workflow-service` make sure you pass the provided test and it is flake8 compliant
#### Run test
From path `workflow-service` run

```
$ pytest && flake8
```
48 changes: 38 additions & 10 deletions test/test_integration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from __future__ import absolute_import

import json
import unittest
import time
import os
Expand All @@ -10,6 +12,7 @@

class IntegrationTest(unittest.TestCase):
"""A baseclass that's inherited for use with different cwl backends."""

def setUp(self):
"""Start a (local) wes-service server to make requests against."""
raise NotImplementedError
Expand All @@ -30,28 +33,51 @@ def tearDown(self):
def test_dockstore_md5sum(self):
"""Fetch the md5sum cwl from dockstore, run it on the wes-service server, and check for the correct output."""
cwl_dockstore_url = 'https://dockstore.org:8443/api/ga4gh/v2/tools/quay.io%2Fbriandoconnor%2Fdockstore-tool-md5sum/versions/master/plain-CWL/descriptor/%2FDockstore.cwl'
output_filepath = run_md5sum(cwl_input=cwl_dockstore_url)
output_filepath, _ = run_md5sum(cwl_input=cwl_dockstore_url)

self.assertTrue(check_for_file(output_filepath), 'Output file was not found: ' + str(output_filepath))
shutil.rmtree('workflows')

def test_local_md5sum(self):
"""Pass a local md5sum cwl to the wes-service server, and check for the correct output."""
cwl_local_path = os.path.abspath('testdata/md5sum.cwl')
output_filepath = run_md5sum(cwl_input='file://' + cwl_local_path)
output_filepath, _ = run_md5sum(cwl_input='file://' + cwl_local_path)

self.assertTrue(check_for_file(output_filepath), 'Output file was not found: ' + str(output_filepath))
shutil.rmtree('workflows')

def test_multipart_upload(self):
"""Pass a local md5sum cwl to the wes-service server, and check for uploaded file in service."""
cwl_local_path = os.path.abspath('testdata/md5sum.cwl')
_, run_id = run_md5sum(cwl_input='file://' + cwl_local_path)

get_response = get_log_request(run_id)["request"]

self.assertTrue(check_for_file(get_response["workflow_url"][7:]), 'Output file was not found: '
+ get_response["workflow_url"][:7])
shutil.rmtree('workflows')


def run_md5sum(cwl_input):
"""Pass a local md5sum cwl to the wes-service server, and return the path of the output file that was created."""
endpoint = 'http://localhost:8080/ga4gh/wes/v1/workflows'
params = {'output_file': {'path': '/tmp/md5sum.txt', 'class': 'File'}, 'input_file': {'path': '../../testdata/md5sum.input', 'class': 'File'}}
body = {'workflow_url': cwl_input, 'workflow_params': params, 'workflow_type': 'CWL', 'workflow_type_version': 'v1.0'}
response = requests.post(endpoint, json=body).json()
params = {'output_file': {'path': '/tmp/md5sum.txt', 'class': 'File'},
'input_file': {'path': '../../testdata/md5sum.input', 'class': 'File'}}

parts = [("workflow_params", json.dumps(params)), ("workflow_type", "CWL"), ("workflow_type_version", "v1.0")]
if cwl_input.startswith("file://"):
parts.append(("workflow_descriptor", ("md5sum.cwl", open(cwl_input[7:], "rb"))))
parts.append(("workflow_url", os.path.basename(cwl_input[7:])))
else:
parts.append(("workflow_url", cwl_input))
response = requests.post(endpoint, files=parts).json()
output_dir = os.path.abspath(os.path.join('workflows', response['workflow_id'], 'outdir'))
return os.path.join(output_dir, 'md5sum.txt')
return os.path.join(output_dir, 'md5sum.txt'), response['workflow_id']


def get_log_request(run_id):
endpoint = 'http://localhost:8080/ga4gh/wes/v1/workflows/{}'.format(run_id)
return requests.get(endpoint).json()


def get_server_pids():
Expand All @@ -77,18 +103,21 @@ def check_for_file(filepath, seconds=20):

class CwltoolTest(IntegrationTest):
"""Test using cwltool."""

def setUp(self):
"""
Start a (local) wes-service server to make requests against.
Use cwltool as the wes-service server 'backend'.
"""
self.wes_server_process = subprocess.Popen('python {}'.format(os.path.abspath('wes_service/wes_service_main.py')),
shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
self.wes_server_process = subprocess.Popen(
'python {}'.format(os.path.abspath('wes_service/wes_service_main.py')),
shell=True)
time.sleep(5)


class ToilTest(IntegrationTest):
"""Test using Toil."""

def setUp(self):
"""
Start a (local) wes-service server to make requests against.
Expand All @@ -97,13 +126,12 @@ def setUp(self):
self.wes_server_process = subprocess.Popen('python {} '
'--opt runner=cwltoil --opt extra=--logLevel=CRITICAL'
''.format(os.path.abspath('wes_service/wes_service_main.py')),
shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
shell=True)
time.sleep(5)


# Prevent pytest/unittest's discovery from attempting to discover the base test class.
del IntegrationTest


if __name__ == '__main__':
unittest.main() # run all tests
2 changes: 1 addition & 1 deletion testdata/md5sum.cwl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ outputs:

steps:
md5sum:
run: dockstore-tool-md5sum.cwl
run: https://raw.githubusercontent.com/common-workflow-language/workflow-service/master/testdata/dockstore-tool-md5sum.cwl
in:
input_file: input_file
out: [output_file]
Expand Down
4 changes: 2 additions & 2 deletions wes_client/wes_client_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import logging
import schema_salad.ref_resolver
import requests
from requests.exceptions import MissingSchema
from requests.exceptions import InvalidSchema
from wes_service.util import visit
from bravado.client import SwaggerClient
from bravado.requests_client import RequestsClient
Expand Down Expand Up @@ -161,7 +161,7 @@ def fixpaths(d):
logging.info(str(s["workflow_log"]["stderr"]))
logs = requests.get(s["workflow_log"]["stderr"], headers={"Authorization": args.auth}).text
logging.info("Workflow log:\n" + logs)
except MissingSchema:
except InvalidSchema:
logging.info("Workflow log:\n" + str(s["workflow_log"]["stderr"]))

if "fields" in s["outputs"] and s["outputs"]["fields"] is None:
Expand Down
39 changes: 31 additions & 8 deletions wes_service/cwl_runner.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
from __future__ import print_function
import json
import os
import subprocess
import tempfile
import urllib
import uuid

import connexion
from werkzeug.utils import secure_filename

from wes_service.util import WESBackend


Expand Down Expand Up @@ -41,14 +46,13 @@ def run(self, request, opts):
with open(os.path.join(self.workdir, "request.json"), "w") as f:
json.dump(request, f)

input_json = os.path.join(self.workdir, "cwl.input.json")
with open(input_json, "w") as inputtemp:
with open(os.path.join(
self.workdir, "cwl.input.json"), "w") as inputtemp:
json.dump(request["workflow_params"], inputtemp)

if request.get("workflow_descriptor"):
workflow_descriptor = request.get('workflow_descriptor')
with open(os.path.join(self.workdir, "workflow.cwl"), "w") as f:
# FIXME #14 workflow_descriptor isn't defined
f.write(workflow_descriptor)
workflow_url = urllib.pathname2url(os.path.join(self.workdir, "workflow.cwl"))
else:
Expand All @@ -58,8 +62,8 @@ def run(self, request, opts):
stderr = open(os.path.join(self.workdir, "stderr"), "w")

runner = opts.getopt("runner", default="cwl-runner")
extra = opts.getoptlist("extra") # if the user specified none, returns []
command_args = [runner] + extra + [workflow_url, input_json]
extra = opts.getoptlist("extra")
command_args = [runner] + extra + [workflow_url, inputtemp.name]
proc = subprocess.Popen(command_args,
stdout=output,
stderr=stderr,
Expand Down Expand Up @@ -178,12 +182,31 @@ def ListWorkflows(self):
"next_page_token": ""
}

def RunWorkflow(self, body):
# FIXME Add error responses #16
if body["workflow_type"] == "CWL" and body["workflow_type_version"] != "v1.0":
def RunWorkflow(self):
tempdir = tempfile.mkdtemp()
body = {}
for k, ls in connexion.request.files.iterlists():
for v in ls:
if k == "workflow_descriptor":
filename = secure_filename(v.filename)
v.save(os.path.join(tempdir, filename))
elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
body[k] = json.loads(v.read())
else:
body[k] = v.read()

if body['workflow_type'] != "CWL" or \
body['workflow_type_version'] != "v1.0":
return

body["workflow_url"] = "file:///%s/%s" % (tempdir, body["workflow_url"])
index = body["workflow_url"].find("http")
if index > 0:
body["workflow_url"] = body["workflow_url"][index:]

workflow_id = uuid.uuid4().hex
job = Workflow(workflow_id)

job.run(body, self)
return {"workflow_id": workflow_id}

Expand Down

0 comments on commit 7d77a6c

Please sign in to comment.