From 9ab041c9d13074063f23f62b5fd1a167df6dc615 Mon Sep 17 00:00:00 2001 From: Nikita Smirnov <46124551+Nikita-Smirnov-Exactpro@users.noreply.github.com> Date: Mon, 23 Sep 2024 12:11:40 +0400 Subject: [PATCH] Improve notebook execution performance (#6) * Added papermill custom engine to reuse it for notebook execution * update file path to absolute instead of relative * add ability for getting customization file from result * Implemented logic to use a separate papermill notebook client for each notebook * Implemented async_execute for papermill engine * Added EngineBusyError * Configured compose.yml to build j-sp image instead of pulling from repository * updated readme with changes in rpt-viewer * Optimised Dockerfile --------- Co-authored-by: molotgor --- Dockerfile | 9 +- NOTICE | 1 + README.md | 22 +- json_stream_provider/__init__.py | 0 json_stream_provider/custom_engines.py | 259 +++++++++++++++ .../log_configuratior.py | 20 +- json_stream_provider/papermill_execute_ext.py | 196 +++++++++++ local-run/with-jupyter-notebook/compose.yml | 6 +- .../json-stream-provider/custom.json | 3 +- package_info.json | 2 +- server.py | 312 ++++++++++++------ 11 files changed, 721 insertions(+), 109 deletions(-) create mode 100644 NOTICE create mode 100644 json_stream_provider/__init__.py create mode 100644 json_stream_provider/custom_engines.py rename log_configuratior.py => json_stream_provider/log_configuratior.py (63%) create mode 100644 json_stream_provider/papermill_execute_ext.py diff --git a/Dockerfile b/Dockerfile index 2729076..b68c30c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,8 +3,8 @@ FROM python:3.9-slim # Set the working directory in the container WORKDIR /app -# Copy the current directory contents into the container at /app -COPY . 
/app +# Copy requirements.txt into the container at /app +COPY requirements.txt /app/ # groupadd --system - create a system account # useradd --system - create a system account @@ -41,5 +41,10 @@ ENV PIP_CONFIG_FILE="${HOME}/.pip/pip.conf" RUN mkdir -p "${PYTHON_SHARED_LIB_PATH}" RUN echo 'umask 0007' >> "${HOME}/.bashrc" +# Copy the json_stream_provider module into the container at /app +COPY json_stream_provider /app/json_stream_provider +# Copy the distribution files into the container at /app +COPY LICENSE NOTICE README.md package_info.json server.py /app/ + ENTRYPOINT ["python", "/app/server.py"] CMD ["/var/th2/config/custom.json"] \ No newline at end of file diff --git a/NOTICE b/NOTICE new file mode 100644 index 0000000..897845d --- /dev/null +++ b/NOTICE @@ -0,0 +1 @@ +This project includes code from https://github.com/nteract/papermill/blob/2.6.0 which is licensed under the BSD License. \ No newline at end of file diff --git a/README.md b/README.md index 81c419d..6c970e8 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,7 @@ This python server is made to launch Jupyter notebooks (*.ipynb) and get results * `notebooks` (Default value: /home/jupyter-notebook/) - path to the directory with notebooks. `j-sp` search files with `ipynb` extension recursively in the specified folder. * `results` (Default value: /home/jupyter-notebook/results) - path to the directory for run results. `j-sp` resolves result file with `jsonl` extension against specified folder. * `logs` (Default value: /home/jupyter-notebook/logs) - path to the directory for run logs. `j-sp` puts run logs to specified folder. +* `out-of-use-engine-time` (Default value: 3600) - out-of-use time interval in seconds.
`j-sp` unregisters the engine related to a notebook when the user has not run that notebook for longer than this time ### mounting: @@ -37,6 +38,7 @@ spec: notebooks: /home/jupyter-notebook/ results: /home/jupyter-notebook/j-sp/results/ logs: /home/jupyter-notebook/j-sp/logs/ + out-of-use-engine-time: 3600 mounting: - path: /home/jupyter-notebook/ pvcName: jupyter-notebook @@ -100,7 +102,7 @@ chmod -R g=u user_data/ #### start command ```shell cd local-run/with-jupyter-notebook -docker compose up +docker compose up --build ``` #### clean command ```shell @@ -119,9 +121,25 @@ docker compose build ## Release notes: +### 0.0.7 + +* Custom engine holds separate papermill notebook client for each file. + +### 0.0.6 + +* Added papermill custom engine to reuse it for notebook execution. + A separate engine is registered for each notebook and unregistered after 1 hour out-of-use time by default. +* update local run with jupyter-notebook: + * updated th2-rpt-viewer: + * `JSON Reader` page polls execution status every 50 ms instead of every 1 sec + * `JSON Reader` page now uses virtuoso for rendering lists + * `JSON Reader` page now has search; its values can be loaded from a `json` file containing an array of objects with `pattern` and `color` fields for searching content. Execution of a notebook can create such a file, and it will be loaded into the UI if it is created at the path given by the `customization_path` parameter. + * Added ability to create multiple `JSON Reader` pages. + * `JSON Reader` page now has compare mode.
+ ### 0.0.5 -* added `umask 0007` to `~/.bashrc` file to provide rw file access for `users` group +* added `umask 0007` to `~/.bashrc` file to provide rw file access for `users` group * added `/file` request for loading content of single jsonl file * removed ability to get any file from machine via `/file` REST APIs * added sorting on requests `/files/notebooks` and `/files/results` diff --git a/json_stream_provider/__init__.py b/json_stream_provider/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/json_stream_provider/custom_engines.py b/json_stream_provider/custom_engines.py new file mode 100644 index 0000000..6c53c72 --- /dev/null +++ b/json_stream_provider/custom_engines.py @@ -0,0 +1,259 @@ +# Copyright 2024 Exactpro (Exactpro Systems Limited) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging.config +import time +from datetime import datetime + +from papermill.clientwrap import PapermillNotebookClient +from papermill.engines import NBClientEngine, NotebookExecutionManager, PapermillEngines +from papermill.utils import remove_args, merge_kwargs, logger + + +class EngineKey: + def __init__(self, client_id, notebook_file): + self.client_id = client_id + self.notebook_file = notebook_file + + def __hash__(self): + # Combine attributes for a unique hash + return hash((self.client_id, self.notebook_file)) + + def __eq__(self, other): + if isinstance(other, EngineKey): + return self.client_id == other.client_id and self.notebook_file == other.notebook_file + return False + + def __iter__(self): + return iter((self.client_id, self.notebook_file)) + + def __str__(self): + return f"{self.client_id}:{self.notebook_file}" + + +class EngineHolder: + _key: EngineKey + _client: PapermillNotebookClient + _last_used_time: float + _busy: bool = False + + def __init__(self, key: EngineKey, client: PapermillNotebookClient): + self._key = key + self._client = client + self._last_used_time = time.time() + + def __str__(self): + return f"Engine(key={self._key}, last_used_time={self._last_used_time}, is_busy={self._busy})" + + async def async_execute(self, nb_man): + if self._busy: + raise EngineBusyError( + f"Notebook client related to '{self._key}' has been busy since {self._get_last_used_date_time()}") + + try: + self._busy = True + # accept new notebook into (possibly) existing client + self._client.nb_man = nb_man + self._client.nb = nb_man.nb + # reuse client connection to existing kernel + output = await self._client.async_execute(cleanup_kc=False) + # renumber executions + for i, cell in enumerate(nb_man.nb.cells): + if 'execution_count' in cell: + cell['execution_count'] = i + 1 + + return output + finally: + self._busy = False + + def get_last_used_time(self) -> float: + return self._last_used_time + + def close(self): + self._client = None + + def 
_get_last_used_date_time(self): + return datetime.fromtimestamp(self._last_used_time) + + +class EngineBusyError(RuntimeError): + pass + + +class CustomEngine(NBClientEngine): + out_of_use_engine_time: int = 60 * 60 + metadata_dict: dict = {} + logger: logging.Logger + + # The code of this method is derived from https://github.com/nteract/papermill/blob/2.6.0 under the BSD License. + # Original license follows: + # + # BSD 3-Clause License + # + # Copyright (c) 2017, nteract + # All rights reserved. + # + # Redistribution and use in source and binary forms, with or without + # modification, are permitted provided that the following conditions are met: + # + # * Redistributions of source code must retain the above copyright notice, this + # list of conditions and the following disclaimer. + # + # * Redistributions in binary form must reproduce the above copyright notice, + # this list of conditions and the following disclaimer in the documentation + # and/or other materials provided with the distribution. + # + # * Neither the name of the copyright holder nor the names of its + # contributors may be used to endorse or promote products derived from + # this software without specific prior written permission. + # + # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + # DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + # + # Modified by Exactpro for https://github.com/th2-net/th2-json-stream-provider-py + @classmethod + async def async_execute_notebook( + cls, + nb, + kernel_name, + output_path=None, + progress_bar=True, + log_output=False, + autosave_cell_every=30, + **kwargs, + ): + """ + A wrapper to handle notebook execution tasks. + + Wraps the notebook object in a `NotebookExecutionManager` in order to track + execution state in a uniform manner. This is meant to help simplify + engine implementations. This allows a developer to just focus on + iterating and executing the cell contents. + """ + nb_man = NotebookExecutionManager( + nb, + output_path=output_path, + progress_bar=progress_bar, + log_output=log_output, + autosave_cell_every=autosave_cell_every, + ) + + nb_man.notebook_start() + try: + await cls.async_execute_managed_notebook(nb_man, kernel_name, log_output=log_output, **kwargs) + finally: + nb_man.cleanup_pbar() + nb_man.notebook_complete() + + return nb_man.nb + + # this method has been copied from the issue comment + # https://github.com/nteract/papermill/issues/583#issuecomment-791988091 + @classmethod + async def async_execute_managed_notebook( + cls, + nb_man, + kernel_name, + log_output=False, + stdout_file=None, + stderr_file=None, + start_timeout=60, + execution_timeout=None, + **kwargs + ): + """ + Performs the actual execution of the parameterized notebook locally. 
+ + Args: + nb_man (NotebookExecutionManager): Wrapper for execution state of a notebook. + kernel_name (str): Name of kernel to execute the notebook against. + log_output (bool): Flag for whether or not to write notebook output to the + configured logger. + start_timeout (int): Duration to wait for kernel start-up. + execution_timeout (int): Duration to wait before failing execution (default: never). + """ + + def create_client(): # TODO: should be static + # Exclude parameters that named differently downstream + safe_kwargs = remove_args(['timeout', 'startup_timeout'], **kwargs) + + # Nicely handle preprocessor arguments prioritizing values set by engine + final_kwargs = merge_kwargs( + safe_kwargs, + timeout=execution_timeout if execution_timeout else kwargs.get('timeout'), + startup_timeout=start_timeout, + kernel_name=kernel_name, + log=logger, + log_output=log_output, + stdout_file=stdout_file, + stderr_file=stderr_file, + ) + cls.logger.info(f"Created papermill notebook client for {key}") + return PapermillNotebookClient(nb_man, **final_kwargs) + + # TODO: pass client_id + key = EngineKey("", nb_man.nb['metadata']['papermill']['input_path']) + engine_holder: EngineHolder = cls.get_or_create_engine_metadata(key, create_client) + return await engine_holder.async_execute(nb_man) + + @classmethod + def create_logger(cls): + cls.logger = logging.getLogger('engine') + + @classmethod + def set_out_of_use_engine_time(cls, value: int): + cls.out_of_use_engine_time = value + + @classmethod + def get_or_create_engine_metadata(cls, key: EngineKey, func): + cls.remove_out_of_date_engines(key) + + engine_holder: EngineHolder = cls.metadata_dict.get(key) + if engine_holder is None: + engine_holder = EngineHolder(key, func()) + cls.metadata_dict[key] = engine_holder + + return engine_holder + + @classmethod + def remove_out_of_date_engines(cls, exclude_key: EngineKey): + now = time.time() + dead_line = now - cls.out_of_use_engine_time + out_of_use_engines = [key for key, 
metadata in cls.metadata_dict.items() if + key != exclude_key and metadata.get_last_used_time() < dead_line] + for key in out_of_use_engines: + engine_holder: EngineHolder = cls.metadata_dict.pop(key) + engine_holder.close() + cls.logger.info( + f"unregistered '{key}' papermill engine, last used time {now - engine_holder.get_last_used_time()} sec ago") + + +class CustomEngines(PapermillEngines): + async def async_execute_notebook_with_engine(self, engine_name, nb, kernel_name, **kwargs): + """Fetch a named engine and execute the nb object against it.""" + return await self.get_engine(engine_name).async_execute_notebook(nb, kernel_name, **kwargs) + + +# Instantiate a CustomEngines instance, register handlers and entry points +exactpro_papermill_engines = CustomEngines() +exactpro_papermill_engines.register(None, CustomEngine) +exactpro_papermill_engines.register_entry_points() diff --git a/log_configuratior.py b/json_stream_provider/log_configuratior.py similarity index 63% rename from log_configuratior.py rename to json_stream_provider/log_configuratior.py index bdc0575..f01291c 100644 --- a/log_configuratior.py +++ b/json_stream_provider/log_configuratior.py @@ -1,8 +1,24 @@ +# Copyright 2024 Exactpro (Exactpro Systems Limited) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ import logging.config import os log4py_file = '/var/th2/config/log4py.conf' -def configureLogging(): + + +def configure_logging(): if os.path.exists(log4py_file): logging.config.fileConfig(log4py_file, disable_existing_loggers=False) logging.getLogger(__name__).info(f'Logger is configured by {log4py_file} file') @@ -30,5 +46,3 @@ def configureLogging(): } logging.config.dictConfig(default_logging_config) logging.getLogger(__name__).info('Logger is configured by default') - - diff --git a/json_stream_provider/papermill_execute_ext.py b/json_stream_provider/papermill_execute_ext.py new file mode 100644 index 0000000..6cdabb6 --- /dev/null +++ b/json_stream_provider/papermill_execute_ext.py @@ -0,0 +1,196 @@ +# Copyright 2024 Exactpro (Exactpro Systems Limited) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from pathlib import Path + + +from papermill.log import logger +from papermill.execute import prepare_notebook_metadata, remove_error_markers, raise_for_execution_errors +from papermill.inspection import _infer_parameters +from papermill.iorw import get_pretty_path, load_notebook_node, local_file_io_cwd, write_ipynb +from papermill.parameterize import add_builtin_parameters, parameterize_notebook, parameterize_path +from papermill.utils import chdir + +from json_stream_provider.custom_engines import exactpro_papermill_engines + + +# The code of this method is derived from https://github.com/nteract/papermill/blob/2.6.0 under the BSD License. +# Original license follows: +# +# BSD 3-Clause License +# +# Copyright (c) 2017, nteract +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +# Modified by Exactpro for https://github.com/th2-net/th2-json-stream-provider-py +async def async_execute_notebook( + input_path, + output_path, + parameters=None, + engine_name=None, + request_save_on_cell_execute=True, + prepare_only=False, + kernel_name=None, + language=None, + progress_bar=True, + log_output=False, + stdout_file=None, + stderr_file=None, + start_timeout=60, + report_mode=False, + cwd=None, + **engine_kwargs, +): + """Executes a single notebook locally. + + Parameters + ---------- + input_path : str or Path or nbformat.NotebookNode + Path to input notebook or NotebookNode object of notebook + output_path : str or Path or None + Path to save executed notebook. If None, no file will be saved + parameters : dict, optional + Arbitrary keyword arguments to pass to the notebook parameters + engine_name : str, optional + Name of execution engine to use + request_save_on_cell_execute : bool, optional + Request save notebook after each cell execution + autosave_cell_every : int, optional + How often in seconds to save in the middle of long cell executions + prepare_only : bool, optional + Flag to determine if execution should occur or not + kernel_name : str, optional + Name of kernel to execute the notebook against + language : str, optional + Programming language of the notebook + progress_bar : bool, optional + Flag for whether or not to show the progress bar. 
+ log_output : bool, optional + Flag for whether or not to write notebook output to the configured logger + start_timeout : int, optional + Duration in seconds to wait for kernel start-up + report_mode : bool, optional + Flag for whether or not to hide input. + cwd : str or Path, optional + Working directory to use when executing the notebook + **kwargs + Arbitrary keyword arguments to pass to the notebook engine + + Returns + ------- + nb : NotebookNode + Executed notebook object + """ + if isinstance(input_path, Path): + input_path = str(input_path) + if isinstance(output_path, Path): + output_path = str(output_path) + if isinstance(cwd, Path): + cwd = str(cwd) + + path_parameters = add_builtin_parameters(parameters) + input_path = parameterize_path(input_path, path_parameters) + output_path = parameterize_path(output_path, path_parameters) + + logger.info(f"Input Notebook: {get_pretty_path(input_path)}") + logger.info(f"Output Notebook: {get_pretty_path(output_path)}") + with local_file_io_cwd(): + if cwd is not None: + logger.info(f"Working directory: {get_pretty_path(cwd)}") + + nb = load_notebook_node(input_path) + + # Parameterize the Notebook. 
+ if parameters: + parameter_predefined = _infer_parameters(nb, name=kernel_name, language=language) + parameter_predefined = {p.name for p in parameter_predefined} + for p in parameters: + if p not in parameter_predefined: + logger.warning(f"Passed unknown parameter: {p}") + nb = parameterize_notebook( + nb, + parameters, + report_mode, + kernel_name=kernel_name, + language=language, + engine_name=engine_name, + ) + + nb = prepare_notebook_metadata(nb, input_path, output_path, report_mode) + # clear out any existing error markers from previous papermill runs + nb = remove_error_markers(nb) + + if not prepare_only: + # Dropdown to the engine to fetch the kernel name from the notebook document + kernel_name = exactpro_papermill_engines.nb_kernel_name(engine_name=engine_name, nb=nb, name=kernel_name) + # Execute the Notebook in `cwd` if it is set + with chdir(cwd): + nb = await exactpro_papermill_engines.async_execute_notebook_with_engine( + engine_name, + nb, + input_path=input_path, + output_path=output_path if request_save_on_cell_execute else None, + kernel_name=kernel_name, + progress_bar=progress_bar, + log_output=log_output, + start_timeout=start_timeout, + stdout_file=stdout_file, + stderr_file=stderr_file, + **engine_kwargs, + ) + + # Check for errors first (it saves on error before raising) + raise_for_execution_errors(nb, output_path) + + # Write final output in case the engine didn't write it on cell completion. 
+ write_ipynb(nb, output_path) + + return nb diff --git a/local-run/with-jupyter-notebook/compose.yml b/local-run/with-jupyter-notebook/compose.yml index a4176fe..260096a 100644 --- a/local-run/with-jupyter-notebook/compose.yml +++ b/local-run/with-jupyter-notebook/compose.yml @@ -1,6 +1,8 @@ services: json_stream_provider: - image: ghcr.io/th2-net/th2-json-stream-provider-py:0.0.5-dev + build: + context: ../../ + dockerfile: Dockerfile ports: - "8081:8080" volumes: @@ -26,7 +28,7 @@ services: - th2_network th2_rpt_viewer: - image: ghcr.io/th2-net/th2-rpt-viewer:5.2.8 + image: ghcr.io/th2-net/th2-rpt-viewer:5.2.9-notebooks-launch-time-10918718527 ports: - "8083:8080" networks: diff --git a/local-run/with-jupyter-notebook/json-stream-provider/custom.json b/local-run/with-jupyter-notebook/json-stream-provider/custom.json index 1fca2cf..c32609b 100644 --- a/local-run/with-jupyter-notebook/json-stream-provider/custom.json +++ b/local-run/with-jupyter-notebook/json-stream-provider/custom.json @@ -1,5 +1,6 @@ { "notebooks": "/home/jovyan", "results": "/home/jovyan/j-sp/results", - "logs": "/home/jovyan/j-sp/logs" + "logs": "/home/jovyan/j-sp/logs", + "out-of-use-engine-time": 3600 } \ No newline at end of file diff --git a/package_info.json b/package_info.json index 4135c93..c34cbec 100644 --- a/package_info.json +++ b/package_info.json @@ -1,4 +1,4 @@ { "package_name": "th2-json-stream-provider", - "package_version": "0.0.5" + "package_version": "0.0.7" } \ No newline at end of file diff --git a/server.py b/server.py index 35e805a..e2a944c 100644 --- a/server.py +++ b/server.py @@ -1,31 +1,74 @@ -# Copyright 2024 Exactpro (Exactpro Systems Limited) +# Copyright 2024 Exactpro (Exactpro Systems Limited) # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import asyncio import json import logging.config import os from argparse import ArgumentParser +from asyncio import Task from datetime import datetime, timezone +from enum import Enum +from typing import Coroutine, Any +from json_stream_provider import papermill_execute_ext as epm import papermill as pm +from papermill.utils import chdir from aiohttp import web from aiohttp.web_request import Request from aiohttp_swagger import * -from aiojobs.aiohttp import setup, spawn +from aiojobs import Job +from aiojobs.aiohttp import setup -from log_configuratior import configureLogging +from json_stream_provider.custom_engines import CustomEngine, EngineBusyError +from json_stream_provider.log_configuratior import configure_logging os.system('pip list') @@ -33,11 +76,42 @@ notebooks_dir: str = '/home/jupyter-notebook/' results_dir: str = '/home/jupyter-notebook/results/' log_dir: str = '/home/jupyter-notebook/logs/' + tasks: dict = {} -configureLogging() +configure_logging() +CustomEngine.create_logger() logger: logging.Logger = logging.getLogger('j-sp') + +class TaskStatus(Enum): + CREATED = 'created' + SUCCESS = 'success' + FAILED = 'failed' + IN_PROGRESS = 'in progress' + + +class TaskMetadata: + task_id: str + task: Task[None] + status: TaskStatus + result: Any + customization: str = '' + job: Coroutine[Any, Any, Job[None]] 
= None + + def __init__(self, task_id: str, result: Any = '', customization: str = '', + job: Coroutine[Any, Any, Job[None]] = None): + self.task_id = task_id + self.status = TaskStatus.CREATED + self.result = result + self.customization = customization + self.job = job + + def close_job(self) -> None: + if self.job is not None: + self.job.close() + + def create_dir(path: str): if not os.path.exists(path): os.makedirs(path) @@ -50,23 +124,29 @@ def read_config(path: str): global logger try: file = open(path, "r") - result = json.load(file) - notebooks_dir = result.get('notebooks', notebooks_dir) + cfg = json.load(file) + + notebooks_dir = cfg.get('notebooks', notebooks_dir) logger.info('notebooks_dir=%s', notebooks_dir) if notebooks_dir: create_dir(notebooks_dir) - results_dir = result.get('results', results_dir) - logger.info('results_dir=%s',results_dir) + + results_dir = cfg.get('results', results_dir) + logger.info('results_dir=%s', results_dir) if results_dir: create_dir(results_dir) - log_dir = result.get('logs', log_dir) + + log_dir = cfg.get('logs', log_dir) logger.info('log_dir=%s', log_dir) if log_dir: create_dir(log_dir) + + CustomEngine.set_out_of_use_engine_time(cfg.get('out-of-use-engine-time', CustomEngine.out_of_use_engine_time)) except Exception as e: logger.error(f"Read '{path}' configuration failure", e) +# noinspection PyUnusedLocal async def req_status(req: Request): """ --- @@ -86,13 +166,15 @@ async def req_status(req: Request): def get_dirs(path): return [f.path for f in os.scandir(path) if f.is_dir() and f.name[0] != '.'] -def get_files(path, type): - return [f.path for f in os.scandir(path) if f.is_file() and f.name.endswith(type) and f.name[0] != '.'] + +def get_files(path, file_type): + return [f.path for f in os.scandir(path) if f.is_file() and f.name.endswith(file_type) and f.name[0] != '.'] def replace_slashes(path: str): return path.replace('\\', '/') + def replace_local_to_server(path: str): if path.startswith(notebooks_dir): return 
replace_slashes(path).replace(notebooks_dir, './notebooks/', 1) @@ -113,7 +195,8 @@ def replace_server_to_local(path: str): async def req_notebooks(req: Request): """ --- - description: This end-point allows to get notebooks that could be requested. Query requires path to directory in which notebooks is searched. + description: This end-point allows to get notebooks that could be requested. + Query requires path to directory in which notebooks is searched. tags: - File operation produces: @@ -122,7 +205,8 @@ async def req_notebooks(req: Request): "200": description: successful operation. Return dictionary of available directories/files. "404": - description: failed operation when queried directory doesn't exist or requested path didn't start with ./notebooks. + description: failed operation when queried directory doesn't exist + or requested path didn't start with ./notebooks. """ global logger path_arg = req.rel_url.query.get('path', '') @@ -140,12 +224,13 @@ async def req_notebooks(req: Request): 'directories': dirs, 'files': files }) - + try: path_converted = replace_server_to_local(path_arg) - except: + except Exception as error: + logger.warning("Requested path didn't start with ./notebooks", error) return web.HTTPNotFound(reason="Requested path didn't start with ./notebooks") - + if path_arg: if os.path.isdir(path_converted): dirs = list(map(replace_local_to_server, get_dirs(path_converted))) @@ -160,7 +245,7 @@ async def req_notebooks(req: Request): }) else: return web.HTTPNotFound() - + return web.json_response({ 'directories': [], 'files': [] @@ -170,7 +255,8 @@ async def req_notebooks(req: Request): async def req_jsons(req: Request): """ --- - description: This end-point allows to get jsonls that could be requested. Query requires path to directory in which jsonls is searched. + description: This end-point allows to get JSONLs that could be requested. + Query requires path to directory in which JSONLs is searched. 
tags: - File operation produces: @@ -179,7 +265,8 @@ async def req_jsons(req: Request): "200": description: successful operation. Return dictionary of available directories/files. "404": - description: failed operation when queried directory doesn't exist or requested path didn't start with ./results or ./notebooks. + description: failed operation when queried directory doesn't exist + or requested path didn't start with ./results or ./notebooks. """ global logger path_arg = req.rel_url.query.get('path', '') @@ -199,7 +286,7 @@ async def req_jsons(req: Request): dirs = list({*dirs_note, *dirs_res}) files = list({*files_note, *files_res}) - + dirs.sort() files.sort() @@ -207,16 +294,17 @@ async def req_jsons(req: Request): 'directories': dirs, 'files': files }) - + try: path_converted = replace_server_to_local(path_arg) - except: + except Exception as error: + logger.warning("Requested path didn't start with ./results or ./notebooks", error) return web.HTTPNotFound(reason="Requested path didn't start with ./results or ./notebooks") if path_arg: if os.path.isdir(path_converted): dirs = list(map(replace_local_to_server, get_dirs(path_converted))) files = list(map(replace_local_to_server, get_files(path_converted, '.jsonl'))) - + dirs.sort() files.sort() @@ -232,10 +320,12 @@ async def req_jsons(req: Request): 'files': [] }) + async def req_files(req: Request): """ --- - description: This end-point allows to get files and directories. Query requires path to directory in which files and directories is searched. + description: This end-point allows to get files and directories. + Query requires path to directory in which files and directories is searched. tags: - File operation produces: @@ -244,7 +334,8 @@ async def req_files(req: Request): "200": description: successful operation. Return dictionary of available directories/files. "404": - description: failed operation when queried directory doesn't exist or requested path didn't start with ./results or ./notebooks. 
+ description: failed operation when queried directory doesn't exist + or requested path didn't start with ./results or ./notebooks. """ global logger path_arg = req.rel_url.query.get('path', '') @@ -272,16 +363,17 @@ async def req_files(req: Request): 'directories': dirs, 'files': files }) - + try: path_converted = replace_server_to_local(path_arg) - except: + except Exception as error: + logger.warning("Requested path didn't start with ./results or ./notebooks", error) return web.HTTPNotFound(reason="Requested path didn't start with ./results or ./notebooks") if path_arg: if os.path.isdir(path_converted): dirs = list(map(replace_local_to_server, get_dirs(path_converted))) files = list(map(replace_local_to_server, get_files(path_converted, ''))) - + dirs.sort() files.sort() @@ -301,7 +393,8 @@ async def req_files(req: Request): async def req_parameters(req: Request): """ --- - description: This end-point allows to get parameters for notebook in requested path. Query requires path to notebook. + description: This end-point allows to get parameters for notebook in requested path. + Query requires path to notebook. tags: - File operation produces: @@ -310,14 +403,16 @@ async def req_parameters(req: Request): "200": description: successful operation. Return json of file's parameters. "404": - description: failed operation when queried file doesn't exist or requested path didn't start with ./notebooks. + description: failed operation when queried file doesn't exist + or requested path didn't start with ./notebooks. 
""" global logger path = req.rel_url.query.get('path', '') logger.info('/files?path={path}'.format(path=str(path))) try: path_converted = replace_server_to_local(path) - except: + except Exception as error: + logger.warning("Requested path didn't start with ./notebooks", error) return web.HTTPNotFound(reason="Requested path didn't start with ./notebooks") if not path or not os.path.isfile(path_converted): return web.HTTPNotFound() @@ -325,50 +420,55 @@ async def req_parameters(req: Request): return web.json_response(params) -async def launch_notebook(input_path, arguments, file_name, task_id): - global tasks +async def launch_notebook(input_path, arguments: dict, file_name, task_metadata: TaskMetadata): global logger + global tasks logger.info(f'launching notebook {input_path} with {arguments}') + + if task_metadata is None: + return + + task_metadata.status = TaskStatus.IN_PROGRESS start_execution = datetime.now() log_out: str = (log_dir + '/%s.log.ipynb' % file_name) if log_dir and file_name else None try: - with pm.utils.chdir(input_path[:input_path.rfind('/')]): + with chdir(input_path[:input_path.rfind('/')]): input_path = input_path[input_path.rfind('/') + 1:] - pm.execute_notebook(input_path, log_out, arguments) + await epm.async_execute_notebook( + input_path=input_path, + output_path=log_out, + parameters=arguments, + ) logger.debug(f'successfully launched notebook {input_path}') - if tasks.get(task_id): - tasks[task_id] = { - 'status': 'success', - 'result': arguments.get('output_path') - } + task_metadata.status = TaskStatus.SUCCESS + task_metadata.result = arguments.get('output_path') + task_metadata.customization = arguments.get('customization_path') + except EngineBusyError as error: + logger.warning(error.args) + task_metadata.status = TaskStatus.FAILED + task_metadata.result = error except Exception as error: logger.error(f'failed to launch notebook {input_path}', error) - if tasks.get(task_id): - tasks[task_id] = { - 'status': 'failed', - 'result': 
error - } + task_metadata.status = TaskStatus.FAILED + task_metadata.result = error finally: spent_time = (datetime.now() - start_execution).total_seconds() logger.info(f'ended launch notebook {input_path} with {arguments} spent_time {spent_time} sec') -def convert_parameter(parameter, notebook_path): +def convert_parameter(parameter): parameter_type = parameter.get('type') parameter_value = parameter.get('value') - if (parameter_type == 'file path'): + if parameter_type == 'file path': try: parameter_path = replace_server_to_local(parameter_value) - except: - raise Exception( - "Parameter {name} of type={type} with value={value} didn't start with ./notebooks or ./results" - .format(name=parameter.get('name'), type=parameter_type, value=parameter_value) - ) - - relative_path = os.path.relpath(parameter_path, notebook_path[:notebook_path.rfind('/')]) + except Exception as error: + msg = (f"Parameter {parameter.get('name')} of type={parameter_type} with value={parameter_value} " + "didn't start with ./notebooks or ./results") + logger.error(msg, error) + raise Exception(msg, error) - return relative_path - + return parameter_path else: return parameter_value @@ -376,7 +476,8 @@ def convert_parameter(parameter, notebook_path): async def req_launch(req: Request): """ --- - description: This end-point allows to start notebook. Query requires path to notebook. Body requred to be dictionary of parameters. + description: This end-point allows to start notebook. Query requires path to notebook. + Body required to be dictionary of parameters. 
tags: - Execution operation produces: @@ -400,7 +501,8 @@ async def req_launch(req: Request): return web.HTTPBadRequest(reason='Body with parameters not present') try: path_converted = replace_server_to_local(path_arg) - except: + except Exception as error: + logger.warning("Requested path didn't start with ./notebooks", error) return web.HTTPNotFound(reason="Requested path didn't start with ./notebooks") if not path_arg or not os.path.isfile(path_converted): return web.HTTPNotFound() @@ -410,21 +512,21 @@ async def req_launch(req: Request): timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S-%f") file_name = notebook_name + '_' + timestamp output_path = results_dir + '/%s.jsonl' % str(file_name) + customization_path = results_dir + '/%s.json' % str(file_name) req_json = await req.json() parameters = {} for key, parameter in req_json.items(): try: - parameters[key] = convert_parameter(parameter, path_converted) + parameters[key] = convert_parameter(parameter) except Exception as error: return web.HTTPInternalServerError(reason=str(error)) parameters['output_path'] = output_path + parameters['customization_path'] = customization_path task_id = str(uuid4()) - job = spawn(req, launch_notebook(path_converted, parameters, file_name, task_id)) - tasks[task_id] = { - 'status': 'in progress', - 'job': job - } - asyncio.shield(job) + task_metadata = TaskMetadata(task_id=task_id) + tasks[task_id] = task_metadata + task: Task[None] = asyncio.create_task(launch_notebook(path_converted, parameters, file_name, task_metadata)) + task_metadata.task = task return web.json_response({'task_id': task_id}) @@ -440,16 +542,17 @@ async def req_file(req: Request): "200": description: successful operation. Return file's json. "404": - description: failed operation. requested file doesn't exist or requested path didn't start with ./results or ./notebooks. + description: failed operation. 
requested file doesn't exist + or requested path didn't start with ./results or ./notebooks. """ global tasks global logger path = req.rel_url.query.get('path', '') logger.info('/file?path={path}'.format(path=str(path))) - path_converted = replace_server_to_local(path) try: path_converted = replace_server_to_local(path) - except: + except Exception as error: + logger.warning("Requested path didn't start with ./results or ./notebooks", error) return web.HTTPNotFound(reason="Requested path didn't start with ./results or ./notebooks") if not path or not os.path.isfile(path_converted): return web.HTTPNotFound() @@ -458,10 +561,12 @@ async def req_file(req: Request): file.close() return web.json_response({'result': content}) -async def reqResult(req: Request): + +async def req_result(req: Request): """ --- - description: This end-point allows to get result from requested task. Query requires task id from which result is required. + description: This end-point allows to get result from requested task. + Query requires task id from which result is required. tags: - Execution operation produces: @@ -475,32 +580,41 @@ async def reqResult(req: Request): "400": description: failed operation. body with parameters not present. "404": - description: failed operation. requested task doesn't exist or resulting file doesn't exist or status is unknown. + description: failed operation. requested task doesn't exist + or resulting file doesn't exist or status is unknown. 
""" global tasks global logger task_id = req.rel_url.query.get('id') - logger.info('/result?id={task_id}'.format(task_id=str(task_id))) - task = tasks.get(task_id, None) + logger.debug('/result?id={task_id}'.format(task_id=str(task_id))) + task: TaskMetadata = tasks.get(task_id) if task is None: return web.HTTPNotFound(reason="Requested task doesn't exist") - status = task.get('status', None) - if status == 'in progress': - return web.json_response({'status': status}) - elif status == 'success': - path_param = task.get('result','') + status = task.status + if status == TaskStatus.IN_PROGRESS: + return web.json_response({'status': status.value}) + elif status == TaskStatus.SUCCESS: + path_param = task.result if not path_param or not os.path.isfile(path_param): return web.HTTPNotFound(reason="Resulting file doesn't exist") + customization_param = task.customization + customization = "[]" + if len(customization_param) > 0 and os.path.isfile(customization_param): + customization_file = open(customization_param, "r") + customization = customization_file.read() + customization_file.close() file = open(path_param, "r") content = file.read() file.close() - return web.json_response({'status': status, 'result': content, 'path': replace_local_to_server(path_param) }) - elif status == 'failed': - error = task.get('result', Exception()) - return web.json_response({'status': status, 'result': str(error)}) + return web.json_response({'status': status.value, 'result': content, 'customization': customization, + 'path': replace_local_to_server(path_param)}) + elif status == TaskStatus.FAILED: + error: Exception = task.result + return web.json_response({'status': status.value, 'result': str(error)}) else: return web.HTTPNotFound() + async def req_stop(req: Request): """ --- @@ -519,19 +633,21 @@ async def req_stop(req: Request): global logger task_id = req.rel_url.query.get('id') logger.info('/stop?id={task_id}'.format(task_id=str(task_id))) - task = tasks.pop(task_id, None) + task: 
TaskMetadata = tasks.pop(task_id) try: - if task: - await task.job.close() - except: + if task and task.status == TaskStatus.IN_PROGRESS: + task.task.cancel("stopped by user") + except Exception as error: + logger.warning("failed to stop process", error) return web.HTTPInternalServerError(reason='failed to stop process') return web.HTTPOk() + if __name__ == '__main__': parser = ArgumentParser() parser.add_argument('config') cfg_path = vars(parser.parse_args()).get('config') - if (cfg_path): + if cfg_path: read_config(cfg_path) app = web.Application() @@ -544,8 +660,8 @@ async def req_stop(req: Request): app.router.add_route('GET', "/files", req_parameters) app.router.add_route('GET', "/file", req_file) app.router.add_route('POST', "/execute", req_launch) - app.router.add_route('GET', "/result", reqResult) + app.router.add_route('GET', "/result", req_result) app.router.add_route('POST', "/stop", req_stop) setup_swagger(app) logger.info('starting server') - web.run_app(app) \ No newline at end of file + web.run_app(app)