diff --git a/README.md b/README.md index 6c970e8..1950494 100644 --- a/README.md +++ b/README.md @@ -123,7 +123,14 @@ docker compose build ### 0.0.7 -* Custom engine holds separate papermill notebook client for each file. +* j-sp generates an `engine_user_id` cookie that identifies the user, so that each user gets a unique Python engine. +* Custom engine holds a separate papermill notebook client for each `engine_user_id` and file combination. +* Updated local run with jupyter-notebook: + * updated th2-rpt-viewer: + * added pycode parameter type + * added ability to save/load presets for notebooks + * compare mode was changed to allow launching notebooks + * added ability to move to the nearest chunk in compare mode ### 0.0.6 diff --git a/example/example.ipynb b/example/example.ipynb new file mode 100644 index 0000000..80719b6 --- /dev/null +++ b/example/example.ipynb @@ -0,0 +1,87 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 9, + "metadata": { + "tags": [ + "parameters" + ] + }, + "outputs": [], + "source": [ + "output_path='output.jsonl'\n", + "lambda_pycode = \"\"\"lambda a:a['weight'] < 300\"\"\"" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "0 5 50\n" + ] + } + ], + "source": [ + "import json\n", + "import random\n", + "from datetime import datetime\n", + "amount = 100\n", + "\n", + "displayTable = [\n", + " ['bid order', 'bid rate', 'bid qty', 'ask order', 'ask rate', 'ask qty'],\n", + " ['424', '0.61500000', '101', '', '', ''],\n", + " ['424', '0.61500000', '101', '', '', ''],\n", + "]\n", + "\n", + "timestamp = 1725539650\n", + "amount = 200\n", + "pos = 0\n", + "conver = 1000000000\n", + "\n", + "testArr = []\n", + "\n", + "while pos < amount:\n", + " timestamp_in_nano = timestamp*1000000000\n", + " weight = random.randrange(1, 1000)\n", + " timest_obj = datetime.fromtimestamp(timestamp)\n", + " testArr.append({ '#display-timestamp': 
timestamp_in_nano, '#display-name': str(pos) + ' ' + str(timest_obj), '#display-table': displayTable, 'weight': weight})\n", + " pos = pos + 1\n", + " timestamp += random.randrange(1, 100)\n", + "\n", + "filtered = list(filter(eval(lambda_pycode), testArr))\n", + "\n", + "with open(output_path, \"w\") as out_file:\n", + " for orig_order in filtered:\n", + " json.dump(orig_order, out_file)\n", + " out_file.write('\\n')" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.0" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/json_stream_provider/custom_engines.py b/json_stream_provider/custom_engines.py index 6c53c72..2a9d739 100644 --- a/json_stream_provider/custom_engines.py +++ b/json_stream_provider/custom_engines.py @@ -20,26 +20,28 @@ from papermill.engines import NBClientEngine, NotebookExecutionManager, PapermillEngines from papermill.utils import remove_args, merge_kwargs, logger +DEFAULT_ENGINE_USER_ID = 'default_engine_user_id' + class EngineKey: - def __init__(self, client_id, notebook_file): - self.client_id = client_id + def __init__(self, user_id, notebook_file): + self.user_id = user_id self.notebook_file = notebook_file def __hash__(self): # Combine attributes for a unique hash - return hash((self.client_id, self.notebook_file)) + return hash((self.user_id, self.notebook_file)) def __eq__(self, other): if isinstance(other, EngineKey): - return self.client_id == other.client_id and self.notebook_file == other.notebook_file + return self.user_id == other.user_id and self.notebook_file == other.notebook_file return False def __iter__(self): - return iter((self.client_id, self.notebook_file)) + return iter((self.user_id, 
self.notebook_file)) def __str__(self): - return f"{self.client_id}:{self.notebook_file}" + return f"{self.user_id}:{self.notebook_file}" class EngineHolder: @@ -135,6 +137,7 @@ async def async_execute_notebook( cls, nb, kernel_name, + engine_user_id=DEFAULT_ENGINE_USER_ID, output_path=None, progress_bar=True, log_output=False, @@ -159,7 +162,8 @@ async def async_execute_notebook( nb_man.notebook_start() try: - await cls.async_execute_managed_notebook(nb_man, kernel_name, log_output=log_output, **kwargs) + await cls.async_execute_managed_notebook(nb_man, kernel_name, log_output=log_output, + engine_user_id=engine_user_id, **kwargs) finally: nb_man.cleanup_pbar() nb_man.notebook_complete() @@ -173,6 +177,7 @@ async def async_execute_managed_notebook( cls, nb_man, kernel_name, + engine_user_id=DEFAULT_ENGINE_USER_ID, log_output=False, stdout_file=None, stderr_file=None, @@ -190,8 +195,11 @@ async def async_execute_managed_notebook( configured logger. start_timeout (int): Duration to wait for kernel start-up. execution_timeout (int): Duration to wait before failing execution (default: never). 
+ engine_user_id (str): User id to create papermill engine client """ + key = EngineKey(engine_user_id, nb_man.nb['metadata']['papermill']['input_path']) + def create_client(): # TODO: should be static # Exclude parameters that named differently downstream safe_kwargs = remove_args(['timeout', 'startup_timeout'], **kwargs) @@ -210,8 +218,6 @@ def create_client(): # TODO: should be static cls.logger.info(f"Created papermill notebook client for {key}") return PapermillNotebookClient(nb_man, **final_kwargs) - # TODO: pass client_id - key = EngineKey("", nb_man.nb['metadata']['papermill']['input_path']) engine_holder: EngineHolder = cls.get_or_create_engine_metadata(key, create_client) return await engine_holder.async_execute(nb_man) @@ -248,9 +254,10 @@ def remove_out_of_date_engines(cls, exclude_key: EngineKey): class CustomEngines(PapermillEngines): - async def async_execute_notebook_with_engine(self, engine_name, nb, kernel_name, **kwargs): + async def async_execute_notebook_with_engine(self, engine_name, nb, kernel_name, + engine_user_id=DEFAULT_ENGINE_USER_ID, **kwargs): """Fetch a named engine and execute the nb object against it.""" - return await self.get_engine(engine_name).async_execute_notebook(nb, kernel_name, **kwargs) + return await self.get_engine(engine_name).async_execute_notebook(nb, kernel_name, engine_user_id, **kwargs) # Instantiate a ExactproPapermillEngines instance, register Handlers and entrypoints diff --git a/json_stream_provider/papermill_execute_ext.py b/json_stream_provider/papermill_execute_ext.py index 6cdabb6..52e35f6 100644 --- a/json_stream_provider/papermill_execute_ext.py +++ b/json_stream_provider/papermill_execute_ext.py @@ -11,18 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. from pathlib import Path @@ -34,7 +22,7 @@ from papermill.parameterize import add_builtin_parameters, parameterize_notebook, parameterize_path from papermill.utils import chdir -from json_stream_provider.custom_engines import exactpro_papermill_engines +from json_stream_provider.custom_engines import exactpro_papermill_engines, DEFAULT_ENGINE_USER_ID # The code of this method is derived from https://github.com/nteract/papermill/blob/2.6.0 under the BSD License. @@ -74,6 +62,7 @@ async def async_execute_notebook( input_path, output_path, + engine_user_id=DEFAULT_ENGINE_USER_ID, parameters=None, engine_name=None, request_save_on_cell_execute=True, @@ -97,6 +86,8 @@ async def async_execute_notebook( Path to input notebook or NotebookNode object of notebook output_path : str or Path or None Path to save executed notebook. 
If None, no file will be saved + engine_user_id : str + User id to create papermill engine client parameters : dict, optional Arbitrary keyword arguments to pass to the notebook parameters engine_name : str, optional @@ -176,6 +167,7 @@ async def async_execute_notebook( nb = await exactpro_papermill_engines.async_execute_notebook_with_engine( engine_name, nb, + engine_user_id=engine_user_id, input_path=input_path, output_path=output_path if request_save_on_cell_execute else None, kernel_name=kernel_name, diff --git a/local-run/with-jupyter-notebook/compose.yml b/local-run/with-jupyter-notebook/compose.yml index 260096a..d4f7f13 100644 --- a/local-run/with-jupyter-notebook/compose.yml +++ b/local-run/with-jupyter-notebook/compose.yml @@ -28,7 +28,7 @@ services: - th2_network th2_rpt_viewer: - image: ghcr.io/th2-net/th2-rpt-viewer:5.2.9-notebooks-launch-time-10918718527 + image: ghcr.io/th2-net/th2-rpt-viewer:5.2.11-notebooks-launch-time-11887250641 ports: - "8083:8080" networks: diff --git a/server.py b/server.py index e2a944c..a0bb51e 100644 --- a/server.py +++ b/server.py @@ -11,42 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. import asyncio import json @@ -56,19 +20,26 @@ from asyncio import Task from datetime import datetime, timezone from enum import Enum +from logging import INFO from typing import Coroutine, Any +from uuid import uuid4 -from json_stream_provider import papermill_execute_ext as epm import papermill as pm -from papermill.utils import chdir from aiohttp import web +from aiohttp.web_middlewares import middleware from aiohttp.web_request import Request +from aiohttp.web_response import Response from aiohttp_swagger import * from aiojobs import Job from aiojobs.aiohttp import setup +from papermill.utils import chdir +from json_stream_provider import papermill_execute_ext as epm from json_stream_provider.custom_engines import CustomEngine, EngineBusyError from json_stream_provider.log_configuratior import configure_logging +from json_stream_provider.papermill_execute_ext import DEFAULT_ENGINE_USER_ID + +ENGINE_USER_ID_COOKIE_KEY = 'engine_user_id' os.system('pip list') @@ -146,8 +117,45 @@ def read_config(path: str): 
logger.error(f"Read '{path}' configuration failure", e) +def get_or_default_engine_user_id(req: Request) -> str: + return req.cookies.get(ENGINE_USER_ID_COOKIE_KEY, DEFAULT_ENGINE_USER_ID) + + +def get_or_gen_engine_user_id(req: Request = None) -> str: + global logger + + engine_user_id: str = DEFAULT_ENGINE_USER_ID + if req is not None: + engine_user_id = get_or_default_engine_user_id(req) + if engine_user_id == DEFAULT_ENGINE_USER_ID: + engine_user_id = str(uuid4()) + if logger.isEnabledFor(INFO): + user_agent = req.headers.get('User-Agent', 'unknown') + user_ip = req.remote + logger.info(f"Generated user identifier for {user_ip}/{user_agent}") + return engine_user_id + + +def put_engine_user_id_if_absent(res: Response, engine_user_id: str = None, req: Request = None) -> Response: + if res.cookies.get(ENGINE_USER_ID_COOKIE_KEY) is not None: + return res + + if engine_user_id is None: + engine_user_id = get_or_gen_engine_user_id(req) + res.set_cookie(ENGINE_USER_ID_COOKIE_KEY, engine_user_id) + return res + + +@middleware +async def add_engine_user_id_middleware(req, handler): + res = await handler(req) + if isinstance(req, Request) and isinstance(res, Response) and res.status == 200: + res = put_engine_user_id_if_absent(res=res, req=req) + return res + + # noinspection PyUnusedLocal -async def req_status(req: Request): +async def req_status(req: Request) -> Response: """ --- description: This end-point allow to test that service is up. @@ -192,7 +200,7 @@ def replace_server_to_local(path: str): raise Exception("Path didn't start with notebooks or results folder") -async def req_notebooks(req: Request): +async def req_notebooks(req: Request) -> Response: """ --- description: This end-point allows to get notebooks that could be requested. @@ -252,7 +260,7 @@ async def req_notebooks(req: Request): }) -async def req_jsons(req: Request): +async def req_jsons(req: Request) -> Response: """ --- description: This end-point allows to get JSONLs that could be requested. 
@@ -321,7 +329,7 @@ async def req_jsons(req: Request): }) -async def req_files(req: Request): +async def req_files(req: Request) -> Response: """ --- description: This end-point allows to get files and directories. @@ -390,7 +398,7 @@ async def req_files(req: Request): }) -async def req_parameters(req: Request): +async def req_parameters(req: Request) -> Response: """ --- description: This end-point allows to get parameters for notebook in requested path. @@ -420,7 +428,7 @@ async def req_parameters(req: Request): return web.json_response(params) -async def launch_notebook(input_path, arguments: dict, file_name, task_metadata: TaskMetadata): +async def launch_notebook(engine_user_id: str, input_path, arguments: dict, file_name, task_metadata: TaskMetadata): global logger global tasks logger.info(f'launching notebook {input_path} with {arguments}') @@ -435,6 +443,7 @@ async def launch_notebook(input_path, arguments: dict, file_name, task_metadata: with chdir(input_path[:input_path.rfind('/')]): input_path = input_path[input_path.rfind('/') + 1:] await epm.async_execute_notebook( + engine_user_id=engine_user_id, input_path=input_path, output_path=log_out, parameters=arguments, @@ -473,7 +482,7 @@ def convert_parameter(parameter): return parameter_value -async def req_launch(req: Request): +async def req_launch(req: Request) -> Response: """ --- description: This end-point allows to start notebook. Query requires path to notebook. @@ -492,7 +501,6 @@ async def req_launch(req: Request): "500": description: failed operation. directory for output doesn't exist. 
""" - from uuid import uuid4 global tasks global logger path_arg = req.rel_url.query.get('path', '') @@ -514,6 +522,7 @@ async def req_launch(req: Request): output_path = results_dir + '/%s.jsonl' % str(file_name) customization_path = results_dir + '/%s.json' % str(file_name) req_json = await req.json() + user_id = get_or_default_engine_user_id(req) parameters = {} for key, parameter in req_json.items(): try: @@ -525,12 +534,13 @@ async def req_launch(req: Request): task_id = str(uuid4()) task_metadata = TaskMetadata(task_id=task_id) tasks[task_id] = task_metadata - task: Task[None] = asyncio.create_task(launch_notebook(path_converted, parameters, file_name, task_metadata)) + task: Task[None] = asyncio.create_task( + launch_notebook(user_id, path_converted, parameters, file_name, task_metadata)) task_metadata.task = task return web.json_response({'task_id': task_id}) -async def req_file(req: Request): +async def req_file(req: Request) -> Response: """ --- description: This end-point allows to get file from requested path. Query requires path to file. @@ -562,7 +572,7 @@ async def req_file(req: Request): return web.json_response({'result': content}) -async def req_result(req: Request): +async def req_result(req: Request) -> Response: """ --- description: This end-point allows to get result from requested task. @@ -615,7 +625,7 @@ async def req_result(req: Request): return web.HTTPNotFound() -async def req_stop(req: Request): +async def req_stop(req: Request) -> Response: """ --- description: This end-point allows to stop task. Query requires task id which will be stopped. @@ -650,7 +660,7 @@ async def req_stop(req: Request): if cfg_path: read_config(cfg_path) - app = web.Application() + app = web.Application(middlewares=[add_engine_user_id_middleware]) setup(app) app.router.add_route('GET', "/status", req_status)