From 9a915bc3738aa94fa67ab14a8261b38267ce53aa Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Fri, 5 Jul 2024 19:03:09 +0400 Subject: [PATCH] Refactored --- server.py | 140 +++++++++++++++++++++++++++--------------------------- 1 file changed, 71 insertions(+), 69 deletions(-) diff --git a/server.py b/server.py index 78882f8..bc676cf 100644 --- a/server.py +++ b/server.py @@ -12,22 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio +import json +import logging.config import os - -from log_configuratior import configureLogging - -os.system('pip list') +from argparse import ArgumentParser +from datetime import datetime, timezone import papermill as pm -from aiohttp.web_request import Request from aiohttp import web -from aiojobs.aiohttp import setup, spawn +from aiohttp.web_request import Request from aiohttp_swagger import * -import json -import datetime -import asyncio -from argparse import ArgumentParser -import logging.config +from aiojobs.aiohttp import setup, spawn + +from log_configuratior import configureLogging + +os.system('pip list') serverStatus: str = 'idle' notebooksDir: str = '/home/jupyter-notebook/' @@ -137,28 +137,28 @@ async def reqNotebooks(req: Request): description: failed operation when queried directory doesn't exist or requested path didn't start with ./notebooks. """ global logger - path = req.rel_url.query.get('path', '') - logger.info('/files/notebooks?path={path}'.format(path=str(path))) - if path == '': + path_arg = req.rel_url.query.get('path', '') + logger.info('/files/notebooks?path={path}'.format(path=str(path_arg))) + if path_arg == '': + dirs_note = [] if os.path.isdir(notebooksDir): - dirsNote = list(map(replacePathLocalToServer, getDirs(notebooksDir))) + dirs_note = list(map(replacePathLocalToServer, getDirs(notebooksDir))) files = list(map(replacePathLocalToServer, getFiles(notebooksDir, '.ipynb'))) return web.json_response({ - 'directories': dirsNote, + 'directories': dirs_note, 'files': files }) try: - pathConverted = replacePathServerToLocal(path) + path_converted = replacePathServerToLocal(path_arg) except: return web.HTTPNotFound(reason="Requested path didn't start with ./notebooks") - dirsNote = [] - if path: - if os.path.isdir(pathConverted): - dirs = list(map(replacePathLocalToServer, getDirs(pathConverted))) - files = list(map(replacePathLocalToServer, getFiles(pathConverted, '.ipynb'))) + if path_arg: + if os.path.isdir(path_converted): + dirs = list(map(replacePathLocalToServer, getDirs(path_converted))) + files = list(map(replacePathLocalToServer, getFiles(path_converted, '.ipynb'))) return web.json_response({ 'directories': dirs, 'files': files @@ -187,34 +187,34 @@ async def reqJsons(req: Request): description: failed operation when queried directory doesn't exist or requested path didn't start with ./results or ./notebooks. """ global logger - path = req.rel_url.query.get('path', '') - logger.info('/files/results?path={path}'.format(path=str(path))) + path_arg = req.rel_url.query.get('path', '') + logger.info('/files/results?path={path}'.format(path=str(path_arg))) - if path == '': + if path_arg == '': + dirs_res = [] + dirs_note = [] if os.path.isdir(resultsDir): - dirsRes = list(map(replacePathLocalToServer, getDirs(resultsDir))) + dirs_res = list(map(replacePathLocalToServer, getDirs(resultsDir))) if os.path.isdir(notebooksDir): - dirsNote = list(map(replacePathLocalToServer, getDirs(notebooksDir))) + dirs_note = list(map(replacePathLocalToServer, getDirs(notebooksDir))) - filesRes = list(map(replacePathLocalToServer, getFiles(resultsDir, '.jsonl'))) - filesNote = list(map(replacePathLocalToServer, getFiles(notebooksDir, '.jsonl'))) + files_res = list(map(replacePathLocalToServer, getFiles(resultsDir, '.jsonl'))) + files_note = list(map(replacePathLocalToServer, getFiles(notebooksDir, '.jsonl'))) return web.json_response({ - 'directories': list({*dirsNote, *dirsRes}), - 'files': list({*filesNote, *filesRes}) + 'directories': list({*dirs_note, *dirs_res}), + 'files': list({*files_note, *files_res}) }) try: - pathConverted = replacePathServerToLocal(path) + path_converted = replacePathServerToLocal(path_arg) except: return web.HTTPNotFound(reason="Requested path didn't start with ./results or ./notebooks") - dirsNote = [] - dirsRes = [] - if path: - if os.path.isdir(pathConverted): - dirs = list(map(replacePathLocalToServer, getDirs(pathConverted))) - files = list(map(replacePathLocalToServer, getFiles(pathConverted, '.jsonl'))) + if path_arg: + if os.path.isdir(path_converted): + dirs = list(map(replacePathLocalToServer, getDirs(path_converted))) + files = list(map(replacePathLocalToServer, getFiles(path_converted, '.jsonl'))) return web.json_response({ 'directories': dirs, 'files': files @@ -255,33 +255,35 @@ async def reqArguments(req: Request): return web.json_response(params) -async def launchNotebook(input, arguments, file_name, task_id): +async def launch_notebook(input_path, arguments, file_name, task_id): global tasks global logger - logger.info('launching notebook {input} with {arguments}'.format(input=input, arguments=arguments)) - logOut: str = (logDir + '/%s.log.ipynb' % file_name) if logDir and file_name else None + logger.info(f'launching notebook {input_path} with {arguments}') + start_execution = datetime.now() + log_out: str = (logDir + '/%s.log.ipynb' % file_name) if logDir and file_name else None try: - with pm.utils.chdir(input[:input.rfind('/')]): - input = input[input.rfind('/')+1:] - pm.execute_notebook(input, logOut, arguments) - logger.info('successfully launched notebook {input}'.format(input=input)) + with pm.utils.chdir(input_path[:input_path.rfind('/')]): + input_path = input_path[input_path.rfind('/') + 1:] + pm.execute_notebook(input_path, log_out, arguments) + logger.debug(f'successfully launched notebook {input_path}') if tasks.get(task_id): tasks[task_id] = { 'status': 'success', 'result': arguments.get('output_path') } except Exception as error: - logger.error('failed to launch notebook {input}'.format(input=input), error) + logger.error(f'failed to launch notebook {input_path}', error) if tasks.get(task_id): tasks[task_id] = { 'status': 'failed', 'result': error } finally: - logger.info('ended launch notebook {input} with {arguments}'.format(input=input, arguments=arguments)) + spent_time = (datetime.now() - start_execution).total_seconds() + logger.info(f'ended launch notebook {input_path} with {arguments} spent_time {spent_time} sec') -async def reqLaunch(req: Request): +async def req_launch(req: Request): """ --- description: This end-point allows to start notebook. Query requires path to notebook. Body requred to be dictionary of parameters. @@ -302,26 +304,26 @@ async def reqLaunch(req: Request): from uuid import uuid4 global tasks global logger - path = req.rel_url.query.get('path', '') - logger.info('/execute?path={path}'.format(path=str(path))) + path_arg = req.rel_url.query.get('path', '') + logger.info('/execute?path={path}'.format(path=str(path_arg))) if not req.can_read_body: return web.HTTPBadRequest(reason='Body with parameters not present') try: - pathConverted = replacePathServerToLocal(path) + path_converted = replacePathServerToLocal(path_arg) except: return web.HTTPNotFound(reason="Requested path didn't start with ./notebooks") - if not path or not os.path.isfile(pathConverted): + if not path_arg or not os.path.isfile(path_converted): return web.HTTPNotFound() if not os.path.exists(resultsDir): return web.HTTPInternalServerError(reason='No output directory') - notebookName = pathConverted.split('/')[-1].split('.')[0]; - timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H-%M-%S-%f") - file_name = notebookName + '_' + timestamp + notebook_name = path_converted.split('/')[-1].split('.')[0] + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S-%f") + file_name = notebook_name + '_' + timestamp output_path = resultsDir + '/%s.jsonl' % str(file_name) parameters = await req.json() parameters['output_path'] = output_path task_id = str(uuid4()) - job = spawn(req, launchNotebook(pathConverted, parameters, file_name, task_id)) + job = spawn(req, launch_notebook(path_converted, parameters, file_name, task_id)) tasks[task_id] = { 'status': 'in progress', 'job': job @@ -330,7 +332,7 @@ async def reqLaunch(req: Request): return web.json_response({'task_id': task_id}) -async def reqFile(req: Request): +async def req_file(req: Request): """ --- description: This end-point allows to get file from requested path. Query requires path to file. @@ -384,26 +386,26 @@ async def reqResult(req: Request): task_id = req.rel_url.query.get('id') logger.info('/result?id={task_id}'.format(task_id=str(task_id))) task = tasks.get(task_id, None) - if not task: + if task is None: return web.HTTPNotFound(reason="Requested task doesn't exist") status = task.get('status', None) if status == 'in progress': return web.json_response({'status': status}) elif status == 'success': - path = task.get('result','') - if not path or not os.path.isfile(path): + path_param = task.get('result','') + if not path_param or not os.path.isfile(path_param): return web.HTTPNotFound(reason="Resulting file doesn't exist") - file = open(path, "r") + file = open(path_param, "r") content = file.read() file.close() - return web.json_response({'status': status, 'result': content, 'path': replacePathLocalToServer(path) }) + return web.json_response({'status': status, 'result': content, 'path': replacePathLocalToServer(path_param) }) elif status == 'failed': error = task.get('result', Exception()) return web.json_response({'status': status, 'result': str(error)}) else: return web.HTTPNotFound() -async def reqStop(req: Request): +async def req_stop(req: Request): """ --- description: This end-point allows to stop task. Query requires task id which will be stopped. @@ -432,9 +434,9 @@ async def reqStop(req: Request): if __name__ == '__main__': parser = ArgumentParser() parser.add_argument('config') - path = vars(parser.parse_args()).get('config') - if (path): - readConf(path) + cfg_path = vars(parser.parse_args()).get('config') + if (cfg_path): + readConf(cfg_path) app = web.Application() @@ -443,10 +445,10 @@ async def reqStop(req: Request): app.router.add_route('GET', "/files/notebooks", reqNotebooks) app.router.add_route('GET', "/files/results", reqJsons) app.router.add_route('GET', "/files", reqArguments) - app.router.add_route('GET', "/file", reqFile) - app.router.add_route('POST', "/execute", reqLaunch) + app.router.add_route('GET', "/file", req_file) + app.router.add_route('POST', "/execute", req_launch) app.router.add_route('GET', "/result", reqResult) - app.router.add_route('POST', "/stop", reqStop) + app.router.add_route('POST', "/stop", req_stop) setup_swagger(app) logger.info('starting server') web.run_app(app) \ No newline at end of file