From 0d6df34216ee19812c5ccfd7cd1496369747fcdf Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Mon, 23 Jul 2018 12:48:55 -0400 Subject: [PATCH 1/4] Update to latest WES (wip) --- wes_client/wes_client_main.py | 12 +- wes_service/arvados_wes.py | 34 ++-- wes_service/cwl_runner.py | 36 ++-- .../workflow_execution_service.swagger.yaml | 168 ++++++++++-------- 4 files changed, 134 insertions(+), 116 deletions(-) diff --git a/wes_client/wes_client_main.py b/wes_client/wes_client_main.py index bc9658d..e5db4ff 100644 --- a/wes_client/wes_client_main.py +++ b/wes_client/wes_client_main.py @@ -62,17 +62,17 @@ def main(argv=sys.argv[1:]): http_client=http_client, config={"use_models": False}) if args.list: - response = client.WorkflowExecutionService.ListWorkflows(page_token=args.page, page_size=args.page_size) + response = client.WorkflowExecutionService.ListRuns(page_token=args.page, page_size=args.page_size) json.dump(response.result(), sys.stdout, indent=4) return 0 if args.log: - response = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=args.log) + response = client.WorkflowExecutionService.GetRunLog(workflow_id=args.log) sys.stdout.write(response.result()["workflow_log"]["stderr"]) return 0 if args.get: - response = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=args.get) + response = client.WorkflowExecutionService.GetRunLog(workflow_id=args.get) json.dump(response.result(), sys.stdout, indent=4) return 0 @@ -147,14 +147,14 @@ def fixpaths(d): sys.stdout.write(r["workflow_id"] + "\n") exit(0) - r = client.WorkflowExecutionService.GetWorkflowStatus(workflow_id=r["workflow_id"]).result() + r = client.WorkflowExecutionService.GetRunStatus(workflow_id=r["workflow_id"]).result() while r["state"] in ("QUEUED", "INITIALIZING", "RUNNING"): time.sleep(8) - r = client.WorkflowExecutionService.GetWorkflowStatus(workflow_id=r["workflow_id"]).result() + r = client.WorkflowExecutionService.GetRunStatus(workflow_id=r["workflow_id"]).result() logging.info("State is %s", r["state"]) - s = client.WorkflowExecutionService.GetWorkflowLog(workflow_id=r["workflow_id"]).result() + s = client.WorkflowExecutionService.GetRunLog(workflow_id=r["workflow_id"]).result() try: # TODO: Only works with Arvados atm diff --git a/wes_service/arvados_wes.py b/wes_service/arvados_wes.py index 7c04d9f..30e0cea 100644 --- a/wes_service/arvados_wes.py +++ b/wes_service/arvados_wes.py @@ -81,7 +81,7 @@ def GetServiceInfo(self): } @catch_exceptions - def ListWorkflows(self, page_size=None, page_token=None, tag_search=None, state_search=None): + def ListRuns(self, page_size=None, page_token=None, state_search=None): api = get_api() paging = [] @@ -100,13 +100,13 @@ def ListWorkflows(self, page_size=None, page_token=None, tag_search=None, state_ uuidmap = {c["uuid"]: statemap[c["state"]] for c in containers} - workflow_list = [{"workflow_id": cr["uuid"], + workflow_list = [{"run_id": cr["uuid"], "state": uuidmap.get(cr["container_uuid"])} for cr in requests if cr["command"] and cr["command"][0] == "arvados-cwl-runner"] return { "workflows": workflow_list, - "next_page_token": workflow_list[-1]["workflow_id"] if workflow_list else "" + "next_page_token": workflow_list[-1]["run_id"] if workflow_list else "" } def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params, @@ -215,13 +215,13 @@ def RunWorkflow(self, workflow_params, workflow_type, workflow_type_version, project_uuid, tempdir)).start() - return {"workflow_id": cr["uuid"]} + return {"run_id": cr["uuid"]} @catch_exceptions - def GetWorkflowLog(self, workflow_id): + def GetRunLog(self, run_id): api = get_api() - request = api.container_requests().get(uuid=workflow_id).execute() + request = api.container_requests().get(uuid=run_id).execute() if request["container_uuid"]: container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA task_reqs = arvados.util.list_all(api.container_requests().list, filters=[["requesting_container_uuid", "=", container["uuid"]]]) @@ -273,7 +273,7 @@ def log_object(cr): return r r = { - "workflow_id": request["uuid"], + "run_id": request["uuid"], "request": { "workflow_url": "", "workflow_params": request["mounts"].get("/var/lib/cwl/cwl.input.json", {}).get("content", {}) @@ -287,30 +287,30 @@ def log_object(cr): return r @catch_exceptions - def CancelJob(self, workflow_id): # NOQA + def CancelRun(self, run_id): # NOQA api = get_api() - request = api.container_requests().update(uuid=workflow_id, body={"priority": 0}).execute() # NOQA - return {"workflow_id": request["uuid"]} + request = api.container_requests().update(uuid=run_id, body={"priority": 0}).execute() # NOQA + return {"run_id": request["uuid"]} @catch_exceptions - def GetWorkflowStatus(self, workflow_id): + def GetRunStatus(self, run_id): api = get_api() - request = api.container_requests().get(uuid=workflow_id).execute() + request = api.container_requests().get(uuid=run_id).execute() if request["container_uuid"]: container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA elif request["priority"] == 0: container = {"state": "Cancelled"} else: container = {"state": "Queued"} - return {"workflow_id": request["uuid"], + return {"run_id": request["uuid"], "state": statemap[container["state"]]} -def dynamic_logs(workflow_id, logstream): +def dynamic_logs(run_id, logstream): api = get_api() - cr = api.container_requests().get(uuid=workflow_id).execute() + cr = api.container_requests().get(uuid=run_id).execute() l1 = [t["properties"]["text"] - for t in api.logs().list(filters=[["object_uuid", "=", workflow_id], + for t in api.logs().list(filters=[["object_uuid", "=", run_id], ["event_type", "=", logstream]], order="created_at desc", limit=100).execute()["items"]] @@ -327,5 +327,5 @@ def dynamic_logs(workflow_id, logstream): def create_backend(app, opts): ab = ArvadosBackend(opts) - app.app.route('/ga4gh/wes/v1/workflows//x-dynamic-logs/')(dynamic_logs) + app.app.route('/ga4gh/wes/v1/runs//x-dynamic-logs/')(dynamic_logs) return ab diff --git a/wes_service/cwl_runner.py b/wes_service/cwl_runner.py index de2ca50..ccd5718 100644 --- a/wes_service/cwl_runner.py +++ b/wes_service/cwl_runner.py @@ -13,10 +13,10 @@ class Workflow(object): - def __init__(self, workflow_id): + def __init__(self, run_id): super(Workflow, self).__init__() - self.workflow_id = workflow_id - self.workdir = os.path.join(os.getcwd(), "workflows", self.workflow_id) + self.run_id = run_id + self.workdir = os.path.join(os.getcwd(), "workflows", self.run_id) def run(self, request, opts): """ @@ -37,7 +37,7 @@ def run(self, request, opts): :param dict request: A dictionary containing the cwl/json information. :param wes_service.util.WESBackend opts: contains the user's arguments; specifically the runner and runner options - :return: {"workflow_id": self.workflow_id, "state": state} + :return: {"run_id": self.run_id, "state": state} """ os.makedirs(self.workdir) outdir = os.path.join(self.workdir, "outdir") @@ -117,7 +117,7 @@ def getstatus(self): state, exit_code = self.getstate() return { - "workflow_id": self.workflow_id, + "run_id": self.run_id, "state": state } @@ -137,7 +137,7 @@ def getlog(self): outputobj = json.load(outputtemp) return { - "workflow_id": self.workflow_id, + "run_id": self.run_id, "request": request, "state": state, "workflow_log": { @@ -169,14 +169,14 @@ def GetServiceInfo(self): "key_values": {} } - def ListWorkflows(self): + def ListRuns(self, page_size=None, page_token=None, state_search=None): # FIXME #15 results don't page wf = [] for l in os.listdir(os.path.join(os.getcwd(), "workflows")): if os.path.isdir(os.path.join(os.getcwd(), "workflows", l)): wf.append(Workflow(l)) - workflows = [{"workflow_id": w.workflow_id, "state": w.getstate()[0]} for w in wf] # NOQA + workflows = [{"run_id": w.run_id, "state": w.getstate()[0]} for w in wf] # NOQA return { "workflows": workflows, "next_page_token": "" @@ -204,23 +204,23 @@ def RunWorkflow(self): if index > 0: body["workflow_url"] = body["workflow_url"][index:] - workflow_id = uuid.uuid4().hex - job = Workflow(workflow_id) + run_id = uuid.uuid4().hex + job = Workflow(run_id) job.run(body, self) - return {"workflow_id": workflow_id} + return {"run_id": run_id} - def GetWorkflowLog(self, workflow_id): - job = Workflow(workflow_id) + def GetRunLog(self, run_id): + job = Workflow(run_id) return job.getlog() - def CancelJob(self, workflow_id): - job = Workflow(workflow_id) + def CancelRun(self, run_id): + job = Workflow(run_id) job.cancel() - return {"workflow_id": workflow_id} + return {"run_id": run_id} - def GetWorkflowStatus(self, workflow_id): - job = Workflow(workflow_id) + def GetRunStatus(self, run_id): + job = Workflow(run_id) return job.getstatus() diff --git a/wes_service/openapi/workflow_execution_service.swagger.yaml b/wes_service/openapi/workflow_execution_service.swagger.yaml index a3f1258..d0c15b1 100644 --- a/wes_service/openapi/workflow_execution_service.swagger.yaml +++ b/wes_service/openapi/workflow_execution_service.swagger.yaml @@ -1,8 +1,8 @@ -basePath: /ga4gh/wes/v1 +basePath: '/ga4gh/wes/v1' swagger: '2.0' info: title: Workflow Execution Service - version: 0.2.1 + version: 0.3.0 schemes: - http - https @@ -14,8 +14,7 @@ paths: /service-info: get: summary: |- - Get information about Workflow Execution Service. May include information related (but - not limited to) the workflow descriptor formats, versions supported, the WES API versions supported, and information about general the service availability. + Get information about Workflow Execution Service. May include information related (but not limited to) the workflow descriptor formats, versions supported, the WES API versions supported, and information about general the service availability. x-swagger-router-controller: ga4gh.wes.server operationId: GetServiceInfo responses: @@ -41,20 +40,23 @@ paths: $ref: '#/definitions/ErrorResponse' tags: - WorkflowExecutionService - /workflows: + /runs: get: summary: |- - List the workflows, this endpoint will list the workflows in order of oldest to newest. - There is no guarantee of live updates as the user traverses the pages, the behavior should be - decided (and documented) by each implementation. - To monitor a given execution, use GetWorkflowStatus or GetWorkflowLog. + List the workflow runs. This should be provided in a stable + ordering, however the ordering of this list is implementation + dependent. When paging through the list, the client should + not make assumptions about live updates, but should assume the + contents of the list reflect the workflow list at the moment + that the first page is requested. To monitor a specific + workflow run, use GetRunStatus or GetRunLog. x-swagger-router-controller: ga4gh.wes.server - operationId: ListWorkflows + operationId: ListRuns responses: '200': description: '' schema: - $ref: '#/definitions/WorkflowListResponse' + $ref: '#/definitions/RunListResponse' '400': description: The request is malformed. schema: @@ -75,7 +77,14 @@ paths: - name: page_size description: |- OPTIONAL - Number of workflows to return in a page. + The preferred number of workflow runs to return in a page. + If not provided, the implementation should use a default page size. + The implementation must not return more items + than "page_size", but it may return fewer. Clients should + not assume that if fewer than "page_size" items is + returned that all items have been returned. The + availability of additional pages is indicated by the value + of "next_page_token" in the response. in: query required: false type: integer @@ -83,19 +92,11 @@ paths: - name: page_token description: |- OPTIONAL - Token to use to indicate where to start getting results. If unspecified, returns the first + Token to use to indicate where to start getting results. If unspecified, return the first page of results. in: query required: false type: string - - name: tag_search - description: |- - OPTIONAL - For each key, if the key's value is empty string then match workflows that are tagged with - this key regardless of value. - in: query - required: false - type: string tags: - WorkflowExecutionService post: @@ -139,7 +140,7 @@ paths: '200': description: '' schema: - $ref: '#/definitions/WorkflowRunId' + $ref: '#/definitions/RunId' '400': description: The request is malformed. schema: @@ -194,22 +195,22 @@ paths: format: binary tags: - WorkflowExecutionService - '/workflows/{workflow_id}': + /runs/{run_id}: get: - summary: Get detailed info about a running workflow. + summary: Get detailed info about a workflow run. x-swagger-router-controller: ga4gh.wes.server - operationId: GetWorkflowLog + operationId: GetRunLog responses: '200': description: '' schema: - $ref: '#/definitions/WorkflowLog' + $ref: '#/definitions/RunLog' '401': description: The request is unauthorized. schema: $ref: '#/definitions/ErrorResponse' '404': - description: The requested Workflow found. + description: The requested workflow run not found. schema: $ref: '#/definitions/ErrorResponse' '403': @@ -221,7 +222,7 @@ paths: schema: $ref: '#/definitions/ErrorResponse' parameters: - - name: workflow_id + - name: run_id in: path required: true type: string @@ -230,18 +231,18 @@ paths: delete: summary: Cancel a running workflow. x-swagger-router-controller: ga4gh.wes.server - operationId: CancelJob + operationId: CancelRun responses: '200': description: '' schema: - $ref: '#/definitions/WorkflowRunId' + $ref: '#/definitions/RunId' '401': description: The request is unauthorized. schema: $ref: '#/definitions/ErrorResponse' '404': - description: The requested Workflow wasn't found. + description: The requested workflow run wasn't found. schema: $ref: '#/definitions/ErrorResponse' '403': @@ -253,28 +254,28 @@ paths: schema: $ref: '#/definitions/ErrorResponse' parameters: - - name: workflow_id + - name: run_id in: path required: true type: string tags: - WorkflowExecutionService - '/workflows/{workflow_id}/status': + /runs/{run_id}/status: get: - summary: Get quick status info about a running workflow. + summary: Get quick status info about a workflow run. x-swagger-router-controller: ga4gh.wes.server - operationId: GetWorkflowStatus + operationId: GetRunStatus responses: '200': description: '' schema: - $ref: '#/definitions/WorkflowStatus' + $ref: '#/definitions/RunStatus' '401': description: The request is unauthorized. schema: $ref: '#/definitions/ErrorResponse' '404': - description: The requested Workflow wasn't found. + description: The requested workflow run wasn't found. schema: $ref: '#/definitions/ErrorResponse' '403': @@ -286,7 +287,7 @@ paths: schema: $ref: '#/definitions/ErrorResponse' parameters: - - name: workflow_id + - name: run_id in: path required: true type: string @@ -315,19 +316,29 @@ definitions: type: array items: type: string - title: The command line that was run + title: The command line that was executed start_time: type: string - title: When the command was executed + title: When the command started executing, in ISO 8601 format "%Y-%m-%dT%H:%M:%SZ" end_time: type: string - title: When the command completed + title: When the command stopped executing (completed, failed, or cancelled), in ISO 8601 format "%Y-%m-%dT%H:%M:%SZ" stdout: type: string - title: Sample of stdout (not guaranteed to be entire log) + title: |- + A URL to retrieve standard output logs of the workflow run or + task. This URL may change between status requests, or may + not be available until the task or workflow has finished + execution. Should be available using the same credentials + used to access the WES endpoint. stderr: type: string - title: Sample of stderr (not guaranteed to be entire log) + title: |- + A URL to retrieve standard error logs of the workflow run or + task. This URL may change between status requests, or may + not be available until the task or workflow has finished + execution. Should be available using the same credentials + used to access the WES endpoint. exit_code: type: integer format: int32 @@ -376,13 +387,20 @@ definitions: type: integer format: int64 description: |- - The system statistics, key is the statistic, value is the count of workflows in that state. + The system statistics, key is the statistic, value is the count of runs in that state. See the State enum for the possible keys. auth_instructions_url: type: string description: |- - A URL that will help a in generating the tokens necessary to run a workflow using this - service. + A web page URL with information about how to get an + authorization token necessary to use a specific endpoint. + contact_info: + type: string + description: |- + An email address or web page URL with contact information + for the operator of a specific WES endpoint. Users of the + endpoint should use this to report problems or security + vulnerabilities. tags: type: object additionalProperties: @@ -428,62 +446,62 @@ definitions: for example an upload failed due to network issues, the worker's ran out of disk space, etc. - CANCELED: The task was canceled by the user. - title: Enumeration of states for a given workflow request - WorkflowDescription: + title: Enumeration of states for a given run request + RunDescription: type: object properties: - workflow_id: + run_id: type: string title: REQUIRED state: $ref: '#/definitions/State' title: REQUIRED - title: 'Small description of workflows, returned by server during listing' - WorkflowListResponse: + title: 'Small description of a workflow run, returned by server during listing' + RunListResponse: type: object properties: - workflows: + runs: type: array items: - $ref: '#/definitions/WorkflowDescription' - description: A list of workflows that the service has executed or is executing. + $ref: '#/definitions/RunDescription' + description: A list of workflow runs that the service has executed or is executing. next_page_token: type: string description: |- - A token, which when provided in a workflow_list_request, allows one to retrieve the next page - of results. - description: The service will return a workflow_list_response when receiving a successful workflow_list_request. - WorkflowLog: + A token which may be supplied as "page_token" in workflow run list request to get the next page + of results. An empty string indicates there are no more items to return. + description: The service will return a RunListResponse when receiving a successful RunListRequest. + RunLog: type: object properties: - workflow_id: + run_id: type: string - title: workflow ID + title: workflow run ID request: - $ref: '#/definitions/WorkflowRequest' + $ref: '#/definitions/RunRequest' description: The original request message used to initiate this execution. state: $ref: '#/definitions/State' title: state - workflow_log: + run_log: $ref: '#/definitions/Log' title: 'the logs, and other key info like timing and exit code, for the overall run of this workflow' task_logs: type: array items: $ref: '#/definitions/Log' - title: 'the logs, and other key info like timing and exit code, for each step in the workflow' + title: 'the logs, and other key info like timing and exit code, for each step in the workflow run' outputs: $ref: '#/definitions/WesObject' title: the outputs - WorkflowRequest: + RunRequest: type: object properties: workflow_params: $ref: '#/definitions/WesObject' description: |- REQUIRED - The workflow parameterization document (typically a JSON file), includes all parameterizations for the workflow + The workflow run parameterization document (typically a JSON file), includes all parameterizations for the run including input and output file locations. workflow_type: type: string @@ -501,7 +519,7 @@ definitions: type: string title: |- OPTIONAL - A key-value map of arbitrary metadata outside the scope of the workflow_params but useful to track with this workflow request + A key-value map of arbitrary metadata outside the scope of the run_params but useful to track with this run request workflow_engine_parameters: type: object additionalProperties: @@ -518,20 +536,20 @@ definitions: When workflow attachments files are provided, the `workflow_url` may be a relative path corresponding to one of the attachments. description: |- - To execute a workflow, send a workflow request including all the details needed to begin downloading + To execute a workflow, send a run request including all the details needed to begin downloading and executing a given workflow. - WorkflowRunId: + RunId: type: object properties: - workflow_id: + run_id: type: string - title: workflow ID - WorkflowStatus: + title: workflow run ID + RunStatus: type: object properties: - workflow_id: + run_id: type: string - title: workflow ID + title: workflow run ID state: $ref: '#/definitions/State' title: state @@ -546,7 +564,7 @@ definitions: an array of one or more acceptable types for the Workflow Type. For example, to send a base64 encoded WDL gzip, one could would offer "base64_wdl1.0_gzip". By setting this value, and the path of the main WDL - to be executed in the workflow_url to "main.wdl" in the WorkflowRequest. + to be executed in the workflow_url to "main.wdl" in the RunRequest. description: Available workflow types supported by a given instance of the service. WesObject: type: object @@ -563,4 +581,4 @@ definitions: description: A detailed error message. status_code: type: integer - description: The integer representing the HTTP status code (e.g. 200, 404). \ No newline at end of file + description: The integer representing the HTTP status code (e.g. 200, 404). From f85f80528800d7543fef75ad6bffab606ae4bee2 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Mon, 23 Jul 2018 15:24:29 -0400 Subject: [PATCH 2/4] Passing some tests --- setup.py | 2 +- test/test_integration.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/setup.py b/setup.py index 650a715..50b309a 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ long_description = readmeFile.read() setup(name='wes-service', - version='2.4', + version='2.5', description='GA4GH Workflow Execution Service reference implementation', long_description=long_description, author='GA4GH Containers and Workflows task team', diff --git a/test/test_integration.py b/test/test_integration.py index e04ec81..52e6c54 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -60,7 +60,7 @@ def test_multipart_upload(self): def run_md5sum(cwl_input): """Pass a local md5sum cwl to the wes-service server, and return the path of the output file that was created.""" - endpoint = 'http://localhost:8080/ga4gh/wes/v1/workflows' + endpoint = 'http://localhost:8080/ga4gh/wes/v1/runs' params = {'output_file': {'path': '/tmp/md5sum.txt', 'class': 'File'}, 'input_file': {'path': '../../testdata/md5sum.input', 'class': 'File'}} @@ -71,12 +71,12 @@ def run_md5sum(cwl_input): else: parts.append(("workflow_url", cwl_input)) response = requests.post(endpoint, files=parts).json() - output_dir = os.path.abspath(os.path.join('workflows', response['workflow_id'], 'outdir')) - return os.path.join(output_dir, 'md5sum.txt'), response['workflow_id'] + output_dir = os.path.abspath(os.path.join('workflows', response['run_id'], 'outdir')) + return os.path.join(output_dir, 'md5sum.txt'), response['run_id'] def get_log_request(run_id): - endpoint = 'http://localhost:8080/ga4gh/wes/v1/workflows/{}'.format(run_id) + endpoint = 'http://localhost:8080/ga4gh/wes/v1/runs/{}'.format(run_id) return requests.get(endpoint).json() From 4b1afce8cee9924abc850bc7c0e791994d798fbf Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Mon, 23 Jul 2018 16:21:43 -0400 Subject: [PATCH 3/4] More fixes for 0.3.0 WES --- wes_client/wes_client_main.py | 18 +++++++++++------- wes_service/arvados_wes.py | 21 ++++----------------- wes_service/cwl_runner.py | 23 ++--------------------- wes_service/util.py | 29 ++++++++++++++++++++++++++++- 4 files changed, 45 insertions(+), 46 deletions(-) diff --git a/wes_client/wes_client_main.py b/wes_client/wes_client_main.py index e5db4ff..1b66d69 100644 --- a/wes_client/wes_client_main.py +++ b/wes_client/wes_client_main.py @@ -81,6 +81,10 @@ def main(argv=sys.argv[1:]): json.dump(response.result(), sys.stdout, indent=4) return 0 + if not args.job_order: + logging.error("Missing job order") + return 1 + loader = schema_salad.ref_resolver.Loader({ "location": {"@type": "@id"}, "path": {"@type": "@id"} @@ -102,7 +106,7 @@ def fixpaths(d): visit(input_dict, fixpaths) workflow_url = args.workflow_url - if not workflow_url.startswith("/") and ":" not in workflow_url: + if ":" not in workflow_url: workflow_url = "file://" + os.path.abspath(workflow_url) if args.quiet: @@ -131,7 +135,7 @@ def fixpaths(d): else: parts.append(("workflow_url", workflow_url)) - postresult = http_client.session.post("%s://%s/ga4gh/wes/v1/workflows" % (args.proto, args.host), + postresult = http_client.session.post("%s://%s/ga4gh/wes/v1/runs" % (args.proto, args.host), files=parts, headers={"Authorization": args.auth}) @@ -142,19 +146,19 @@ def fixpaths(d): exit(1) if args.wait: - logging.info("Workflow id is %s", r["workflow_id"]) + logging.info("Workflow run id is %s", r["run_id"]) else: - sys.stdout.write(r["workflow_id"] + "\n") + sys.stdout.write(r["run_id"] + "\n") exit(0) - r = client.WorkflowExecutionService.GetRunStatus(workflow_id=r["workflow_id"]).result() + r = client.WorkflowExecutionService.GetRunStatus(run_id=r["run_id"]).result() while r["state"] in ("QUEUED", "INITIALIZING", "RUNNING"): time.sleep(8) - r = client.WorkflowExecutionService.GetRunStatus(workflow_id=r["workflow_id"]).result() + r = client.WorkflowExecutionService.GetRunStatus(run_id=r["run_id"]).result() logging.info("State is %s", r["state"]) - s = client.WorkflowExecutionService.GetRunLog(workflow_id=r["workflow_id"]).result() + s = client.WorkflowExecutionService.GetRunLog(run_id=r["run_id"]).result() try: # TODO: Only works with Arvados atm diff --git a/wes_service/arvados_wes.py b/wes_service/arvados_wes.py index 30e0cea..811d513 100644 --- a/wes_service/arvados_wes.py +++ b/wes_service/arvados_wes.py @@ -156,23 +156,10 @@ def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params, workflow_descriptor_file.close() @catch_exceptions - def RunWorkflow(self, workflow_params, workflow_type, workflow_type_version, - workflow_url, workflow_descriptor, workflow_engine_parameters=None, tags=None): - tempdir = tempfile.mkdtemp() - body = {} - for k, ls in connexion.request.files.iterlists(): - for v in ls: - if k == "workflow_descriptor": - filename = secure_filename(v.filename) - v.save(os.path.join(tempdir, filename)) - elif k in ("workflow_params", "tags", "workflow_engine_parameters"): - body[k] = json.loads(v.read()) - else: - body[k] = v.read() - body["workflow_url"] = "file:///%s/%s" % (tempdir, body["workflow_url"]) - - if body["workflow_type"] != "CWL" or body["workflow_type_version"] != "v1.0": # NOQA - return + def RunWorkflow(self, **args): + tempdir, body = self.collect_attachments() + + print(body) if not connexion.request.headers.get('Authorization'): raise MissingAuthorization() diff --git a/wes_service/cwl_runner.py b/wes_service/cwl_runner.py index ccd5718..414f1b4 100644 --- a/wes_service/cwl_runner.py +++ b/wes_service/cwl_runner.py @@ -182,27 +182,8 @@ def ListRuns(self, page_size=None, page_token=None, state_search=None): "next_page_token": "" } - def RunWorkflow(self): - tempdir = tempfile.mkdtemp() - body = {} - for k, ls in connexion.request.files.iterlists(): - for v in ls: - if k == "workflow_descriptor": - filename = secure_filename(v.filename) - v.save(os.path.join(tempdir, filename)) - elif k in ("workflow_params", "tags", "workflow_engine_parameters"): - body[k] = json.loads(v.read()) - else: - body[k] = v.read() - - if body['workflow_type'] != "CWL" or \ - body['workflow_type_version'] != "v1.0": - return - - body["workflow_url"] = "file:///%s/%s" % (tempdir, body["workflow_url"]) - index = body["workflow_url"].find("http") - if index > 0: - body["workflow_url"] = body["workflow_url"][index:] + def RunWorkflow(self, **args): + tempdir, body = self.collect_attachments() run_id = uuid.uuid4().hex job = Workflow(run_id) diff --git a/wes_service/util.py b/wes_service/util.py index 9f6ba37..9b0c315 100644 --- a/wes_service/util.py +++ b/wes_service/util.py @@ -1,5 +1,10 @@ -from six import itervalues +import tempfile +import json +import os +from six import itervalues +import connexion +from werkzeug.utils import secure_filename def visit(d, op): """Recursively call op(d) for all list subelements and dictionary 'values' that d may have.""" @@ -35,3 +40,25 @@ def getoptlist(self, p): if k == p: optlist.append(v) return optlist + + def collect_attachments(self): + tempdir = tempfile.mkdtemp() + body = {} + for k, ls in connexion.request.files.iterlists(): + for v in ls: + if k == "workflow_descriptor": + filename = secure_filename(v.filename) + v.save(os.path.join(tempdir, filename)) + elif k in ("workflow_params", "tags", "workflow_engine_parameters"): + body[k] = json.loads(v.read()) + else: + body[k] = v.read() + + if body['workflow_type'] != "CWL" or \ + body['workflow_type_version'] != "v1.0": + return + + if ":" not in body["workflow_url"]: + body["workflow_url"] = "file://%s" % os.path.join(tempdir, secure_filename(body["workflow_url"])) + + return (tempdir, body) From dd5d6c8b05aabb9c3a00a443f2b704ca68de64e7 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Mon, 23 Jul 2018 16:35:36 -0400 Subject: [PATCH 4/4] flake8 fixes --- wes_service/arvados_wes.py | 1 - wes_service/cwl_runner.py | 4 ---- wes_service/util.py | 1 + 3 files changed, 1 insertion(+), 5 deletions(-) diff --git a/wes_service/arvados_wes.py b/wes_service/arvados_wes.py index 811d513..a09c945 100644 --- a/wes_service/arvados_wes.py +++ b/wes_service/arvados_wes.py @@ -13,7 +13,6 @@ import shutil from wes_service.util import visit, WESBackend -from werkzeug.utils import secure_filename class MissingAuthorization(Exception): diff --git a/wes_service/cwl_runner.py b/wes_service/cwl_runner.py index 414f1b4..cbf5411 100644 --- a/wes_service/cwl_runner.py +++ b/wes_service/cwl_runner.py @@ -2,13 +2,9 @@ import json import os import subprocess -import tempfile import urllib import uuid -import connexion -from werkzeug.utils import secure_filename - from wes_service.util import WESBackend diff --git a/wes_service/util.py b/wes_service/util.py index 9b0c315..d57837d 100644 --- a/wes_service/util.py +++ b/wes_service/util.py @@ -6,6 +6,7 @@ import connexion from werkzeug.utils import secure_filename + def visit(d, op): """Recursively call op(d) for all list subelements and dictionary 'values' that d may have.""" op(d)