Skip to content

Commit

Permalink
add ability to stop tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
molotgor committed Jun 25, 2024
1 parent e9f9b7e commit c86c2e5
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 14 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ docker compose up
* defined `PYTHONPATH`, `PIP_TARGET` environment variables
* updated compose:
* added `python_lib` volume
* added saving of current tasks
* task contains status(success, failed, in progress) and id using which task can be stopped
* added end-point `/stop` for stopping requested task
* updated end-point `/result` it now requests task by id and returns file, reason for failed run or informs that task is 'in progress' depending on task status

### 0.0.3

Expand Down
94 changes: 80 additions & 14 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
notebooksDir: str = '/home/jupyter-notebook/'
resultsDir: str = '/home/jupyter-notebook/results/'
logDir: str = '/home/jupyter-notebook/logs/'
tasks: dict = {}


def notebooksReg(path):
Expand Down Expand Up @@ -255,20 +256,30 @@ async def reqArguments(req: Request):
return web.json_response(params)


async def launchNotebook(input, arguments=None, file_name=None):
global serverStatus
async def launchNotebook(input, arguments, file_name, task_id):
global tasks
print('launching notebook {input} with {arguments}'.format(input=input, arguments=arguments))
logOut: str = (logDir + '/%s.log.ipynb' % file_name) if logDir and file_name else None
try:
with pm.utils.chdir(input[:input.rfind('/')]):
input = input[input.rfind('/')+1:]
pm.execute_notebook(input, logOut, arguments)
print('successfully launched notebook {input}'.format(input=input))
if tasks.get(task_id):
tasks[task_id] = {
'status': 'success',
'result': arguments.get('output_path')
}
except Exception as error:
print('failed to launch notebook {input}'.format(input=input))
print(error)
return web.HTTPInternalServerError(reason=error)
if tasks.get(task_id):
tasks[task_id] = {
'status': 'failed',
'result': error
}
finally:
print(asyncio.all_tasks())
print('ended launch notebook {input} with {arguments}'.format(input=input, arguments=arguments))


async def reqLaunch(req: Request):
Expand All @@ -289,6 +300,8 @@ async def reqLaunch(req: Request):
"503":
description: server is currently busy.
"""
from uuid import uuid4
global tasks
path = req.rel_url.query.get('path')
print('/execute?path={path}'.format(path=str(path)))
if not req.can_read_body:
Expand All @@ -305,8 +318,14 @@ async def reqLaunch(req: Request):
output_path = resultsDir + '/%s.jsonl' % str(file_name)
parameters = await req.json()
parameters['output_path'] = output_path
asyncio.shield(asyncio.create_task(launchNotebook(pathConverted, parameters, file_name)))
return web.json_response({'path': replacePathLocalToServer(output_path)})
task_id = str(uuid4())
job = spawn(req, launchNotebook(pathConverted, parameters, file_name, task_id))
tasks[task_id] = {
'status': 'in progress',
'job': job
}
asyncio.shield(job)
return web.json_response({'path': replacePathLocalToServer(output_path), 'task_id': task_id})


async def reqResult(req: Request):
Expand All @@ -327,16 +346,61 @@ async def reqResult(req: Request):
"503":
description: server is currently busy.
"""
path = req.rel_url.query.get('path')
print('/result?path={path}'.format(path=str(path)))
pathConverted = path and replacePathServerToLocal(path)
if not path or not os.path.isfile(pathConverted):
global tasks
task_id = req.rel_url.query.get('id')
print('/result?id={task_id}'.format(task_id=str(task_id)))
task = tasks.get(task_id, None)
print(task)
if not task:
return web.HTTPNotFound()
status = task.get('status', None)
print(status)
if status == 'in progress':
return web.json_response({'status': status})
elif status == 'success':
path = task.get('result','')
pathConverted = replacePathServerToLocal(path)
if not path or not os.path.isfile(pathConverted):
return web.HTTPNotFound()
file = open(pathConverted, "r")
content = file.read()
file.close()
return web.json_response({'status': status, 'result': content})
elif status == 'failed':
error = task.get('result', Exception())
print(error)
return web.json_response({'status': status, 'result': str(error)})
else:
return web.HTTPNotFound()
file = open(pathConverted, "r")
content = file.read()
file.close()
return web.json_response({'result': content})

async def reqStop(req: Request):
"""
---
description: This end-point allows to stop task by id.
tags:
- Execution operation
produces:
- application/json
responses:
"200":
description: successful operation. Return resulting file's json.
"400":
description: body with parameters not present.
"404":
description: requested file doesn't exist.
"503":
description: server is currently busy.
"""
global tasks
task_id = req.rel_url.query.get('id')
print('/stop?id={task_id}'.format(task_id=str(task_id)))
task = tasks.pop(task_id, None)
try:
if task:
await task.job.close()
except:
return web.HTTPInternalServerError(reason='failed to stop process')
return web.HTTPOk()

if __name__ == '__main__':
parser = ArgumentParser()
Expand All @@ -347,6 +411,7 @@ async def reqResult(req: Request):

app = web.Application()


setup(app)
app.router.add_route('GET', "/status", reqStatus)
app.router.add_route('GET', "/files/all", reqFiles)
Expand All @@ -355,6 +420,7 @@ async def reqResult(req: Request):
app.router.add_route('GET', "/files", reqArguments)
app.router.add_route('POST', "/execute", reqLaunch)
app.router.add_route('GET', "/result", reqResult)
app.router.add_route('POST', "/stop", reqStop)
setup_swagger(app)
print('starting server')
web.run_app(app)

0 comments on commit c86c2e5

Please sign in to comment.