Skip to content

Commit

Permalink
pw_console: Handle web logging in a separate thread
Browse files Browse the repository at this point in the history
Change-Id: I1d09bac0e1059e5287f61e9f39bf83243e2ff44b
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/223691
Lint: Lint 🤖 <[email protected]>
Reviewed-by: Asad Memon <[email protected]>
Commit-Queue: Anthony DiGirolamo <[email protected]>
Reviewed-by: Taylor Cramer <[email protected]>
  • Loading branch information
AnthonyDiGirolamo authored and CQ Bot Account committed Jul 17, 2024
1 parent bce5a2c commit 4efaa47
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 52 deletions.
13 changes: 10 additions & 3 deletions pw_console/py/pw_console/python_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,15 @@ def setup_python_logging(
logging.getLogger("pw_console.serial_debug_logger").setLevel(logging.DEBUG)


def log_record_to_json(record: logging.LogRecord) -> str:
def log_record_to_dict(record: logging.LogRecord) -> dict[str, Any]:
"""Convert a log record into a dict for use with json formatting."""
log_dict: dict[str, Any] = {}
log_dict["message"] = record.getMessage()
log_dict["levelno"] = record.levelno
log_dict["levelname"] = record.levelname
log_dict["args"] = record.args
log_dict["args"] = ''
if record.args:
log_dict["args"] = [str(arg) for arg in record.args]
log_dict["time"] = str(record.created)
log_dict["time_string"] = datetime.fromtimestamp(record.created).isoformat(
timespec="seconds"
Expand All @@ -145,7 +148,11 @@ def log_record_to_json(record: logging.LogRecord) -> str:

log_dict["fields"][key] = str(value)

return json.dumps(log_dict)
return log_dict


def log_record_to_json(record: logging.LogRecord) -> str:
return json.dumps(log_record_to_dict(record))


class JsonLogFormatter(logging.Formatter):
Expand Down
95 changes: 51 additions & 44 deletions pw_console/py/pw_console/web_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,17 @@
import io
import json
import logging
import operator
import sys
import types
from typing import Callable, TYPE_CHECKING, List, Dict, Any
from typing import Any

from aiohttp import web
from aiohttp.web_ws import WebSocketResponse
from prompt_toolkit.completion import CompleteEvent
from prompt_toolkit.document import Document
from ptpython.completer import PythonCompleter
from ptpython.repl import _has_coroutine_flag

from pw_console.python_logging import log_record_to_json

if TYPE_CHECKING:
from pw_console.log_line import LogLine
from pw_console.python_logging import log_record_to_dict

_LOG = logging.getLogger(__package__)

Expand All @@ -48,7 +44,7 @@ class MissingCallId(Exception):
"""Exception for request with missing call id."""


def format_completions(all_completions) -> List[Dict[str, str]]:
def format_completions(all_completions) -> list[dict[str, str]]:
# Hide private suggestions
all_completions = [
completion
Expand Down Expand Up @@ -89,32 +85,42 @@ def compile_code(code: str, mode: str) -> types.CodeType:


class WebSocketStreamingResponder(logging.Handler):
def __init__(self, connection):
logging.Handler.__init__(self=self)
"""Python logging handler that sends json serialized logs.
Args:
connection: the WebSocketResponse object to send json logs to.
loop: The asyncio loop to run the send_str in.
"""

def __init__(
self,
connection: WebSocketResponse,
loop: asyncio.AbstractEventLoop,
*args,
**kwargs,
) -> None:
self.connection = connection
self.request_ids = []
self.formatter: Callable[[LogLine], str] = operator.attrgetter(
'ansi_stripped_log'
self.loop = loop
self.request_ids: list[int] = []
super().__init__(*args, **kwargs)

def emit(self, record: logging.LogRecord) -> None:
# Process this log record in a separate event loop.
asyncio.run_coroutine_threadsafe(
self._process_log(record),
self.loop,
)
self.formatter = lambda record: json.loads(log_record_to_json(record))

def emit(self, record):
log_string = self.formatter(record)
self._process_log_string(log_string)

def _process_log_string(self, log_string):
loop = asyncio.get_event_loop()
asyncio.set_event_loop(loop)
async def _process_log(self, record: logging.LogRecord) -> None:
"""Send the log serialized to json via the current WebSocketResponse."""
for req_id in self.request_ids:
loop.create_task(
self.connection.send_str(
json.dumps(
{
'id': req_id,
'streaming': True,
'data': {'log_line': log_string},
}
)
await self.connection.send_str(
json.dumps(
{
'id': req_id,
'streaming': True,
'data': {'log_line': log_record_to_dict(record)},
}
)
)

Expand All @@ -124,13 +130,16 @@ class WebKernel:

def __init__(
self,
connection: web.WebSocketResponse,
kernel_params,
connection: WebSocketResponse,
kernel_params: dict[str, Any],
loop: asyncio.AbstractEventLoop,
) -> None:
"""Create a new kernel for this particular websocket connection."""
self.connection = connection
self.kernel_params = kernel_params
self.logger_handlers: Dict[str, WebSocketStreamingResponder] = {}
self.loop = loop

self.logger_handlers: dict[str, WebSocketStreamingResponder] = {}
self.connected = False
self.python_completer = PythonCompleter(
self.get_globals,
Expand Down Expand Up @@ -220,7 +229,7 @@ async def handle_request(self, request) -> str:
_LOG.error('Failed to parse request: %s', request)
return ''

async def handle_eval(self, code: str) -> Dict[str, str] | None:
async def handle_eval(self, code: str) -> dict[str, str] | None:
"""Evaluate user code and return output."""
# Patch stdout and stderr to capture repl print() statements.
temp_stdout = io.StringIO()
Expand All @@ -231,7 +240,7 @@ async def handle_eval(self, code: str) -> Dict[str, str] | None:
sys.stdout = temp_stdout
sys.stderr = temp_stderr

def return_result_with_stdout_stderr(result) -> Dict[str, str]:
def return_result_with_stdout_stderr(result) -> dict[str, str]:
# Always restore original stdout and stderr
sys.stdout = original_stdout
sys.stderr = original_stderr
Expand Down Expand Up @@ -297,7 +306,7 @@ async def _eval_async(self, code: str) -> Any:

def handle_autocompletion(
self, code: str, cursor_pos: int
) -> List[Dict[str, str]]:
) -> list[dict[str, str]]:
doc = Document(code, cursor_pos)
all_completions = list(
self.python_completer.get_completions(
Expand All @@ -316,24 +325,22 @@ def handle_disconnect(self) -> None:
logger.removeHandler(self.logger_handlers[logger_name])

def get_globals(self) -> dict[str, Any]:
return self.kernel_params['global_vars']
return self.kernel_params.get('global_vars', globals())

def get_locals(self) -> dict[str, Any]:
if self.kernel_params['local_vars']:
return self.kernel_params['local_vars']

return self.get_globals()
return self.kernel_params.get('local_vars', self.get_globals())

def handle_log_source_list(self) -> List[str]:
if self.kernel_params['loggers']:
def handle_log_source_list(self) -> list[str]:
if 'loggers' in self.kernel_params:
return list(self.kernel_params['loggers'].keys())
return []

def handle_log_source_subscribe(self, logger_name, request_id) -> bool:
if self.kernel_params['loggers'][logger_name]:
if not logger_name in self.logger_handlers:
self.logger_handlers[logger_name] = WebSocketStreamingResponder(
self.connection
self.connection,
self.loop,
)
for logger in self.kernel_params['loggers'][logger_name]:
logger.addHandler(self.logger_handlers[logger_name])
Expand Down
37 changes: 32 additions & 5 deletions pw_console/py/pw_console/web_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import email.utils
import logging
import mimetypes
from pathlib import Path
import socket
from threading import Thread
import webbrowser
from pathlib import Path
from typing import Callable

from aiohttp import web, WSMsgType

from pw_console.web_kernel import WebKernel
Expand Down Expand Up @@ -61,6 +63,7 @@ def pw_console_http_server(
handler = WebHandlers(
html_files=html_files, kernel_params=kernel_params
)
handler.start_web_socket_streaming_responder_thread()
runner = aiohttp_server(handler.handle_request)
port = find_available_port(start_port)
loop = asyncio.new_event_loop()
Expand All @@ -73,19 +76,42 @@ def pw_console_http_server(
webbrowser.open(url)
loop.run_forever()
except KeyboardInterrupt:
_LOG.info('Shutting down...')
handler.stop_web_socket_streaming_responder_thread()
loop.stop()


class WebHandlers:
"""Request handler that serves files from pw_console.html package data."""

def __init__(self, html_files: dict[str, str], kernel_params):
def __init__(self, html_files: dict[str, str], kernel_params) -> None:
self.html_files = html_files
self.date_modified = email.utils.formatdate(
datetime.datetime.now().timestamp(), usegmt=True
)
self.kernel_params = kernel_params

self.web_socket_streaming_responder_loop = asyncio.new_event_loop()

def _web_socket_streaming_responder_thread_entry(self):
"""Entry point for the web socket logging handlers thread."""
asyncio.set_event_loop(self.web_socket_streaming_responder_loop)
self.web_socket_streaming_responder_loop.run_forever()

def start_web_socket_streaming_responder_thread(self):
"""Start thread for handling log messages to web socket responses."""
thread = Thread(
target=self._web_socket_streaming_responder_thread_entry,
args=(),
daemon=True,
)
thread.start()

def stop_web_socket_streaming_responder_thread(self):
self.web_socket_streaming_responder_loop.call_soon_threadsafe(
self.web_socket_streaming_responder_loop.stop
)

async def handle_request(
self, request: web.Request
) -> web.Response | web.WebSocketResponse:
Expand Down Expand Up @@ -123,16 +149,17 @@ async def handle_request(
async def handle_websocket(
self, request: web.Request
) -> web.WebSocketResponse:
"""Handle a websocket connection request by creating a new web kernel"""
"""Handle a websocket connection request by creating a new kernel."""
ws = web.WebSocketResponse()
await ws.prepare(request)
kernel = WebKernel(ws, self.kernel_params)
kernel = WebKernel(
ws, self.kernel_params, self.web_socket_streaming_responder_loop
)
try:
async for msg in ws:
if msg.type == WSMsgType.TEXT:
if msg.data == 'close':
return ws

response = await kernel.handle_request(msg.data)
await ws.send_str(response)
elif msg.type == WSMsgType.ERROR:
Expand Down

0 comments on commit 4efaa47

Please sign in to comment.