diff --git a/pygbag/__main__.py b/pygbag/__main__.py index 657315d..c38fc6f 100644 --- a/pygbag/__main__.py +++ b/pygbag/__main__.py @@ -106,7 +106,7 @@ def truc(*argv, **kw): def prompt(): # FIXME for js style console should be # CSI(f"{TTY.LINES+TTY.CONSOLE};1H{prompt}") - print('\r>>> ',end='') + print('\r>X> ',end='') fakehost.is_browser = False fakehost.async_input = async_input diff --git a/pygbag/app.py b/pygbag/app.py index 02deebf..6fe7487 100644 --- a/pygbag/app.py +++ b/pygbag/app.py @@ -65,6 +65,8 @@ DEFAULT_PORT = 8000 DEFAULT_TMPL = "default.tmpl" +DEFAULT_WIDTH = 1280 +DEFAULT_HEIGHT = 720 def set_args(program): global DEFAULT_SCRIPT @@ -193,11 +195,17 @@ async def main_run(app_folder, mainscript, cdn=DEFAULT_CDN): help="Specify alternate bind address [default: localhost]", ) - # parser.add_argument( - # "--directory", - # default=build_dir.as_posix(), - # help="Specify alternative directory [default:%s]" % build_dir, - # ) + parser.add_argument( + "--width", + default=DEFAULT_WIDTH, + help="framebuffer width [default:%d]" % DEFAULT_WIDTH, + ) + + parser.add_argument( + "--height", + default=DEFAULT_HEIGHT, + help="framebuffer width [default:%d]" % DEFAULT_HEIGHT, + ) parser.add_argument( "--PYBUILD", @@ -339,6 +347,8 @@ async def main_run(app_folder, mainscript, cdn=DEFAULT_CDN): "cdn": args.cdn, "proxy": f"http://{args.bind}:{args.port}/", "xtermjs": "1", + "width" : args.width, + "height" : args.height, "ume_block": args.ume_block, "can_close": args.can_close, "archive": app_name, diff --git a/pygbag/filtering.py b/pygbag/filtering.py index fc99d48..acb923c 100644 --- a/pygbag/filtering.py +++ b/pygbag/filtering.py @@ -8,6 +8,7 @@ IGNORE = """ +/.mypy_cache /.ssh /.local /.config diff --git a/pygbag/support/_xterm_parser/__init__.py b/pygbag/support/_xterm_parser/__init__.py new file mode 100644 index 0000000..bd6a610 --- /dev/null +++ b/pygbag/support/_xterm_parser/__init__.py @@ -0,0 +1 @@ +from ._xterm_parser import XTermParser diff --git a/pygbag/support/_xterm_parser/_ansi_sequences.py b/pygbag/support/_xterm_parser/_ansi_sequences.py new file mode 100644 index 0000000..ac8b94f --- /dev/null +++ b/pygbag/support/_xterm_parser/_ansi_sequences.py @@ -0,0 +1,316 @@ +from typing import Mapping, Tuple + +from .keys import Keys + +# Mapping of vt100 escape codes to Keys. +ANSI_SEQUENCES_KEYS: Mapping[str, Tuple[Keys, ...]] = { + # Control keys. + " ": (Keys.Space,), + "\r": (Keys.Enter,), + "\x00": (Keys.ControlAt,), # Control-At (Also for Ctrl-Space) + "\x01": (Keys.ControlA,), # Control-A (home) + "\x02": (Keys.ControlB,), # Control-B (emacs cursor left) + "\x03": (Keys.ControlC,), # Control-C (interrupt) + "\x04": (Keys.ControlD,), # Control-D (exit) + "\x05": (Keys.ControlE,), # Control-E (end) + "\x06": (Keys.ControlF,), # Control-F (cursor forward) + "\x07": (Keys.ControlG,), # Control-G + "\x08": (Keys.Backspace,), # Control-H (8) (Identical to '\b') + "\x09": (Keys.Tab,), # Control-I (9) (Identical to '\t') + "\x0a": (Keys.ControlJ,), # Control-J (10) (Identical to '\n') + "\x0b": (Keys.ControlK,), # Control-K (delete until end of line; vertical tab) + "\x0c": (Keys.ControlL,), # Control-L (clear; form feed) + # "\x0d": (Keys.ControlM,), # Control-M (13) (Identical to '\r') + "\x0e": (Keys.ControlN,), # Control-N (14) (history forward) + "\x0f": (Keys.ControlO,), # Control-O (15) + "\x10": (Keys.ControlP,), # Control-P (16) (history back) + "\x11": (Keys.ControlQ,), # Control-Q + "\x12": (Keys.ControlR,), # Control-R (18) (reverse search) + "\x13": (Keys.ControlS,), # Control-S (19) (forward search) + "\x14": (Keys.ControlT,), # Control-T + "\x15": (Keys.ControlU,), # Control-U + "\x16": (Keys.ControlV,), # Control-V + "\x17": (Keys.ControlW,), # Control-W + "\x18": (Keys.ControlX,), # Control-X + "\x19": (Keys.ControlY,), # Control-Y (25) + "\x1a": (Keys.ControlZ,), # Control-Z + "\x1b": (Keys.Escape,), # Also Control-[ + "\x1b\x1b": ( + Keys.Escape, + ), # Windows issues esc esc for a single press of escape key + "\x9b": (Keys.ShiftEscape,), + "\x1c": (Keys.ControlBackslash,), # Both Control-\ (also Ctrl-| ) + "\x1d": (Keys.ControlSquareClose,), # Control-] + "\x1e": (Keys.ControlCircumflex,), # Control-^ + "\x1f": (Keys.ControlUnderscore,), # Control-underscore (Also for Ctrl-hyphen.) + # ASCII Delete (0x7f) + # Vt220 (and Linux terminal) send this when pressing backspace. We map this + # to ControlH, because that will make it easier to create key bindings that + # work everywhere, with the trade-off that it's no longer possible to + # handle backspace and control-h individually for the few terminals that + # support it. (Most terminals send ControlH when backspace is pressed.) + # See: http://www.ibb.net/~anne/keyboard.html + "\x7f": (Keys.Backspace,), + "\x1b\x7f": (Keys.ControlW,), + # Various + "\x1b[1~": (Keys.Home,), # tmux + "\x1b[2~": (Keys.Insert,), + "\x1b[3~": (Keys.Delete,), + "\x1b[4~": (Keys.End,), # tmux + "\x1b[5~": (Keys.PageUp,), + "\x1b[6~": (Keys.PageDown,), + "\x1b[7~": (Keys.Home,), # xrvt + "\x1b[8~": (Keys.End,), # xrvt + "\x1b[Z": (Keys.BackTab,), # shift + tab + "\x1b\x09": (Keys.BackTab,), # Linux console + "\x1b[~": (Keys.BackTab,), # Windows console + # -- + # Function keys. + "\x1bOP": (Keys.F1,), + "\x1bOQ": (Keys.F2,), + "\x1bOR": (Keys.F3,), + "\x1bOS": (Keys.F4,), + "\x1b[[A": (Keys.F1,), # Linux console. + "\x1b[[B": (Keys.F2,), # Linux console. + "\x1b[[C": (Keys.F3,), # Linux console. + "\x1b[[D": (Keys.F4,), # Linux console. + "\x1b[[E": (Keys.F5,), # Linux console. + "\x1b[11~": (Keys.F1,), # rxvt-unicode + "\x1b[12~": (Keys.F2,), # rxvt-unicode + "\x1b[13~": (Keys.F3,), # rxvt-unicode + "\x1b[14~": (Keys.F4,), # rxvt-unicode + "\x1b[15~": (Keys.F5,), + "\x1b[17~": (Keys.F6,), + "\x1b[18~": (Keys.F7,), + "\x1b[19~": (Keys.F8,), + "\x1b[20~": (Keys.F9,), + "\x1b[21~": (Keys.F10,), + "\x1b[23~": (Keys.F11,), + "\x1b[24~": (Keys.F12,), + "\x1b[25~": (Keys.F13,), + "\x1b[26~": (Keys.F14,), + "\x1b[28~": (Keys.F15,), + "\x1b[29~": (Keys.F16,), + "\x1b[31~": (Keys.F17,), + "\x1b[32~": (Keys.F18,), + "\x1b[33~": (Keys.F19,), + "\x1b[34~": (Keys.F20,), + # Xterm + "\x1b[1;2P": (Keys.F13,), + "\x1b[1;2Q": (Keys.F14,), + "\x1b[1;2R": ( + Keys.F15, + ), # Conflicts with CPR response; enabled after https://github.com/Textualize/textual/issues/3440. + "\x1b[1;2S": (Keys.F16,), + "\x1b[15;2~": (Keys.F17,), + "\x1b[17;2~": (Keys.F18,), + "\x1b[18;2~": (Keys.F19,), + "\x1b[19;2~": (Keys.F20,), + "\x1b[20;2~": (Keys.F21,), + "\x1b[21;2~": (Keys.F22,), + "\x1b[23;2~": (Keys.F23,), + "\x1b[24;2~": (Keys.F24,), + # -- + # Control + function keys. + "\x1b[1;5P": (Keys.ControlF1,), + "\x1b[1;5Q": (Keys.ControlF2,), + "\x1b[1;5R": ( + Keys.ControlF3, + ), # Conflicts with CPR response; enabled after https://github.com/Textualize/textual/issues/3440. + "\x1b[1;5S": (Keys.ControlF4,), + "\x1b[15;5~": (Keys.ControlF5,), + "\x1b[17;5~": (Keys.ControlF6,), + "\x1b[18;5~": (Keys.ControlF7,), + "\x1b[19;5~": (Keys.ControlF8,), + "\x1b[20;5~": (Keys.ControlF9,), + "\x1b[21;5~": (Keys.ControlF10,), + "\x1b[23;5~": (Keys.ControlF11,), + "\x1b[24;5~": (Keys.ControlF12,), + "\x1b[1;6P": (Keys.ControlF13,), + "\x1b[1;6Q": (Keys.ControlF14,), + "\x1b[1;6R": ( + Keys.ControlF15, + ), # Conflicts with CPR response; enabled after https://github.com/Textualize/textual/issues/3440. + "\x1b[1;6S": (Keys.ControlF16,), + "\x1b[15;6~": (Keys.ControlF17,), + "\x1b[17;6~": (Keys.ControlF18,), + "\x1b[18;6~": (Keys.ControlF19,), + "\x1b[19;6~": (Keys.ControlF20,), + "\x1b[20;6~": (Keys.ControlF21,), + "\x1b[21;6~": (Keys.ControlF22,), + "\x1b[23;6~": (Keys.ControlF23,), + "\x1b[24;6~": (Keys.ControlF24,), + # -- + # Tmux (Win32 subsystem) sends the following scroll events. + "\x1b[62~": (Keys.ScrollUp,), + "\x1b[63~": (Keys.ScrollDown,), + # -- + # Sequences generated by numpad 5. Not sure what it means. (It doesn't + # appear in 'infocmp'. Just ignore. + "\x1b[E": (Keys.Ignore,), # Xterm. + "\x1b[G": (Keys.Ignore,), # Linux console. + # -- + # Meta/control/escape + pageup/pagedown/insert/delete. + "\x1b[3;2~": (Keys.ShiftDelete,), # xterm, gnome-terminal. + "\x1b[5;2~": (Keys.ShiftPageUp,), + "\x1b[6;2~": (Keys.ShiftPageDown,), + "\x1b[2;3~": (Keys.Escape, Keys.Insert), + "\x1b[3;3~": (Keys.Escape, Keys.Delete), + "\x1b[5;3~": (Keys.Escape, Keys.PageUp), + "\x1b[6;3~": (Keys.Escape, Keys.PageDown), + "\x1b[2;4~": (Keys.Escape, Keys.ShiftInsert), + "\x1b[3;4~": (Keys.Escape, Keys.ShiftDelete), + "\x1b[5;4~": (Keys.Escape, Keys.ShiftPageUp), + "\x1b[6;4~": (Keys.Escape, Keys.ShiftPageDown), + "\x1b[3;5~": (Keys.ControlDelete,), # xterm, gnome-terminal. + "\x1b[5;5~": (Keys.ControlPageUp,), + "\x1b[6;5~": (Keys.ControlPageDown,), + "\x1b[3;6~": (Keys.ControlShiftDelete,), + "\x1b[5;6~": (Keys.ControlShiftPageUp,), + "\x1b[6;6~": (Keys.ControlShiftPageDown,), + "\x1b[2;7~": (Keys.Escape, Keys.ControlInsert), + "\x1b[5;7~": (Keys.Escape, Keys.ControlPageDown), + "\x1b[6;7~": (Keys.Escape, Keys.ControlPageDown), + "\x1b[2;8~": (Keys.Escape, Keys.ControlShiftInsert), + "\x1b[5;8~": (Keys.Escape, Keys.ControlShiftPageDown), + "\x1b[6;8~": (Keys.Escape, Keys.ControlShiftPageDown), + # -- + # Arrows. + # (Normal cursor mode). + "\x1b[A": (Keys.Up,), + "\x1b[B": (Keys.Down,), + "\x1b[C": (Keys.Right,), + "\x1b[D": (Keys.Left,), + "\x1b[H": (Keys.Home,), + "\x1b[F": (Keys.End,), + # Tmux sends following keystrokes when control+arrow is pressed, but for + # Emacs ansi-term sends the same sequences for normal arrow keys. Consider + # it a normal arrow press, because that's more important. + # (Application cursor mode). + "\x1bOA": (Keys.Up,), + "\x1bOB": (Keys.Down,), + "\x1bOC": (Keys.Right,), + "\x1bOD": (Keys.Left,), + "\x1bOF": (Keys.End,), + "\x1bOH": (Keys.Home,), + # Shift + arrows. + "\x1b[1;2A": (Keys.ShiftUp,), + "\x1b[1;2B": (Keys.ShiftDown,), + "\x1b[1;2C": (Keys.ShiftRight,), + "\x1b[1;2D": (Keys.ShiftLeft,), + "\x1b[1;2F": (Keys.ShiftEnd,), + "\x1b[1;2H": (Keys.ShiftHome,), + # Meta + arrow keys. Several terminals handle this differently. + # The following sequences are for xterm and gnome-terminal. + # (Iterm sends ESC followed by the normal arrow_up/down/left/right + # sequences, and the OSX Terminal sends ESCb and ESCf for "alt + # arrow_left" and "alt arrow_right." We don't handle these + # explicitly, in here, because would could not distinguish between + # pressing ESC (to go to Vi navigation mode), followed by just the + # 'b' or 'f' key. These combinations are handled in + # the input processor.) + "\x1b[1;3A": (Keys.Escape, Keys.Up), + "\x1b[1;3B": (Keys.Escape, Keys.Down), + "\x1b[1;3C": (Keys.Escape, Keys.Right), + "\x1b[1;3D": (Keys.Escape, Keys.Left), + "\x1b[1;3F": (Keys.Escape, Keys.End), + "\x1b[1;3H": (Keys.Escape, Keys.Home), + # Alt+shift+number. + "\x1b[1;4A": (Keys.Escape, Keys.ShiftUp), + "\x1b[1;4B": (Keys.Escape, Keys.ShiftDown), + "\x1b[1;4C": (Keys.Escape, Keys.ShiftRight), + "\x1b[1;4D": (Keys.Escape, Keys.ShiftLeft), + "\x1b[1;4F": (Keys.Escape, Keys.ShiftEnd), + "\x1b[1;4H": (Keys.Escape, Keys.ShiftHome), + # Control + arrows. + "\x1b[1;5A": (Keys.ControlUp,), # Cursor Mode + "\x1b[1;5B": (Keys.ControlDown,), # Cursor Mode + "\x1b[1;5C": (Keys.ControlRight,), # Cursor Mode + "\x1b[1;5D": (Keys.ControlLeft,), # Cursor Mode + "\x1bf": (Keys.ControlRight,), # iTerm natural editing keys + "\x1bb": (Keys.ControlLeft,), # iTerm natural editing keys + "\x1b[1;5F": (Keys.ControlEnd,), + "\x1b[1;5H": (Keys.ControlHome,), + # Tmux sends following keystrokes when control+arrow is pressed, but for + # Emacs ansi-term sends the same sequences for normal arrow keys. Consider + # it a normal arrow press, because that's more important. + "\x1b[5A": (Keys.ControlUp,), + "\x1b[5B": (Keys.ControlDown,), + "\x1b[5C": (Keys.ControlRight,), + "\x1b[5D": (Keys.ControlLeft,), + "\x1bOc": (Keys.ControlRight,), # rxvt + "\x1bOd": (Keys.ControlLeft,), # rxvt + # Control + shift + arrows. + "\x1b[1;6A": (Keys.ControlShiftUp,), + "\x1b[1;6B": (Keys.ControlShiftDown,), + "\x1b[1;6C": (Keys.ControlShiftRight,), + "\x1b[1;6D": (Keys.ControlShiftLeft,), + "\x1b[1;6F": (Keys.ControlShiftEnd,), + "\x1b[1;6H": (Keys.ControlShiftHome,), + # Control + Meta + arrows. + "\x1b[1;7A": (Keys.Escape, Keys.ControlUp), + "\x1b[1;7B": (Keys.Escape, Keys.ControlDown), + "\x1b[1;7C": (Keys.Escape, Keys.ControlRight), + "\x1b[1;7D": (Keys.Escape, Keys.ControlLeft), + "\x1b[1;7F": (Keys.Escape, Keys.ControlEnd), + "\x1b[1;7H": (Keys.Escape, Keys.ControlHome), + # Meta + Shift + arrows. + "\x1b[1;8A": (Keys.Escape, Keys.ControlShiftUp), + "\x1b[1;8B": (Keys.Escape, Keys.ControlShiftDown), + "\x1b[1;8C": (Keys.Escape, Keys.ControlShiftRight), + "\x1b[1;8D": (Keys.Escape, Keys.ControlShiftLeft), + "\x1b[1;8F": (Keys.Escape, Keys.ControlShiftEnd), + "\x1b[1;8H": (Keys.Escape, Keys.ControlShiftHome), + # Meta + arrow on (some?) Macs when using iTerm defaults (see issue #483). + "\x1b[1;9A": (Keys.Escape, Keys.Up), + "\x1b[1;9B": (Keys.Escape, Keys.Down), + "\x1b[1;9C": (Keys.Escape, Keys.Right), + "\x1b[1;9D": (Keys.Escape, Keys.Left), + # -- + # Control/shift/meta + number in mintty. + # (c-2 will actually send c-@ and c-6 will send c-^.) + "\x1b[1;5p": (Keys.Control0,), + "\x1b[1;5q": (Keys.Control1,), + "\x1b[1;5r": (Keys.Control2,), + "\x1b[1;5s": (Keys.Control3,), + "\x1b[1;5t": (Keys.Control4,), + "\x1b[1;5u": (Keys.Control5,), + "\x1b[1;5v": (Keys.Control6,), + "\x1b[1;5w": (Keys.Control7,), + "\x1b[1;5x": (Keys.Control8,), + "\x1b[1;5y": (Keys.Control9,), + "\x1b[1;6p": (Keys.ControlShift0,), + "\x1b[1;6q": (Keys.ControlShift1,), + "\x1b[1;6r": (Keys.ControlShift2,), + "\x1b[1;6s": (Keys.ControlShift3,), + "\x1b[1;6t": (Keys.ControlShift4,), + "\x1b[1;6u": (Keys.ControlShift5,), + "\x1b[1;6v": (Keys.ControlShift6,), + "\x1b[1;6w": (Keys.ControlShift7,), + "\x1b[1;6x": (Keys.ControlShift8,), + "\x1b[1;6y": (Keys.ControlShift9,), + "\x1b[1;7p": (Keys.Escape, Keys.Control0), + "\x1b[1;7q": (Keys.Escape, Keys.Control1), + "\x1b[1;7r": (Keys.Escape, Keys.Control2), + "\x1b[1;7s": (Keys.Escape, Keys.Control3), + "\x1b[1;7t": (Keys.Escape, Keys.Control4), + "\x1b[1;7u": (Keys.Escape, Keys.Control5), + "\x1b[1;7v": (Keys.Escape, Keys.Control6), + "\x1b[1;7w": (Keys.Escape, Keys.Control7), + "\x1b[1;7x": (Keys.Escape, Keys.Control8), + "\x1b[1;7y": (Keys.Escape, Keys.Control9), + "\x1b[1;8p": (Keys.Escape, Keys.ControlShift0), + "\x1b[1;8q": (Keys.Escape, Keys.ControlShift1), + "\x1b[1;8r": (Keys.Escape, Keys.ControlShift2), + "\x1b[1;8s": (Keys.Escape, Keys.ControlShift3), + "\x1b[1;8t": (Keys.Escape, Keys.ControlShift4), + "\x1b[1;8u": (Keys.Escape, Keys.ControlShift5), + "\x1b[1;8v": (Keys.Escape, Keys.ControlShift6), + "\x1b[1;8w": (Keys.Escape, Keys.ControlShift7), + "\x1b[1;8x": (Keys.Escape, Keys.ControlShift8), + "\x1b[1;8y": (Keys.Escape, Keys.ControlShift9), +} + +# https://gist.github.com/christianparpart/d8a62cc1ab659194337d73e399004036 +SYNC_START = "\x1b[?2026h" +SYNC_END = "\x1b[?2026l" diff --git a/pygbag/support/_xterm_parser/_context.py b/pygbag/support/_xterm_parser/_context.py new file mode 100644 index 0000000..33d8369 --- /dev/null +++ b/pygbag/support/_xterm_parser/_context.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +from contextvars import ContextVar +from typing import TYPE_CHECKING, Callable + +if TYPE_CHECKING: + from .app import App + from .message import Message + from .message_pump import MessagePump + from .screen import Screen + + +class NoActiveAppError(RuntimeError): + """Runtime error raised if we try to retrieve the active app when there is none.""" + + +active_app: ContextVar["App"] = ContextVar("active_app") +active_message_pump: ContextVar["MessagePump"] = ContextVar("active_message_pump") +prevent_message_types_stack: ContextVar[list[set[type[Message]]]] = ContextVar( + "prevent_message_types_stack" +) +visible_screen_stack: ContextVar[list[Screen]] = ContextVar("visible_screen_stack") +"""A stack of visible screens (with background alpha < 1), used in the screen render process.""" +message_hook: ContextVar[Callable[[Message], None]] = ContextVar("message_hook") +"""A callable that accepts a message. Used by App.run_test.""" diff --git a/pygbag/support/_xterm_parser/_parser.py b/pygbag/support/_xterm_parser/_parser.py new file mode 100644 index 0000000..812e063 --- /dev/null +++ b/pygbag/support/_xterm_parser/_parser.py @@ -0,0 +1,172 @@ +from __future__ import annotations + +import io +from collections import deque +from typing import Callable, Deque, Generator, Generic, Iterable, TypeVar, Union + + +class ParseError(Exception): + pass + + +class ParseEOF(ParseError): + """End of Stream.""" + + +class Awaitable: + __slots__: list[str] = [] + + +class _Read(Awaitable): + __slots__ = ["remaining"] + + def __init__(self, count: int) -> None: + self.remaining = count + + def __repr__(self) -> str: + return f"_ReadBytes({self.remaining})" + + +class _Read1(Awaitable): + __slots__: list[str] = [] + + +class _ReadUntil(Awaitable): + __slots__ = ["sep", "max_bytes"] + + def __init__(self, sep: str, max_bytes: int | None = None) -> None: + self.sep = sep + self.max_bytes = max_bytes + + +class _PeekBuffer(Awaitable): + __slots__: list[str] = [] + + +T = TypeVar("T") + + +TokenCallback = Callable[[T], None] + + +class Parser(Generic[T]): + read = _Read + read1 = _Read1 + read_until = _ReadUntil + peek_buffer = _PeekBuffer + + def __init__(self) -> None: + self._buffer = io.StringIO() + self._eof = False + self._tokens: Deque[T] = deque() + self._gen = self.parse(self._tokens.append) + self._awaiting: Union[Awaitable, T] = next(self._gen) + + @property + def is_eof(self) -> bool: + return self._eof + + def reset(self) -> None: + self._gen = self.parse(self._tokens.append) + self._awaiting = next(self._gen) + + def feed(self, data: str) -> Iterable[T]: + if self._eof: + raise ParseError("end of file reached") from None + if not data: + self._eof = True + try: + self._gen.send(self._buffer.getvalue()) + except StopIteration: + raise ParseError("end of file reached") from None + while self._tokens: + yield self._tokens.popleft() + + self._buffer.truncate(0) + return + + _buffer = self._buffer + pos = 0 + tokens = self._tokens + popleft = tokens.popleft + data_size = len(data) + + while tokens: + yield popleft() + + while pos < data_size or isinstance(self._awaiting, _PeekBuffer): + _awaiting = self._awaiting + if isinstance(_awaiting, _Read1): + self._awaiting = self._gen.send(data[pos : pos + 1]) + pos += 1 + + elif isinstance(_awaiting, _PeekBuffer): + self._awaiting = self._gen.send(data[pos:]) + + elif isinstance(_awaiting, _Read): + remaining = _awaiting.remaining + chunk = data[pos : pos + remaining] + chunk_size = len(chunk) + pos += chunk_size + _buffer.write(chunk) + remaining -= chunk_size + if remaining: + _awaiting.remaining = remaining + else: + _awaiting = self._gen.send(_buffer.getvalue()) + _buffer.seek(0) + _buffer.truncate() + + elif isinstance(_awaiting, _ReadUntil): + chunk = data[pos:] + _buffer.write(chunk) + sep = _awaiting.sep + sep_index = _buffer.getvalue().find(sep) + + if sep_index == -1: + pos += len(chunk) + if ( + _awaiting.max_bytes is not None + and _buffer.tell() > _awaiting.max_bytes + ): + self._gen.throw(ParseError(f"expected {sep}")) + else: + sep_index += len(sep) + if ( + _awaiting.max_bytes is not None + and sep_index > _awaiting.max_bytes + ): + self._gen.throw(ParseError(f"expected {sep}")) + data = _buffer.getvalue()[sep_index:] + pos = 0 + self._awaiting = self._gen.send(_buffer.getvalue()[:sep_index]) + _buffer.seek(0) + _buffer.truncate() + + while tokens: + yield popleft() + + def parse(self, on_token: Callable[[T], None]) -> Generator[Awaitable, str, None]: + yield from () + + +if __name__ == "__main__": + data = "Where there is a Will there is a way!" + + class TestParser(Parser[str]): + def parse( + self, on_token: Callable[[str], None] + ) -> Generator[Awaitable, str, None]: + while True: + data = yield self.read1() + if not data: + break + on_token(data) + + test_parser = TestParser() + + for n in range(0, len(data), 5): + for token in test_parser.feed(data[n : n + 5]): + print(token) + for token in test_parser.feed(""): + print(token) diff --git a/pygbag/support/_xterm_parser/_time.py b/pygbag/support/_xterm_parser/_time.py new file mode 100644 index 0000000..fea8a56 --- /dev/null +++ b/pygbag/support/_xterm_parser/_time.py @@ -0,0 +1,53 @@ +import asyncio +import platform +from asyncio import sleep as asyncio_sleep +from time import monotonic, perf_counter + +PLATFORM = platform.system() +WINDOWS = PLATFORM == "Windows" + + +if WINDOWS: + time = perf_counter +else: + time = monotonic + + +if WINDOWS: + # sleep on windows as a resolution of 15ms + # Python3.11 is somewhat better, but this home-grown version beats it + # Deduced from practical experiments + + from ._win_sleep import sleep as win_sleep + + async def sleep(secs: float) -> None: + """Sleep for a given number of seconds. + + Args: + secs: Number of seconds to sleep for. + """ + await asyncio.create_task(win_sleep(secs)) + +else: + + async def sleep(secs: float) -> None: + """Sleep for a given number of seconds. + + Args: + secs: Number of seconds to sleep for. + """ + # From practical experiments, asyncio.sleep sleeps for at least half a millisecond too much + # Presumably there is overhead asyncio itself which accounts for this + # We will reduce the sleep to compensate, and also don't sleep at all for less than half a millisecond + sleep_for = secs - 0.0005 + if sleep_for > 0: + await asyncio_sleep(sleep_for) + + +get_time = time +"""Get the current wall clock (monotonic) time. + +Returns: + The value (in fractional seconds) of a monotonic clock, + i.e. a clock that cannot go backwards. +""" diff --git a/pygbag/support/_xterm_parser/_types.py b/pygbag/support/_xterm_parser/_types.py new file mode 100644 index 0000000..b1ad797 --- /dev/null +++ b/pygbag/support/_xterm_parser/_types.py @@ -0,0 +1,46 @@ +from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Union + +from typing_extensions import Protocol + +if TYPE_CHECKING: + from rich.segment import Segment + + from .message import Message + + +class MessageTarget(Protocol): + """Protocol that must be followed by objects that can receive messages.""" + + async def _post_message(self, message: "Message") -> bool: + ... + + def post_message(self, message: "Message") -> bool: + ... + + +class EventTarget(Protocol): + async def _post_message(self, message: "Message") -> bool: + ... + + def post_message(self, message: "Message") -> bool: + ... + + +class UnusedParameter: + """Helper type for a parameter that isn't specified in a method call.""" + + +SegmentLines = List[List["Segment"]] +CallbackType = Union[Callable[[], Awaitable[None]], Callable[[], None]] +"""Type used for arbitrary callables used in callbacks.""" +IgnoreReturnCallbackType = Union[Callable[[], Awaitable[Any]], Callable[[], Any]] +"""A callback which ignores the return type.""" +WatchCallbackType = Union[ + Callable[[], Awaitable[None]], + Callable[[Any], Awaitable[None]], + Callable[[Any, Any], Awaitable[None]], + Callable[[], None], + Callable[[Any], None], + Callable[[Any, Any], None], +] +"""Type used for callbacks passed to the `watch` method of widgets.""" diff --git a/pygbag/support/_xterm_parser/_xterm_parser.py b/pygbag/support/_xterm_parser/_xterm_parser.py new file mode 100644 index 0000000..d3653dc --- /dev/null +++ b/pygbag/support/_xterm_parser/_xterm_parser.py @@ -0,0 +1,258 @@ +from __future__ import annotations + +import re +import unicodedata +from typing import Any, Callable, Generator, Iterable + +from . import events, messages +from ._ansi_sequences import ANSI_SEQUENCES_KEYS +from ._parser import Awaitable, Parser, TokenCallback +from .keys import KEY_NAME_REPLACEMENTS, _character_to_key + +# When trying to determine whether the current sequence is a supported/valid +# escape sequence, at which length should we give up and consider our search +# to be unsuccessful? +_MAX_SEQUENCE_SEARCH_THRESHOLD = 20 + +_re_mouse_event = re.compile("^" + re.escape("\x1b[") + r"(\d+);(?P\d)\$y" +) +_re_bracketed_paste_start = re.compile(r"^\x1b\[200~$") +_re_bracketed_paste_end = re.compile(r"^\x1b\[201~$") + + +class XTermParser(Parser[events.Event]): + _re_sgr_mouse = re.compile(r"\x1b\[<(\d+);(\d+);(\d+)([Mm])") + + def __init__(self, more_data: Callable[[], bool], debug: bool = False) -> None: + self.more_data = more_data + self.last_x = 0 + self.last_y = 0 + + self._debug_log_file = open("keys.log", "wt") if debug else None + + super().__init__() + + def debug_log(self, *args: Any) -> None: # pragma: no cover + if self._debug_log_file is not None: + self._debug_log_file.write(" ".join(args) + "\n") + self._debug_log_file.flush() + + def feed(self, data: str) -> Iterable[events.Event]: + self.debug_log(f"FEED {data!r}") + return super().feed(data) + + def parse_mouse_code(self, code: str) -> events.Event | None: + sgr_match = self._re_sgr_mouse.match(code) + if sgr_match: + _buttons, _x, _y, state = sgr_match.groups() + buttons = int(_buttons) + x = int(_x) - 1 + y = int(_y) - 1 + delta_x = x - self.last_x + delta_y = y - self.last_y + self.last_x = x + self.last_y = y + event_class: type[events.MouseEvent] + + if buttons & 64: + event_class = ( + events.MouseScrollDown if buttons & 1 else events.MouseScrollUp + ) + button = 0 + else: + if buttons & 32: + event_class = events.MouseMove + else: + event_class = events.MouseDown if state == "M" else events.MouseUp + + button = (buttons + 1) & 3 + + event = event_class( + x, + y, + delta_x, + delta_y, + button, + bool(buttons & 4), + bool(buttons & 8), + bool(buttons & 16), + screen_x=x, + screen_y=y, + ) + return event + return None + + _reissued_sequence_debug_book: Callable[[str], None] | None = None + """INTERNAL USE ONLY! + + If this property is set to a callable, it will be called *instead* of + the reissued sequence being emitted as key events. + """ + + def parse(self, on_token: TokenCallback) -> Generator[Awaitable, str, None]: + ESC = "\x1b" + read1 = self.read1 + sequence_to_key_events = self._sequence_to_key_events + more_data = self.more_data + paste_buffer: list[str] = [] + bracketed_paste = False + use_prior_escape = False + + def reissue_sequence_as_keys(reissue_sequence: str) -> None: + if self._reissued_sequence_debug_book is not None: + self._reissued_sequence_debug_book(reissue_sequence) + return + for character in reissue_sequence: + key_events = sequence_to_key_events(character) + for event in key_events: + if event.key == "escape": + event = events.Key("circumflex_accent", "^") + on_token(event) + + while not self.is_eof: + if not bracketed_paste and paste_buffer: + # We're at the end of the bracketed paste. + # The paste buffer has content, but the bracketed paste has finished, + # so we flush the paste buffer. We have to remove the final character + # since if bracketed paste has come to an end, we'll have added the + # ESC from the closing bracket, since at that point we didn't know what + # the full escape code was. + pasted_text = "".join(paste_buffer[:-1]) + # Note the removal of NUL characters: https://github.com/Textualize/textual/issues/1661 + on_token(events.Paste(pasted_text.replace("\x00", ""))) + paste_buffer.clear() + + character = ESC if use_prior_escape else (yield read1()) + use_prior_escape = False + + if bracketed_paste: + paste_buffer.append(character) + + self.debug_log(f"character={character!r}") + if character == ESC: + # Could be the escape key was pressed OR the start of an escape sequence + sequence: str = character + if not bracketed_paste: + # TODO: There's nothing left in the buffer at the moment, + # but since we're on an escape, how can we be sure that the + # data that next gets fed to the parser isn't an escape sequence? + + # This problem arises when an ESC falls at the end of a chunk. + # We'll be at an escape, but peek_buffer will return an empty + # string because there's nothing in the buffer yet. + + # This code makes an assumption that an escape sequence will never be + # "chopped up", so buffers would never contain partial escape sequences. + peek_buffer = yield self.peek_buffer() + if not peek_buffer: + # An escape arrived without any following characters + on_token(events.Key("escape", "\x1b")) + continue + if peek_buffer and peek_buffer[0] == ESC: + # There is an escape in the buffer, so ESC ESC has arrived + yield read1() + on_token(events.Key("escape", "\x1b")) + # If there is no further data, it is not part of a sequence, + # So we don't need to go in to the loop + if len(peek_buffer) == 1 and not more_data(): + continue + + # Look ahead through the suspected escape sequence for a match + while True: + # If we run into another ESC at this point, then we've failed + # to find a match, and should issue everything we've seen within + # the suspected sequence as Key events instead. + sequence_character = yield read1() + new_sequence = sequence + sequence_character + + threshold_exceeded = len(sequence) > _MAX_SEQUENCE_SEARCH_THRESHOLD + found_escape = sequence_character and sequence_character == ESC + + if threshold_exceeded: + # We exceeded the sequence length threshold, so reissue all the + # characters in that sequence as key-presses. + reissue_sequence_as_keys(new_sequence) + break + + if found_escape: + # We've hit an escape, so we need to reissue all the keys + # up to but not including it, since this escape could be + # part of an upcoming control sequence. + use_prior_escape = True + reissue_sequence_as_keys(sequence) + break + + sequence = new_sequence + + self.debug_log(f"sequence={sequence!r}") + + bracketed_paste_start_match = _re_bracketed_paste_start.match( + sequence + ) + if bracketed_paste_start_match is not None: + bracketed_paste = True + break + + bracketed_paste_end_match = _re_bracketed_paste_end.match(sequence) + if bracketed_paste_end_match is not None: + bracketed_paste = False + break + + if not bracketed_paste: + # Was it a pressed key event that we received? + key_events = list(sequence_to_key_events(sequence)) + for key_event in key_events: + on_token(key_event) + if key_events: + break + # Or a mouse event? + mouse_match = _re_mouse_event.match(sequence) + if mouse_match is not None: + mouse_code = mouse_match.group(0) + event = self.parse_mouse_code(mouse_code) + if event: + on_token(event) + break + + # Or a mode report? + # (i.e. the terminal saying it supports a mode we requested) + mode_report_match = _re_terminal_mode_response.match(sequence) + if mode_report_match is not None: + if ( + mode_report_match["mode_id"] == "2026" + and int(mode_report_match["setting_parameter"]) > 0 + ): + on_token(messages.TerminalSupportsSynchronizedOutput()) + break + else: + if not bracketed_paste: + for event in sequence_to_key_events(character): + on_token(event) + + def _sequence_to_key_events( + self, sequence: str, _unicode_name=unicodedata.name + ) -> Iterable[events.Key]: + """Map a sequence of code points on to a sequence of keys. + + Args: + sequence: Sequence of code points. + + Returns: + Keys + """ + keys = ANSI_SEQUENCES_KEYS.get(sequence) + if keys is not None: + for key in keys: + yield events.Key(key.value, sequence if len(sequence) == 1 else None) + elif len(sequence) == 1: + try: + if not sequence.isalnum(): + name = _character_to_key(sequence) + else: + name = sequence + name = KEY_NAME_REPLACEMENTS.get(name, name) + yield events.Key(name, sequence) + except: + yield events.Key(sequence, sequence) diff --git a/pygbag/support/_xterm_parser/case.py b/pygbag/support/_xterm_parser/case.py new file mode 100644 index 0000000..e92dfa3 --- /dev/null +++ b/pygbag/support/_xterm_parser/case.py @@ -0,0 +1,23 @@ +import re +from typing import Match, Pattern + + +def camel_to_snake( + name: str, _re_snake: Pattern[str] = re.compile("[a-z][A-Z]") +) -> str: + """Convert name from CamelCase to snake_case. + + Args: + name: A symbol name, such as a class name. + + Returns: + Name in camel case. + """ + + def repl(match: Match[str]) -> str: + lower: str + upper: str + lower, upper = match.group() # type: ignore + return f"{lower}_{upper.lower()}" + + return _re_snake.sub(repl, name).lower() diff --git a/pygbag/support/_xterm_parser/events.py b/pygbag/support/_xterm_parser/events.py new file mode 100644 index 0000000..eeb3272 --- /dev/null +++ b/pygbag/support/_xterm_parser/events.py @@ -0,0 +1,580 @@ +""" + +Builtin events sent by Textual. + +Events may be marked as "Bubbles" and "Verbose". +See the [events guide](/guide/events/#bubbling) for an explanation of bubbling. +Verbose events are excluded from the textual console, unless you explicitly request them with the `-v` switch as follows: + +``` +textual console -v +``` +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Type, TypeVar + +from ._types import CallbackType +from .geometry import Offset, Size +from .keys import _get_key_aliases +from .message import Message + +MouseEventT = TypeVar("MouseEventT", bound="MouseEvent") + +if TYPE_CHECKING: + from .timer import Timer as TimerClass + from .timer import TimerCallback + from .widget import Widget + + +class Event(Message): + """The base class for all events.""" + + +class Callback(Event, bubble=False, verbose=True): + def __init__(self, callback: CallbackType) -> None: + self.callback = callback + super().__init__() + + +class ShutdownRequest(Event): + pass + + +class Shutdown(Event): + pass + + +class Load(Event, bubble=False): + """ + Sent when the App is running but *before* the terminal is in application mode. + + Use this event to run any set up that doesn't require any visuals such as loading + configuration and binding keys. + + - [ ] Bubbles + - [ ] Verbose + """ + + +class Idle(Event, bubble=False): + """Sent when there are no more items in the message queue. + + This is a pseudo-event in that it is created by the Textual system and doesn't go + through the usual message queue. + + - [ ] Bubbles + - [ ] Verbose + """ + + +class Action(Event): + __slots__ = ["action"] + + def __init__(self, action: str) -> None: + super().__init__() + self.action = action + + +class Resize(Event, bubble=False): + """Sent when the app or widget has been resized. + + - [ ] Bubbles + - [ ] Verbose + + Args: + size: The new size of the Widget. + virtual_size: The virtual size (scrollable size) of the Widget. + container_size: The size of the Widget's container widget. + """ + + __slots__ = ["size", "virtual_size", "container_size"] + + def __init__( + self, + size: Size, + virtual_size: Size, + container_size: Size | None = None, + ) -> None: + self.size = size + self.virtual_size = virtual_size + self.container_size = size if container_size is None else container_size + super().__init__() + + def can_replace(self, message: "Message") -> bool: + return isinstance(message, Resize) + + +class Compose(Event, bubble=False, verbose=True): + """Sent to a widget to request it to compose and mount children. + + - [ ] Bubbles + - [X] Verbose + """ + + +class Mount(Event, bubble=False, verbose=False): + """Sent when a widget is *mounted* and may receive messages. + + - [ ] Bubbles + - [ ] Verbose + """ + + +class Unmount(Event, bubble=False, verbose=False): + """Sent when a widget is unmounted and may not longer receive messages. + + - [ ] Bubbles + - [ ] Verbose + """ + + +class Show(Event, bubble=False): + """Sent when a widget has become visible. + + - [ ] Bubbles + - [ ] Verbose + """ + + +class Hide(Event, bubble=False): + """Sent when a widget has been hidden. + + - [ ] Bubbles + - [ ] Verbose + + A widget may be hidden by setting its `visible` flag to `False`, if it is no longer in a layout, + or if it has been offset beyond the edges of the terminal. + """ + + +class Ready(Event, bubble=False): + """Sent to the app when the DOM is ready. + + - [ ] Bubbles + - [ ] Verbose + """ + + +class MouseCapture(Event, bubble=False): + """Sent when the mouse has been captured. + + - [ ] Bubbles + - [ ] Verbose + + When a mouse has been captured, all further mouse events will be sent to the capturing widget. + + Args: + mouse_position: The position of the mouse when captured. + """ + + def __init__(self, mouse_position: Offset) -> None: + super().__init__() + self.mouse_position = mouse_position + + + +class MouseRelease(Event, bubble=False): + """Mouse has been released. + + - [ ] Bubbles + - [ ] Verbose + + Args: + mouse_position: The position of the mouse when released. + """ + + def __init__(self, mouse_position: Offset) -> None: + super().__init__() + self.mouse_position = mouse_position + + +class InputEvent(Event): + """Base class for input events.""" + + +class Key(InputEvent): + """Sent when the user hits a key on the keyboard. + + - [X] Bubbles + - [ ] Verbose + + Args: + key: The key that was pressed. + character: A printable character or ``None`` if it is not printable. + + Attributes: + aliases: The aliases for the key, including the key itself. + """ + + __slots__ = ["key", "character", "aliases"] + + def __init__(self, key: str, character: str | None) -> None: + super().__init__() + self.key = key + self.character = ( + (key if len(key) == 1 else None) if character is None else character + ) + self.aliases: list[str] = _get_key_aliases(key) + + @property + def name(self) -> str: + """Name of a key suitable for use as a Python identifier.""" + return _key_to_identifier(self.key).lower() + + @property + def name_aliases(self) -> list[str]: + """The corresponding name for every alias in `aliases` list.""" + return [_key_to_identifier(key) for key in self.aliases] + + @property + def is_printable(self) -> bool: + """Check if the key is printable (produces a unicode character). + + Returns: + True if the key is printable. + """ + return False if self.character is None else self.character.isprintable() + + +def _key_to_identifier(key: str) -> str: + """Convert the key string to a name suitable for use as a Python identifier.""" + if len(key) == 1 and key.isupper(): + key = f"upper_{key.lower()}" + return key.replace("+", "_").lower() + + +class MouseEvent(InputEvent, bubble=True): + """Sent in response to a mouse event. + + - [X] Bubbles + - [ ] Verbose + + Args: + x: The relative x coordinate. + y: The relative y coordinate. + delta_x: Change in x since the last message. + delta_y: Change in y since the last message. + button: Indexed of the pressed button. + shift: True if the shift key is pressed. + meta: True if the meta key is pressed. + ctrl: True if the ctrl key is pressed. + screen_x: The absolute x coordinate. + screen_y: The absolute y coordinate. + """ + + __slots__ = [ + "x", + "y", + "delta_x", + "delta_y", + "button", + "shift", + "meta", + "ctrl", + "screen_x", + "screen_y", + ] + + def __init__( + self, + x: int, + y: int, + delta_x: int, + delta_y: int, + button: int, + shift: bool, + meta: bool, + ctrl: bool, + screen_x: int | None = None, + screen_y: int | None = None, + ) -> None: + super().__init__() + self.x = x + self.y = y + self.delta_x = delta_x + self.delta_y = delta_y + self.button = button + self.shift = shift + self.meta = meta + self.ctrl = ctrl + self.screen_x = x if screen_x is None else screen_x + self.screen_y = y if screen_y is None else screen_y + + @classmethod + def from_event(cls: Type[MouseEventT], event: MouseEvent) -> MouseEventT: + new_event = cls( + event.x, + event.y, + event.delta_x, + event.delta_y, + event.button, + event.shift, + event.meta, + event.ctrl, + event.screen_x, + event.screen_y, + ) + return new_event + + @property + def offset(self) -> Offset: + """The mouse coordinate as an offset. + + Returns: + Mouse coordinate. + """ + return Offset(self.x, self.y) + + @property + def screen_offset(self) -> Offset: + """Mouse coordinate relative to the screen. + + Returns: + Mouse coordinate. + """ + return Offset(self.screen_x, self.screen_y) + + @property + def delta(self) -> Offset: + """Mouse coordinate delta (change since last event). + + Returns: + Mouse coordinate. + """ + return Offset(self.delta_x, self.delta_y) + + + def get_content_offset(self, widget: Widget) -> Offset | None: + """Get offset within a widget's content area, or None if offset is not in content (i.e. padding or border). + + Args: + widget: Widget receiving the event. + + Returns: + An offset where the origin is at the top left of the content area. + """ + if self.screen_offset not in widget.content_region: + return None + return self.get_content_offset_capture(widget) + + def get_content_offset_capture(self, widget: Widget) -> Offset: + """Get offset from a widget's content area. + + This method works even if the offset is outside the widget content region. + + Args: + widget: Widget receiving the event. + + Returns: + An offset where the origin is at the top left of the content area. + """ + return self.offset - widget.gutter.top_left + + def _apply_offset(self, x: int, y: int) -> MouseEvent: + return self.__class__( + x=self.x + x, + y=self.y + y, + delta_x=self.delta_x, + delta_y=self.delta_y, + button=self.button, + shift=self.shift, + meta=self.meta, + ctrl=self.ctrl, + screen_x=self.screen_x, + screen_y=self.screen_y, + ) + + +class MouseMove(MouseEvent, bubble=True, verbose=True): + """Sent when the mouse cursor moves. + + - [X] Bubbles + - [X] Verbose + """ + + +class MouseDown(MouseEvent, bubble=True, verbose=True): + """Sent when a mouse button is pressed. + + - [X] Bubbles + - [X] Verbose + """ + + +class MouseUp(MouseEvent, bubble=True, verbose=True): + """Sent when a mouse button is released. + + - [X] Bubbles + - [X] Verbose + """ + + +class MouseScrollDown(MouseEvent, bubble=True): + """Sent when the mouse wheel is scrolled *down*. + + - [X] Bubbles + - [ ] Verbose + """ + + +class MouseScrollUp(MouseEvent, bubble=True): + """Sent when the mouse wheel is scrolled *up*. + + - [X] Bubbles + - [ ] Verbose + """ + + +class Click(MouseEvent, bubble=True): + """Sent when a widget is clicked. + + - [X] Bubbles + - [ ] Verbose + """ + + +class Timer(Event, bubble=False, verbose=True): + """Sent in response to a timer. + + - [ ] Bubbles + - [X] Verbose + """ + + __slots__ = ["time", "count", "callback"] + + def __init__( + self, + timer: "TimerClass", + time: float, + count: int = 0, + callback: TimerCallback | None = None, + ) -> None: + super().__init__() + self.timer = timer + self.time = time + self.count = count + self.callback = callback + + +class Enter(Event, bubble=False, verbose=True): + """Sent when the mouse is moved over a widget. + + - [ ] Bubbles + - [X] Verbose + """ + + +class Leave(Event, bubble=False, verbose=True): + """Sent when the mouse is moved away from a widget. + + - [ ] Bubbles + - [X] Verbose + """ + + +class Focus(Event, bubble=False): + """Sent when a widget is focussed. + + - [ ] Bubbles + - [ ] Verbose + """ + + +class Blur(Event, bubble=False): + """Sent when a widget is blurred (un-focussed). + + - [ ] Bubbles + - [ ] Verbose + """ + + +@dataclass +class DescendantFocus(Event, bubble=True, verbose=True): + """Sent when a child widget is focussed. + + - [X] Bubbles + - [X] Verbose + """ + + widget: Widget + """The widget that was focused.""" + + @property + def control(self) -> Widget: + """The widget that was focused (alias of `widget`).""" + return self.widget + + +@dataclass +class DescendantBlur(Event, bubble=True, verbose=True): + """Sent when a child widget is blurred. + + - [X] Bubbles + - [X] Verbose + """ + + widget: Widget + """The widget that was blurred.""" + + @property + def control(self) -> Widget: + """The widget that was blurred (alias of `widget`).""" + return self.widget + + +class Paste(Event, bubble=True): + """Event containing text that was pasted into the Textual application. + This event will only appear when running in a terminal emulator that supports + bracketed paste mode. Textual will enable bracketed pastes when an app starts, + and disable it when the app shuts down. + + - [X] Bubbles + - [ ] Verbose + + + Args: + text: The text that has been pasted. + """ + + def __init__(self, text: str) -> None: + super().__init__() + self.text = text + + +class ScreenResume(Event, bubble=False): + """Sent to screen that has been made active. + + - [ ] Bubbles + - [ ] Verbose + """ + + +class ScreenSuspend(Event, bubble=False): + """Sent to screen when it is no longer active. + + - [ ] Bubbles + - [ ] Verbose + """ + + +class Print(Event, bubble=False): + """Sent to a widget that is capturing prints. + + - [ ] Bubbles + - [ ] Verbose + + Args: + text: Text that was printed. + stderr: True if the print was to stderr, or False for stdout. + + """ + + def __init__(self, text: str, stderr: bool = False) -> None: + super().__init__() + self.text = text + self.stderr = stderr + + diff --git a/pygbag/support/_xterm_parser/geometry.py b/pygbag/support/_xterm_parser/geometry.py new file mode 100644 index 0000000..1d10bc4 --- /dev/null +++ b/pygbag/support/_xterm_parser/geometry.py @@ -0,0 +1,1156 @@ +""" + +Functions and classes to manage terminal geometry (anything involving coordinates or dimensions). +""" + +from __future__ import annotations + +from functools import lru_cache +from operator import attrgetter, itemgetter +from typing import ( + TYPE_CHECKING, + Any, + Collection, + NamedTuple, + Tuple, + TypeVar, + Union, + cast, +) + +from typing_extensions import Final + +if TYPE_CHECKING: + from typing_extensions import TypeAlias + + +SpacingDimensions: TypeAlias = Union[ + int, Tuple[int], Tuple[int, int], Tuple[int, int, int, int] +] +"""The valid ways in which you can specify spacing.""" + +T = TypeVar("T", int, float) + + +def clamp(value: T, minimum: T, maximum: T) -> T: + """Adjust a value so it is not less than a minimum and not greater + than a maximum value. + + Args: + value: A value. + minimum: Minimum value. + maximum: Maximum value. + + Returns: + New value that is not less than the minimum or greater than the maximum. + """ + if minimum > maximum: + maximum, minimum = minimum, maximum + if value < minimum: + return minimum + elif value > maximum: + return maximum + else: + return value + + +class Offset(NamedTuple): + """A cell offset defined by x and y coordinates. + + Offsets are typically relative to the top left of the terminal or other container. + + Textual prefers the names `x` and `y`, but you could consider `x` to be the _column_ and `y` to be the _row_. + + Offsets support addition, subtraction, multiplication, and negation. + + Example: + ```python + >>> from textual.geometry import Offset + >>> offset = Offset(3, 2) + >>> offset + Offset(x=3, y=2) + >>> offset += Offset(10, 0) + >>> offset + Offset(x=13, y=2) + >>> -offset + Offset(x=-13, y=-2) + ``` + """ + + x: int = 0 + """Offset in the x-axis (horizontal)""" + y: int = 0 + """Offset in the y-axis (vertical)""" + + @property + def is_origin(self) -> bool: + """Is the offset at (0, 0)?""" + return self == (0, 0) + + @property + def clamped(self) -> Offset: + """This offset with `x` and `y` restricted to values above zero.""" + x, y = self + return Offset(0 if x < 0 else x, 0 if y < 0 else y) + + def __bool__(self) -> bool: + return self != (0, 0) + + def __add__(self, other: object) -> Offset: + if isinstance(other, tuple): + _x, _y = self + x, y = other + return Offset(_x + x, _y + y) + return NotImplemented + + def __sub__(self, other: object) -> Offset: + if isinstance(other, tuple): + _x, _y = self + x, y = other + return Offset(_x - x, _y - y) + return NotImplemented + + def __mul__(self, other: object) -> Offset: + if isinstance(other, (float, int)): + x, y = self + return Offset(int(x * other), int(y * other)) + return NotImplemented + + def __neg__(self) -> Offset: + x, y = self + return Offset(-x, -y) + + def blend(self, destination: Offset, factor: float) -> Offset: + """Calculate a new offset on a line between this offset and a destination offset. + + Args: + destination: Point where factor would be 1.0. + factor: A value between 0 and 1.0. + + Returns: + A new point on a line between self and destination. + """ + x1, y1 = self + x2, y2 = destination + return Offset( + int(x1 + (x2 - x1) * factor), + int(y1 + (y2 - y1) * factor), + ) + + def get_distance_to(self, other: Offset) -> float: + """Get the distance to another offset. + + Args: + other: An offset. + + Returns: + Distance to other offset. + """ + x1, y1 = self + x2, y2 = other + distance: float = ((x2 - x1) * (x2 - x1) + (y2 - y1) * (y2 - y1)) ** 0.5 + return distance + + +class Size(NamedTuple): + """The dimensions (width and height) of a rectangular region. + + Example: + ```python + >>> from textual.geometry import Size + >>> size = Size(2, 3) + >>> size + Size(width=2, height=3) + >>> size.area + 6 + >>> size + Size(10, 20) + Size(width=12, height=23) + ``` + """ + + width: int = 0 + """The width in cells.""" + + height: int = 0 + """The height in cells.""" + + def __bool__(self) -> bool: + """A Size is Falsy if it has area 0.""" + return self.width * self.height != 0 + + @property + def area(self) -> int: + """The area occupied by a region of this size.""" + return self.width * self.height + + @property + def region(self) -> Region: + """A region of the same size, at the origin.""" + width, height = self + return Region(0, 0, width, height) + + @property + def line_range(self) -> range: + """A range object that covers values between 0 and `height`.""" + return range(self.height) + + def __add__(self, other: object) -> Size: + if isinstance(other, tuple): + width, height = self + width2, height2 = other + return Size(max(0, width + width2), max(0, height + height2)) + return NotImplemented + + def __sub__(self, other: object) -> Size: + if isinstance(other, tuple): + width, height = self + width2, height2 = other + return Size(max(0, width - width2), max(0, height - height2)) + return NotImplemented + + def contains(self, x: int, y: int) -> bool: + """Check if a point is in area defined by the size. + + Args: + x: X coordinate. + y: Y coordinate. + + Returns: + True if the point is within the region. + """ + width, height = self + return width > x >= 0 and height > y >= 0 + + def contains_point(self, point: tuple[int, int]) -> bool: + """Check if a point is in the area defined by the size. + + Args: + point: A tuple of x and y coordinates. + + Returns: + True if the point is within the region. + """ + x, y = point + width, height = self + return width > x >= 0 and height > y >= 0 + + def __contains__(self, other: Any) -> bool: + try: + x: int + y: int + x, y = other + except Exception: + raise TypeError( + "Dimensions.__contains__ requires an iterable of two integers" + ) + width, height = self + return width > x >= 0 and height > y >= 0 + + +class Region(NamedTuple): + """Defines a rectangular region. + + A Region consists of a coordinate (x and y) and dimensions (width and height). + + ``` + (x, y) + ┌────────────────────┐ ▲ + │ │ │ + │ │ │ + │ │ height + │ │ │ + │ │ │ + └────────────────────┘ ▼ + ◀─────── width ──────▶ + ``` + + Example: + ```python + >>> from textual.geometry import Region + >>> region = Region(4, 5, 20, 10) + >>> region + Region(x=4, y=5, width=20, height=10) + >>> region.area + 200 + >>> region.size + Size(width=20, height=10) + >>> region.offset + Offset(x=4, y=5) + >>> region.contains(1, 2) + False + >>> region.contains(10, 8) + True + ``` + """ + + x: int = 0 + """Offset in the x-axis (horizontal).""" + y: int = 0 + """Offset in the y-axis (vertical).""" + width: int = 0 + """The width of the region.""" + height: int = 0 + """The height of the region.""" + + @classmethod + def from_union(cls, regions: Collection[Region]) -> Region: + """Create a Region from the union of other regions. + + Args: + regions: One or more regions. + + Returns: + A Region that encloses all other regions. + """ + if not regions: + raise ValueError("At least one region expected") + min_x = min(regions, key=itemgetter(0)).x + max_x = max(regions, key=attrgetter("right")).right + min_y = min(regions, key=itemgetter(1)).y + max_y = max(regions, key=attrgetter("bottom")).bottom + return cls(min_x, min_y, max_x - min_x, max_y - min_y) + + @classmethod + def from_corners(cls, x1: int, y1: int, x2: int, y2: int) -> Region: + """Construct a Region form the top left and bottom right corners. + + Args: + x1: Top left x. + y1: Top left y. + x2: Bottom right x. + y2: Bottom right y. + + Returns: + A new region. + """ + return cls(x1, y1, x2 - x1, y2 - y1) + + @classmethod + def from_offset(cls, offset: tuple[int, int], size: tuple[int, int]) -> Region: + """Create a region from offset and size. + + Args: + offset: Offset (top left point). + size: Dimensions of region. + + Returns: + A region instance. + """ + x, y = offset + width, height = size + return cls(x, y, width, height) + + @classmethod + def get_scroll_to_visible( + cls, window_region: Region, region: Region, *, top: bool = False + ) -> Offset: + """Calculate the smallest offset required to translate a window so that it contains + another region. + + This method is used to calculate the required offset to scroll something in to view. + + Args: + window_region: The window region. + region: The region to move inside the window. + top: Get offset to top of window. + + Returns: + An offset required to add to region to move it inside window_region. + """ + + if region in window_region and not top: + # Region is already inside the window, so no need to move it. + return NULL_OFFSET + + window_left, window_top, window_right, window_bottom = window_region.corners + region = region.crop_size(window_region.size) + left, top_, right, bottom = region.corners + delta_x = delta_y = 0 + + if not ( + (window_right > left >= window_left) + and (window_right > right >= window_left) + ): + # The region does not fit + # The window needs to scroll on the X axis to bring region in to view + delta_x = min( + left - window_left, + left - (window_right - region.width), + key=abs, + ) + + if top: + delta_y = top_ - window_top + + elif not ( + (window_bottom > top_ >= window_top) + and (window_bottom > bottom >= window_top) + ): + # The window needs to scroll on the Y axis to bring region in to view + delta_y = min( + top_ - window_top, + top_ - (window_bottom - region.height), + key=abs, + ) + return Offset(delta_x, delta_y) + + def __bool__(self) -> bool: + """A Region is considered False when it has no area.""" + _, _, width, height = self + return width * height > 0 + + @property + def column_span(self) -> tuple[int, int]: + """A pair of integers for the start and end columns (x coordinates) in this region. + + The end value is *exclusive*. + """ + return (self.x, self.x + self.width) + + @property + def line_span(self) -> tuple[int, int]: + """A pair of integers for the start and end lines (y coordinates) in this region. + + The end value is *exclusive*. + """ + return (self.y, self.y + self.height) + + @property + def right(self) -> int: + """Maximum X value (non inclusive).""" + return self.x + self.width + + @property + def bottom(self) -> int: + """Maximum Y value (non inclusive).""" + return self.y + self.height + + @property + def area(self) -> int: + """The area under the region.""" + return self.width * self.height + + @property + def offset(self) -> Offset: + """The top left corner of the region. + + Returns: + An offset. + """ + return Offset(*self[:2]) + + @property + def center(self) -> tuple[float, float]: + """The center of the region. + + Note, that this does *not* return an `Offset`, because the center may not be an integer coordinate. + + Returns: + Tuple of floats. + """ + x, y, width, height = self + return (x + width / 2.0, y + height / 2.0) + + @property + def bottom_left(self) -> Offset: + """Bottom left offset of the region. + + Returns: + An offset. + """ + x, y, _width, height = self + return Offset(x, y + height) + + @property + def top_right(self) -> Offset: + """Top right offset of the region. + + Returns: + An offset. + """ + x, y, width, _height = self + return Offset(x + width, y) + + @property + def bottom_right(self) -> Offset: + """Bottom right offset of the region. + + Returns: + An offset. + """ + x, y, width, height = self + return Offset(x + width, y + height) + + @property + def size(self) -> Size: + """Get the size of the region.""" + return Size(*self[2:]) + + @property + def corners(self) -> tuple[int, int, int, int]: + """The top left and bottom right coordinates as a tuple of four integers.""" + x, y, width, height = self + return x, y, x + width, y + height + + @property + def column_range(self) -> range: + """A range object for X coordinates.""" + return range(self.x, self.x + self.width) + + @property + def line_range(self) -> range: + """A range object for Y coordinates.""" + return range(self.y, self.y + self.height) + + @property + def reset_offset(self) -> Region: + """An region of the same size at (0, 0). + + Returns: + A region at the origin. + """ + _, _, width, height = self + return Region(0, 0, width, height) + + def __add__(self, other: object) -> Region: + if isinstance(other, tuple): + ox, oy = other + x, y, width, height = self + return Region(x + ox, y + oy, width, height) + return NotImplemented + + def __sub__(self, other: object) -> Region: + if isinstance(other, tuple): + ox, oy = other + x, y, width, height = self + return Region(x - ox, y - oy, width, height) + return NotImplemented + + def at_offset(self, offset: tuple[int, int]) -> Region: + """Get a new Region with the same size at a given offset. + + Args: + offset: An offset. + + Returns: + New Region with adjusted offset. + """ + x, y = offset + _x, _y, width, height = self + return Region(x, y, width, height) + + def crop_size(self, size: tuple[int, int]) -> Region: + """Get a region with the same offset, with a size no larger than `size`. + + Args: + size: Maximum width and height (WIDTH, HEIGHT). + + Returns: + New region that could fit within `size`. + """ + x, y, width1, height1 = self + width2, height2 = size + return Region(x, y, min(width1, width2), min(height1, height2)) + + def expand(self, size: tuple[int, int]) -> Region: + """Increase the size of the region by adding a border. + + Args: + size: Additional width and height. + + Returns: + A new region. + """ + expand_width, expand_height = size + x, y, width, height = self + return Region( + x - expand_width, + y - expand_height, + width + expand_width * 2, + height + expand_height * 2, + ) + + def clip_size(self, size: tuple[int, int]) -> Region: + """Clip the size to fit within minimum values. + + Args: + size: Maximum width and height. + + Returns: + No region, not bigger than size. + """ + x, y, width, height = self + max_width, max_height = size + return Region(x, y, min(width, max_width), min(height, max_height)) + + @lru_cache(maxsize=1024) + def overlaps(self, other: Region) -> bool: + """Check if another region overlaps this region. + + Args: + other: A Region. + + Returns: + True if other region shares any cells with this region. + """ + x, y, x2, y2 = self.corners + ox, oy, ox2, oy2 = other.corners + + return ((x2 > ox >= x) or (x2 > ox2 > x) or (ox < x and ox2 >= x2)) and ( + (y2 > oy >= y) or (y2 > oy2 > y) or (oy < y and oy2 >= y2) + ) + + def contains(self, x: int, y: int) -> bool: + """Check if a point is in the region. + + Args: + x: X coordinate. + y: Y coordinate. + + Returns: + True if the point is within the region. + """ + self_x, self_y, width, height = self + return (self_x + width > x >= self_x) and (self_y + height > y >= self_y) + + def contains_point(self, point: tuple[int, int]) -> bool: + """Check if a point is in the region. + + Args: + point: A tuple of x and y coordinates. + + Returns: + True if the point is within the region. + """ + x1, y1, x2, y2 = self.corners + try: + ox, oy = point + except Exception: + raise TypeError(f"a tuple of two integers is required, not {point!r}") + return (x2 > ox >= x1) and (y2 > oy >= y1) + + @lru_cache(maxsize=1024) + def contains_region(self, other: Region) -> bool: + """Check if a region is entirely contained within this region. + + Args: + other: A region. + + Returns: + True if the other region fits perfectly within this region. + """ + x1, y1, x2, y2 = self.corners + ox, oy, ox2, oy2 = other.corners + return ( + (x2 >= ox >= x1) + and (y2 >= oy >= y1) + and (x2 >= ox2 >= x1) + and (y2 >= oy2 >= y1) + ) + + @lru_cache(maxsize=1024) + def translate(self, offset: tuple[int, int]) -> Region: + """Move the offset of the Region. + + Args: + offset: Offset to add to region. + + Returns: + A new region shifted by (x, y) + """ + + self_x, self_y, width, height = self + offset_x, offset_y = offset + return Region(self_x + offset_x, self_y + offset_y, width, height) + + @lru_cache(maxsize=4096) + def __contains__(self, other: Any) -> bool: + """Check if a point is in this region.""" + if isinstance(other, Region): + return self.contains_region(other) + else: + try: + return self.contains_point(other) + except TypeError: + return False + + def clip(self, width: int, height: int) -> Region: + """Clip this region to fit within width, height. + + Args: + width: Width of bounds. + height: Height of bounds. + + Returns: + Clipped region. + """ + x1, y1, x2, y2 = self.corners + + _clamp = clamp + new_region = Region.from_corners( + _clamp(x1, 0, width), + _clamp(y1, 0, height), + _clamp(x2, 0, width), + _clamp(y2, 0, height), + ) + return new_region + + @lru_cache(maxsize=4096) + def grow(self, margin: tuple[int, int, int, int]) -> Region: + """Grow a region by adding spacing. + + Args: + margin: Grow space by `(, , , )`. + + Returns: + New region. + """ + if not any(margin): + return self + top, right, bottom, left = margin + x, y, width, height = self + return Region( + x=x - left, + y=y - top, + width=max(0, width + left + right), + height=max(0, height + top + bottom), + ) + + @lru_cache(maxsize=4096) + def shrink(self, margin: tuple[int, int, int, int]) -> Region: + """Shrink a region by subtracting spacing. + + Args: + margin: Shrink space by `(, , , )`. + + Returns: + The new, smaller region. + """ + if not any(margin): + return self + top, right, bottom, left = margin + x, y, width, height = self + return Region( + x=x + left, + y=y + top, + width=max(0, width - (left + right)), + height=max(0, height - (top + bottom)), + ) + + @lru_cache(maxsize=4096) + def intersection(self, region: Region) -> Region: + """Get the overlapping portion of the two regions. + + Args: + region: A region that overlaps this region. + + Returns: + A new region that covers when the two regions overlap. + """ + # Unrolled because this method is used a lot + x1, y1, w1, h1 = self + cx1, cy1, w2, h2 = region + x2 = x1 + w1 + y2 = y1 + h1 + cx2 = cx1 + w2 + cy2 = cy1 + h2 + + rx1 = cx2 if x1 > cx2 else (cx1 if x1 < cx1 else x1) + ry1 = cy2 if y1 > cy2 else (cy1 if y1 < cy1 else y1) + rx2 = cx2 if x2 > cx2 else (cx1 if x2 < cx1 else x2) + ry2 = cy2 if y2 > cy2 else (cy1 if y2 < cy1 else y2) + + return Region(rx1, ry1, rx2 - rx1, ry2 - ry1) + + @lru_cache(maxsize=4096) + def union(self, region: Region) -> Region: + """Get the smallest region that contains both regions. + + Args: + region: Another region. + + Returns: + An optimally sized region to cover both regions. + """ + x1, y1, x2, y2 = self.corners + ox1, oy1, ox2, oy2 = region.corners + + union_region = self.from_corners( + min(x1, ox1), min(y1, oy1), max(x2, ox2), max(y2, oy2) + ) + return union_region + + @lru_cache(maxsize=1024) + def split(self, cut_x: int, cut_y: int) -> tuple[Region, Region, Region, Region]: + """Split a region in to 4 from given x and y offsets (cuts). + + ``` + cut_x ↓ + ┌────────┐ ┌───┐ + │ │ │ │ + │ 0 │ │ 1 │ + │ │ │ │ + cut_y → └────────┘ └───┘ + ┌────────┐ ┌───┐ + │ 2 │ │ 3 │ + └────────┘ └───┘ + ``` + + Args: + cut_x: Offset from self.x where the cut should be made. If negative, the cut + is taken from the right edge. + cut_y: Offset from self.y where the cut should be made. If negative, the cut + is taken from the lower edge. + + Returns: + Four new regions which add up to the original (self). + """ + + x, y, width, height = self + if cut_x < 0: + cut_x = width + cut_x + if cut_y < 0: + cut_y = height + cut_y + + _Region = Region + return ( + _Region(x, y, cut_x, cut_y), + _Region(x + cut_x, y, width - cut_x, cut_y), + _Region(x, y + cut_y, cut_x, height - cut_y), + _Region(x + cut_x, y + cut_y, width - cut_x, height - cut_y), + ) + + @lru_cache(maxsize=1024) + def split_vertical(self, cut: int) -> tuple[Region, Region]: + """Split a region in to two, from a given x offset. + + ``` + cut ↓ + ┌────────┐┌───┐ + │ 0 ││ 1 │ + │ ││ │ + └────────┘└───┘ + ``` + + Args: + cut: An offset from self.x where the cut should be made. If cut is negative, + it is taken from the right edge. + + Returns: + Two regions, which add up to the original (self). + """ + + x, y, width, height = self + if cut < 0: + cut = width + cut + + return ( + Region(x, y, cut, height), + Region(x + cut, y, width - cut, height), + ) + + @lru_cache(maxsize=1024) + def split_horizontal(self, cut: int) -> tuple[Region, Region]: + """Split a region in to two, from a given y offset. + + ``` + ┌─────────┐ + │ 0 │ + │ │ + cut → └─────────┘ + ┌─────────┐ + │ 1 │ + └─────────┘ + ``` + + Args: + cut: An offset from self.y where the cut should be made. May be negative, + for the offset to start from the lower edge. + + Returns: + Two regions, which add up to the original (self). + """ + x, y, width, height = self + if cut < 0: + cut = height + cut + + return ( + Region(x, y, width, cut), + Region(x, y + cut, width, height - cut), + ) + + def translate_inside( + self, container: Region, x_axis: bool = True, y_axis: bool = True + ) -> Region: + """Translate this region, so it fits within a container. + + This will ensure that there is as little overlap as possible. + The top left of the returned region is guaranteed to be within the container. + + ``` + ┌──────────────────┐ ┌──────────────────┐ + │ container │ │ container │ + │ │ │ ┌─────────────┤ + │ │ ──▶ │ │ return │ + │ ┌──────────┴──┐ │ │ │ + │ │ self │ │ │ │ + └───────┤ │ └────┴─────────────┘ + │ │ + └─────────────┘ + ``` + + + Args: + container: A container region. + x_axis: Allow translation of X axis. + y_axis: Allow translation of Y axis. + + Returns: + A new region with same dimensions that fits with inside container. + """ + x1, y1, width1, height1 = container + x2, y2, width2, height2 = self + return Region( + max(min(x2, x1 + width1 - width2), x1) if x_axis else x2, + max(min(y2, y1 + height1 - height2), y1) if y_axis else y2, + width2, + height2, + ) + + def inflect( + self, x_axis: int = +1, y_axis: int = +1, margin: Spacing | None = None + ) -> Region: + """Inflect a region around one or both axis. + + The `x_axis` and `y_axis` parameters define which direction to move the region. + A positive value will move the region right or down, a negative value will move + the region left or up. A value of `0` will leave that axis unmodified. + + ``` + ╔══════════╗ │ + ║ ║ + ║ Self ║ │ + ║ ║ + ╚══════════╝ │ + + ─ ─ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─ ─ ─ ─ + + │ ┌──────────┐ + │ │ + │ │ Result │ + │ │ + │ └──────────┘ + ``` + + Args: + x_axis: +1 to inflect in the positive direction, -1 to inflect in the negative direction. + y_axis: +1 to inflect in the positive direction, -1 to inflect in the negative direction. + margin: Additional margin. + + Returns: + A new region. + """ + inflect_margin = NULL_SPACING if margin is None else margin + x, y, width, height = self + if x_axis: + x += (width + inflect_margin.width) * x_axis + if y_axis: + y += (height + inflect_margin.height) * y_axis + return Region(x, y, width, height) + + +class Spacing(NamedTuple): + """The spacing around a renderable, such as padding and border. + + Spacing is defined by four integers for the space at the top, right, bottom, and left of a region. + + ``` + ┌ ─ ─ ─ ─ ─ ─ ─▲─ ─ ─ ─ ─ ─ ─ ─ ┐ + │ top + │ ┏━━━━━▼━━━━━━┓ │ + ◀──────▶┃ ┃◀───────▶ + │ left ┃ ┃ right │ + ┃ ┃ + │ ┗━━━━━▲━━━━━━┛ │ + │ bottom + └ ─ ─ ─ ─ ─ ─ ─▼─ ─ ─ ─ ─ ─ ─ ─ ┘ + ``` + + Example: + ```python + >>> from textual.geometry import Region, Spacing + >>> region = Region(2, 3, 20, 10) + >>> spacing = Spacing(1, 2, 3, 4) + >>> region.grow(spacing) + Region(x=-2, y=2, width=26, height=14) + >>> region.shrink(spacing) + Region(x=6, y=4, width=14, height=6) + >>> spacing.css + '1 2 3 4' + ``` + """ + + top: int = 0 + """Space from the top of a region.""" + right: int = 0 + """Space from the right of a region.""" + bottom: int = 0 + """Space from the bottom of a region.""" + left: int = 0 + """Space from the left of a region.""" + + def __bool__(self) -> bool: + return self != (0, 0, 0, 0) + + @property + def width(self) -> int: + """Total space in the x axis.""" + return self.left + self.right + + @property + def height(self) -> int: + """Total space in the y axis.""" + return self.top + self.bottom + + @property + def top_left(self) -> tuple[int, int]: + """A pair of integers for the left, and top space.""" + return (self.left, self.top) + + @property + def bottom_right(self) -> tuple[int, int]: + """A pair of integers for the right, and bottom space.""" + return (self.right, self.bottom) + + @property + def totals(self) -> tuple[int, int]: + """A pair of integers for the total horizontal and vertical space.""" + top, right, bottom, left = self + return (left + right, top + bottom) + + @property + def css(self) -> str: + """A string containing the spacing in CSS format. + + For example: "1" or "2 4" or "4 2 8 2". + """ + top, right, bottom, left = self + if top == right == bottom == left: + return f"{top}" + if (top, right) == (bottom, left): + return f"{top} {right}" + else: + return f"{top} {right} {bottom} {left}" + + @classmethod + def unpack(cls, pad: SpacingDimensions) -> Spacing: + """Unpack padding specified in CSS style. + + Args: + pad: An integer, or tuple of 1, 2, or 4 integers. + + Raises: + ValueError: If `pad` is an invalid value. + + Returns: + New Spacing object. + """ + if isinstance(pad, int): + return cls(pad, pad, pad, pad) + pad_len = len(pad) + if pad_len == 1: + _pad = pad[0] + return cls(_pad, _pad, _pad, _pad) + if pad_len == 2: + pad_top, pad_right = cast(Tuple[int, int], pad) + return cls(pad_top, pad_right, pad_top, pad_right) + if pad_len == 4: + top, right, bottom, left = cast(Tuple[int, int, int, int], pad) + return cls(top, right, bottom, left) + raise ValueError( + f"1, 2 or 4 integers required for spacing properties; {pad_len} given" + ) + + @classmethod + def vertical(cls, amount: int) -> Spacing: + """Construct a Spacing with a given amount of spacing on vertical edges, + and no horizontal spacing. + + Args: + amount: The magnitude of spacing to apply to vertical edges + + Returns: + `Spacing(amount, 0, amount, 0)` + """ + return Spacing(amount, 0, amount, 0) + + @classmethod + def horizontal(cls, amount: int) -> Spacing: + """Construct a Spacing with a given amount of spacing on horizontal edges, + and no vertical spacing. + + Args: + amount: The magnitude of spacing to apply to horizontal edges + + Returns: + `Spacing(0, amount, 0, amount)` + """ + return Spacing(0, amount, 0, amount) + + @classmethod + def all(cls, amount: int) -> Spacing: + """Construct a Spacing with a given amount of spacing on all edges. + + Args: + amount: The magnitude of spacing to apply to all edges + + Returns: + `Spacing(amount, amount, amount, amount)` + """ + return Spacing(amount, amount, amount, amount) + + def __add__(self, other: object) -> Spacing: + if isinstance(other, tuple): + top1, right1, bottom1, left1 = self + top2, right2, bottom2, left2 = other + return Spacing( + top1 + top2, right1 + right2, bottom1 + bottom2, left1 + left2 + ) + return NotImplemented + + def __sub__(self, other: object) -> Spacing: + if isinstance(other, tuple): + top1, right1, bottom1, left1 = self + top2, right2, bottom2, left2 = other + return Spacing( + top1 - top2, right1 - right2, bottom1 - bottom2, left1 - left2 + ) + return NotImplemented + + def grow_maximum(self, other: Spacing) -> Spacing: + """Grow spacing with a maximum. + + Args: + other: Spacing object. + + Returns: + New spacing where the values are maximum of the two values. + """ + top, right, bottom, left = self + other_top, other_right, other_bottom, other_left = other + return Spacing( + max(top, other_top), + max(right, other_right), + max(bottom, other_bottom), + max(left, other_left), + ) + + +NULL_OFFSET: Final = Offset(0, 0) +"""An [offset][textual.geometry.Offset] constant for (0, 0).""" + +NULL_REGION: Final = Region(0, 0, 0, 0) +"""A [Region][textual.geometry.Region] constant for a null region (at the origin, with both width and height set to zero).""" + +NULL_SPACING: Final = Spacing(0, 0, 0, 0) +"""A [Spacing][textual.geometry.Spacing] constant for no space.""" diff --git a/pygbag/support/_xterm_parser/keys.py b/pygbag/support/_xterm_parser/keys.py new file mode 100644 index 0000000..ef32404 --- /dev/null +++ b/pygbag/support/_xterm_parser/keys.py @@ -0,0 +1,314 @@ +from __future__ import annotations + +import unicodedata +from enum import Enum + + +# Adapted from prompt toolkit https://github.com/prompt-toolkit/python-prompt-toolkit/blob/master/prompt_toolkit/keys.py +class Keys(str, Enum): # type: ignore[no-redef] + """ + List of keys for use in key bindings. + + Note that this is an "StrEnum", all values can be compared against + strings. + """ + + @property + def value(self) -> str: + return super().value + + Escape = "escape" # Also Control-[ + ShiftEscape = "shift+escape" + Return = "return" + + ControlAt = "ctrl+@" # Also Control-Space. + + ControlA = "ctrl+a" + ControlB = "ctrl+b" + ControlC = "ctrl+c" + ControlD = "ctrl+d" + ControlE = "ctrl+e" + ControlF = "ctrl+f" + ControlG = "ctrl+g" + ControlH = "ctrl+h" + ControlI = "ctrl+i" # Tab + ControlJ = "ctrl+j" # Newline + ControlK = "ctrl+k" + ControlL = "ctrl+l" + ControlM = "ctrl+m" # Carriage return + ControlN = "ctrl+n" + ControlO = "ctrl+o" + ControlP = "ctrl+p" + ControlQ = "ctrl+q" + ControlR = "ctrl+r" + ControlS = "ctrl+s" + ControlT = "ctrl+t" + ControlU = "ctrl+u" + ControlV = "ctrl+v" + ControlW = "ctrl+w" + ControlX = "ctrl+x" + ControlY = "ctrl+y" + ControlZ = "ctrl+z" + + Control1 = "ctrl+1" + Control2 = "ctrl+2" + Control3 = "ctrl+3" + Control4 = "ctrl+4" + Control5 = "ctrl+5" + Control6 = "ctrl+6" + Control7 = "ctrl+7" + Control8 = "ctrl+8" + Control9 = "ctrl+9" + Control0 = "ctrl+0" + + ControlShift1 = "ctrl+shift+1" + ControlShift2 = "ctrl+shift+2" + ControlShift3 = "ctrl+shift+3" + ControlShift4 = "ctrl+shift+4" + ControlShift5 = "ctrl+shift+5" + ControlShift6 = "ctrl+shift+6" + ControlShift7 = "ctrl+shift+7" + ControlShift8 = "ctrl+shift+8" + ControlShift9 = "ctrl+shift+9" + ControlShift0 = "ctrl+shift+0" + + ControlBackslash = "ctrl+backslash" + ControlSquareClose = "ctrl+right_square_bracket" + ControlCircumflex = "ctrl+circumflex_accent" + ControlUnderscore = "ctrl+underscore" + + Left = "left" + Right = "right" + Up = "up" + Down = "down" + Home = "home" + End = "end" + Insert = "insert" + Delete = "delete" + PageUp = "pageup" + PageDown = "pagedown" + + ControlLeft = "ctrl+left" + ControlRight = "ctrl+right" + ControlUp = "ctrl+up" + ControlDown = "ctrl+down" + ControlHome = "ctrl+home" + ControlEnd = "ctrl+end" + ControlInsert = "ctrl+insert" + ControlDelete = "ctrl+delete" + ControlPageUp = "ctrl+pageup" + ControlPageDown = "ctrl+pagedown" + + ShiftLeft = "shift+left" + ShiftRight = "shift+right" + ShiftUp = "shift+up" + ShiftDown = "shift+down" + ShiftHome = "shift+home" + ShiftEnd = "shift+end" + ShiftInsert = "shift+insert" + ShiftDelete = "shift+delete" + ShiftPageUp = "shift+pageup" + ShiftPageDown = "shift+pagedown" + + ControlShiftLeft = "ctrl+shift+left" + ControlShiftRight = "ctrl+shift+right" + ControlShiftUp = "ctrl+shift+up" + ControlShiftDown = "ctrl+shift+down" + ControlShiftHome = "ctrl+shift+home" + ControlShiftEnd = "ctrl+shift+end" + ControlShiftInsert = "ctrl+shift+insert" + ControlShiftDelete = "ctrl+shift+delete" + ControlShiftPageUp = "ctrl+shift+pageup" + ControlShiftPageDown = "ctrl+shift+pagedown" + + BackTab = "shift+tab" # shift + tab + + F1 = "f1" + F2 = "f2" + F3 = "f3" + F4 = "f4" + F5 = "f5" + F6 = "f6" + F7 = "f7" + F8 = "f8" + F9 = "f9" + F10 = "f10" + F11 = "f11" + F12 = "f12" + F13 = "f13" + F14 = "f14" + F15 = "f15" + F16 = "f16" + F17 = "f17" + F18 = "f18" + F19 = "f19" + F20 = "f20" + F21 = "f21" + F22 = "f22" + F23 = "f23" + F24 = "f24" + + ControlF1 = "ctrl+f1" + ControlF2 = "ctrl+f2" + ControlF3 = "ctrl+f3" + ControlF4 = "ctrl+f4" + ControlF5 = "ctrl+f5" + ControlF6 = "ctrl+f6" + ControlF7 = "ctrl+f7" + ControlF8 = "ctrl+f8" + ControlF9 = "ctrl+f9" + ControlF10 = "ctrl+f10" + ControlF11 = "ctrl+f11" + ControlF12 = "ctrl+f12" + ControlF13 = "ctrl+f13" + ControlF14 = "ctrl+f14" + ControlF15 = "ctrl+f15" + ControlF16 = "ctrl+f16" + ControlF17 = "ctrl+f17" + ControlF18 = "ctrl+f18" + ControlF19 = "ctrl+f19" + ControlF20 = "ctrl+f20" + ControlF21 = "ctrl+f21" + ControlF22 = "ctrl+f22" + ControlF23 = "ctrl+f23" + ControlF24 = "ctrl+f24" + + # Matches any key. + Any = "" + + # Special. + ScrollUp = "" + ScrollDown = "" + + # For internal use: key which is ignored. + # (The key binding for this key should not do anything.) + Ignore = "" + + # Some 'Key' aliases (for backwardshift+compatibility). + ControlSpace = "ctrl-at" + Tab = "tab" + Space = "space" + Enter = "enter" + Backspace = "backspace" + + # ShiftControl was renamed to ControlShift in + # 888fcb6fa4efea0de8333177e1bbc792f3ff3c24 (20 Feb 2020). + ShiftControlLeft = ControlShiftLeft + ShiftControlRight = ControlShiftRight + ShiftControlHome = ControlShiftHome + ShiftControlEnd = ControlShiftEnd + + +# Unicode db contains some obscure names +# This mapping replaces them with more common terms +KEY_NAME_REPLACEMENTS = { + "solidus": "slash", + "reverse_solidus": "backslash", + "commercial_at": "at", + "hyphen_minus": "minus", + "plus_sign": "plus", + "low_line": "underscore", +} +REPLACED_KEYS = {value: key for key, value in KEY_NAME_REPLACEMENTS.items()} + +# Convert the friendly versions of character key Unicode names +# back to their original names. +# This is because we go from Unicode to friendly by replacing spaces and dashes +# with underscores, which cannot be undone by replacing underscores with spaces/dashes. +KEY_TO_UNICODE_NAME = { + "exclamation_mark": "EXCLAMATION MARK", + "quotation_mark": "QUOTATION MARK", + "number_sign": "NUMBER SIGN", + "dollar_sign": "DOLLAR SIGN", + "percent_sign": "PERCENT SIGN", + "left_parenthesis": "LEFT PARENTHESIS", + "right_parenthesis": "RIGHT PARENTHESIS", + "plus_sign": "PLUS SIGN", + "hyphen_minus": "HYPHEN-MINUS", + "full_stop": "FULL STOP", + "less_than_sign": "LESS-THAN SIGN", + "equals_sign": "EQUALS SIGN", + "greater_than_sign": "GREATER-THAN SIGN", + "question_mark": "QUESTION MARK", + "commercial_at": "COMMERCIAL AT", + "left_square_bracket": "LEFT SQUARE BRACKET", + "reverse_solidus": "REVERSE SOLIDUS", + "right_square_bracket": "RIGHT SQUARE BRACKET", + "circumflex_accent": "CIRCUMFLEX ACCENT", + "low_line": "LOW LINE", + "grave_accent": "GRAVE ACCENT", + "left_curly_bracket": "LEFT CURLY BRACKET", + "vertical_line": "VERTICAL LINE", + "right_curly_bracket": "RIGHT CURLY BRACKET", +} + +# Some keys have aliases. For example, if you press `ctrl+m` on your keyboard, +# it's treated the same way as if you press `enter`. Key handlers `key_ctrl_m` and +# `key_enter` are both valid in this case. +KEY_ALIASES = { + "tab": ["ctrl+i"], + "enter": ["ctrl+m"], + "escape": ["ctrl+left_square_brace"], + "ctrl+at": ["ctrl+space"], + "ctrl+j": ["newline"], +} + +KEY_DISPLAY_ALIASES = { + "up": "↑", + "down": "↓", + "left": "←", + "right": "→", + "backspace": "⌫", + "escape": "ESC", + "enter": "⏎", + "minus": "-", + "space": "SPACE", +} + + +def _get_unicode_name_from_key(key: str) -> str: + """Get the best guess for the Unicode name of the char corresponding to the key. + + This function can be seen as a pseudo-inverse of the function `_character_to_key`. + """ + return KEY_TO_UNICODE_NAME.get(key, key.upper()) + + +def _get_key_aliases(key: str) -> list[str]: + """Return all aliases for the given key, including the key itself""" + return [key] + KEY_ALIASES.get(key, []) + + +def _get_key_display(key: str) -> str: + """Given a key (i.e. the `key` string argument to Binding __init__), + return the value that should be displayed in the app when referring + to this key (e.g. in the Footer widget).""" + display_alias = KEY_DISPLAY_ALIASES.get(key) + if display_alias: + return display_alias + + original_key = REPLACED_KEYS.get(key, key) + tentative_unicode_name = _get_unicode_name_from_key(original_key) + try: + unicode_character = unicodedata.lookup(tentative_unicode_name) + except KeyError: + return tentative_unicode_name + + # Check if printable. `delete` for example maps to a control sequence + # which we don't want to write to the terminal. + if unicode_character.isprintable(): + return unicode_character + return tentative_unicode_name + + +def _character_to_key(character: str) -> str: + """Convert a single character to a key value. + + This transformation can be undone by the function `_get_unicode_name_from_key`. + """ + if not character.isalnum(): + key = unicodedata.name(character).lower().replace("-", "_").replace(" ", "_") + else: + key = character + key = KEY_NAME_REPLACEMENTS.get(key, key) + return key diff --git a/pygbag/support/_xterm_parser/message.py b/pygbag/support/_xterm_parser/message.py new file mode 100644 index 0000000..dd8d73c --- /dev/null +++ b/pygbag/support/_xterm_parser/message.py @@ -0,0 +1,128 @@ +""" + +The base class for all messages (including events). +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, ClassVar + +from . import _time +from ._context import active_message_pump +from .case import camel_to_snake + +if TYPE_CHECKING: + from .message_pump import MessagePump + from .widget import Widget + + +class Message: + """Base class for a message.""" + + __slots__ = [ + "_sender", + "time", + "_forwarded", + "_no_default_action", + "_stop_propagation", + "_prevent", + ] + + ALLOW_SELECTOR_MATCH: ClassVar[set[str]] = set() + """Additional attributes that can be used with the [`on` decorator][textual.on]. + + These attributes must be widgets. + """ + bubble: ClassVar[bool] = True # Message will bubble to parent + verbose: ClassVar[bool] = False # Message is verbose + no_dispatch: ClassVar[bool] = False # Message may not be handled by client code + namespace: ClassVar[str] = "" # Namespace to disambiguate messages + handler_name: ClassVar[str] + """Name of the default message handler.""" + + def __init__(self) -> None: + self.__post_init__() + + def __post_init__(self) -> None: + """Allow dataclasses to initialize the object.""" + self._sender: MessagePump | None = active_message_pump.get(None) + self.time: float = _time.get_time() + self._forwarded = False + self._no_default_action = False + self._stop_propagation = False + self._prevent: set[type[Message]] = set() + + def __init_subclass__( + cls, + bubble: bool | None = True, + verbose: bool = False, + no_dispatch: bool | None = False, + namespace: str | None = None, + ) -> None: + super().__init_subclass__() + if bubble is not None: + cls.bubble = bubble + cls.verbose = verbose + if no_dispatch is not None: + cls.no_dispatch = no_dispatch + if namespace is not None: + cls.namespace = namespace + name = camel_to_snake(cls.__name__) + cls.handler_name = f"on_{namespace}_{name}" if namespace else f"on_{name}" + + @property + def control(self) -> Widget | None: + """The widget associated with this message, or None by default.""" + return None + + @property + def is_forwarded(self) -> bool: + """Has the message been forwarded?""" + return self._forwarded + + def _set_forwarded(self) -> None: + """Mark this event as being forwarded.""" + self._forwarded = True + + def _set_sender(self, sender: MessagePump) -> None: + """Set the sender.""" + self._sender = sender + + def can_replace(self, message: "Message") -> bool: + """Check if another message may supersede this one. + + Args: + message: Another message. + + Returns: + True if this message may replace the given message + """ + return False + + def prevent_default(self, prevent: bool = True) -> Message: + """Suppress the default action(s). This will prevent handlers in any base classes + from being called. + + Args: + prevent: True if the default action should be suppressed, + or False if the default actions should be performed. + """ + self._no_default_action = prevent + return self + + def stop(self, stop: bool = True) -> Message: + """Stop propagation of the message to parent. + + Args: + stop: The stop flag. + """ + self._stop_propagation = stop + return self + + def _bubble_to(self, widget: MessagePump) -> None: + """Bubble to a widget (typically the parent). + + Args: + widget: Target of bubble. + """ + widget.post_message(self) diff --git a/pygbag/support/_xterm_parser/messages.py b/pygbag/support/_xterm_parser/messages.py new file mode 100644 index 0000000..cd3816b --- /dev/null +++ b/pygbag/support/_xterm_parser/messages.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + + +from ._types import CallbackType +from .geometry import Region +from .message import Message + +if TYPE_CHECKING: + from .widget import Widget + + +class CloseMessages(Message, verbose=True): + """Requests message pump to close.""" + + +class ExitApp(Message, verbose=True): + """Exit the app.""" + + +class Update(Message, verbose=True): + def __init__(self, widget: Widget): + super().__init__() + self.widget = widget + + + def __eq__(self, other: object) -> bool: + if isinstance(other, Update): + return self.widget == other.widget + return NotImplemented + + def can_replace(self, message: Message) -> bool: + # Update messages can replace update for the same widget + return isinstance(message, Update) and self.widget == message.widget + + +class Layout(Message, verbose=True): + """Sent by Textual when a layout is required.""" + + def can_replace(self, message: Message) -> bool: + return isinstance(message, Layout) + + +class UpdateScroll(Message, verbose=True): + """Sent by Textual when a scroll update is required.""" + + def can_replace(self, message: Message) -> bool: + return isinstance(message, UpdateScroll) + + +class InvokeLater(Message, verbose=True, bubble=False): + """Sent by Textual to invoke a callback.""" + + def __init__(self, callback: CallbackType) -> None: + self.callback = callback + super().__init__() + + +class ScrollToRegion(Message, bubble=False): + """Ask the parent to scroll a given region in to view.""" + + def __init__(self, region: Region) -> None: + self.region = region + super().__init__() + + +class Prompt(Message, no_dispatch=True): + """Used to 'wake up' an event loop.""" + + def can_replace(self, message: Message) -> bool: + return isinstance(message, Prompt) + + +class TerminalSupportsSynchronizedOutput(Message): + """ + Used to make the App aware that the terminal emulator supports synchronised output. + @link https://gist.github.com/christianparpart/d8a62cc1ab659194337d73e399004036 + """ diff --git a/pygbag/support/cross/__EMSCRIPTEN__.py b/pygbag/support/cross/__EMSCRIPTEN__.py index d6e1670..f17fabf 100644 --- a/pygbag/support/cross/__EMSCRIPTEN__.py +++ b/pygbag/support/cross/__EMSCRIPTEN__.py @@ -211,23 +211,23 @@ async def process(self): # ============================= PRELOADING ============================== -preld_counter = -1 +preloading = -1 prelist = {} ROOTDIR = f"/data/data/{sys.argv[0]}/assets" def explore(root): - global prelist, preld_counter + global prelist, preloading - if preld_counter < 0: - preld_counter = 0 + if preloading < 0: + preloading = 0 import shutil for current, dirnames, filenames in os.walk(root): for filename in filenames: if filename.endswith(".so"): - preld_counter += 1 + preloading += 1 src = f"{current}/{filename}" embed.preload(src) @@ -265,26 +265,26 @@ def run_main(PyConfig, loaderhome=None, loadermain="main.py"): if loaderhome: pdb(f"241: appdir mapped to {loaderhome} by loader") ROOTDIR = str(loaderhome) - - # simulator won't run javascript for now - if not hasattr(embed, "run_script"): - pdb("246: no js engine") - return False - - # do not do stuff if not called properly from our js loader. - if PyConfig.executable is None: - # running in sim - pdb("252: running in simulator") - return False - - sys.executable = PyConfig.executable or "python" +# +# # simulator won't run javascript for now +# if not hasattr(embed, "run_script"): +# pdb("246: no js engine") +# return False +# +# # do not do stuff if not called properly from our js loader. +# if PyConfig.executable is None: +# # running in sim +# pdb("252: running in simulator") +# return False +# +# #sys.executable = PyConfig.executable or "python" preloadedWasm = "so" preloadedImages = "png jpeg jpg gif" preloadedAudios = "wav ogg mp4" def preload_apk(p=None): - global preld_counter, prelist, ROOTDIR + global preloading, prelist, ROOTDIR global explore, preloadedWasm, preloadedImages, preloadedAudios ROOTDIR = p or ROOTDIR if os.path.isdir(ROOTDIR): @@ -295,19 +295,19 @@ def preload_apk(p=None): ROOTDIR = os.getcwd() LSRC = len(ROOTDIR) + 1 - preld_counter = -1 + preloading = -1 prelist = {} sys.path.insert(0, ROOTDIR) explore(ROOTDIR) - if preld_counter < 0: + if preloading < 0: pdb(f"{ROOTDIR=}") pdb(f"{os.getcwd()=}") - print(f"284: assets found :", preld_counter) - if not preld_counter: + print(f"284: assets found :", preloading) + if not preloading: embed.run() return True diff --git a/pygbag/support/cross/aio/toplevel.py b/pygbag/support/cross/aio/toplevel.py index 4c7e8e0..eb2e975 100644 --- a/pygbag/support/cross/aio/toplevel.py +++ b/pygbag/support/cross/aio/toplevel.py @@ -51,7 +51,7 @@ def parse_sync(shell, line, **env): try: sys.ps1 except AttributeError: - sys.ps1 = ">>> " + sys.ps1 = ">A> " try: sys.ps2 @@ -137,7 +137,7 @@ def prompt(self, prompt=None): #platform.prompt(prompt or sys.ps1) if repl:repl.prompt() - async def input_console(self, prompt=">>> "): + async def input_console(self, prompt=">I> "): if len(self.buffer): return self.buffer.pop(0) diff --git a/pygbag/support/cross/tomllib/__init__.py b/pygbag/support/cross/tomllib/__init__.py new file mode 100644 index 0000000..ef91cb9 --- /dev/null +++ b/pygbag/support/cross/tomllib/__init__.py @@ -0,0 +1,10 @@ +# SPDX-License-Identifier: MIT +# SPDX-FileCopyrightText: 2021 Taneli Hukkinen +# Licensed to PSF under a Contributor Agreement. + +__all__ = ("loads", "load", "TOMLDecodeError") + +from ._parser import TOMLDecodeError, load, loads + +# Pretend this exception was created here. +TOMLDecodeError.__module__ = __name__ diff --git a/pygbag/support/cross/tomllib/_parser.py b/pygbag/support/cross/tomllib/_parser.py new file mode 100644 index 0000000..45ca7a8 --- /dev/null +++ b/pygbag/support/cross/tomllib/_parser.py @@ -0,0 +1,691 @@ +# SPDX-License-Identifier: MIT +# SPDX-FileCopyrightText: 2021 Taneli Hukkinen +# Licensed to PSF under a Contributor Agreement. + +from __future__ import annotations + +from collections.abc import Iterable +import string +from types import MappingProxyType +from typing import Any, BinaryIO, NamedTuple + +from ._re import ( + RE_DATETIME, + RE_LOCALTIME, + RE_NUMBER, + match_to_datetime, + match_to_localtime, + match_to_number, +) +from ._types import Key, ParseFloat, Pos + +ASCII_CTRL = frozenset(chr(i) for i in range(32)) | frozenset(chr(127)) + +# Neither of these sets include quotation mark or backslash. They are +# currently handled as separate cases in the parser functions. +ILLEGAL_BASIC_STR_CHARS = ASCII_CTRL - frozenset("\t") +ILLEGAL_MULTILINE_BASIC_STR_CHARS = ASCII_CTRL - frozenset("\t\n") + +ILLEGAL_LITERAL_STR_CHARS = ILLEGAL_BASIC_STR_CHARS +ILLEGAL_MULTILINE_LITERAL_STR_CHARS = ILLEGAL_MULTILINE_BASIC_STR_CHARS + +ILLEGAL_COMMENT_CHARS = ILLEGAL_BASIC_STR_CHARS + +TOML_WS = frozenset(" \t") +TOML_WS_AND_NEWLINE = TOML_WS | frozenset("\n") +BARE_KEY_CHARS = frozenset(string.ascii_letters + string.digits + "-_") +KEY_INITIAL_CHARS = BARE_KEY_CHARS | frozenset("\"'") +HEXDIGIT_CHARS = frozenset(string.hexdigits) + +BASIC_STR_ESCAPE_REPLACEMENTS = MappingProxyType( + { + "\\b": "\u0008", # backspace + "\\t": "\u0009", # tab + "\\n": "\u000A", # linefeed + "\\f": "\u000C", # form feed + "\\r": "\u000D", # carriage return + '\\"': "\u0022", # quote + "\\\\": "\u005C", # backslash + } +) + + +class TOMLDecodeError(ValueError): + """An error raised if a document is not valid TOML.""" + + +def load(fp: BinaryIO, /, *, parse_float: ParseFloat = float) -> dict[str, Any]: + """Parse TOML from a binary file object.""" + b = fp.read() + try: + s = b.decode() + except AttributeError: + raise TypeError( + "File must be opened in binary mode, e.g. use `open('foo.toml', 'rb')`" + ) from None + return loads(s, parse_float=parse_float) + + +def loads(s: str, /, *, parse_float: ParseFloat = float) -> dict[str, Any]: # noqa: C901 + """Parse TOML from a string.""" + + # The spec allows converting "\r\n" to "\n", even in string + # literals. Let's do so to simplify parsing. + src = s.replace("\r\n", "\n") + pos = 0 + out = Output(NestedDict(), Flags()) + header: Key = () + parse_float = make_safe_parse_float(parse_float) + + # Parse one statement at a time + # (typically means one line in TOML source) + while True: + # 1. Skip line leading whitespace + pos = skip_chars(src, pos, TOML_WS) + + # 2. Parse rules. Expect one of the following: + # - end of file + # - end of line + # - comment + # - key/value pair + # - append dict to list (and move to its namespace) + # - create dict (and move to its namespace) + # Skip trailing whitespace when applicable. + try: + char = src[pos] + except IndexError: + break + if char == "\n": + pos += 1 + continue + if char in KEY_INITIAL_CHARS: + pos = key_value_rule(src, pos, out, header, parse_float) + pos = skip_chars(src, pos, TOML_WS) + elif char == "[": + try: + second_char: str | None = src[pos + 1] + except IndexError: + second_char = None + out.flags.finalize_pending() + if second_char == "[": + pos, header = create_list_rule(src, pos, out) + else: + pos, header = create_dict_rule(src, pos, out) + pos = skip_chars(src, pos, TOML_WS) + elif char != "#": + raise suffixed_err(src, pos, "Invalid statement") + + # 3. Skip comment + pos = skip_comment(src, pos) + + # 4. Expect end of line or end of file + try: + char = src[pos] + except IndexError: + break + if char != "\n": + raise suffixed_err( + src, pos, "Expected newline or end of document after a statement" + ) + pos += 1 + + return out.data.dict + + +class Flags: + """Flags that map to parsed keys/namespaces.""" + + # Marks an immutable namespace (inline array or inline table). + FROZEN = 0 + # Marks a nest that has been explicitly created and can no longer + # be opened using the "[table]" syntax. + EXPLICIT_NEST = 1 + + def __init__(self) -> None: + self._flags: dict[str, dict] = {} + self._pending_flags: set[tuple[Key, int]] = set() + + def add_pending(self, key: Key, flag: int) -> None: + self._pending_flags.add((key, flag)) + + def finalize_pending(self) -> None: + for key, flag in self._pending_flags: + self.set(key, flag, recursive=False) + self._pending_flags.clear() + + def unset_all(self, key: Key) -> None: + cont = self._flags + for k in key[:-1]: + if k not in cont: + return + cont = cont[k]["nested"] + cont.pop(key[-1], None) + + def set(self, key: Key, flag: int, *, recursive: bool) -> None: # noqa: A003 + cont = self._flags + key_parent, key_stem = key[:-1], key[-1] + for k in key_parent: + if k not in cont: + cont[k] = {"flags": set(), "recursive_flags": set(), "nested": {}} + cont = cont[k]["nested"] + if key_stem not in cont: + cont[key_stem] = {"flags": set(), "recursive_flags": set(), "nested": {}} + cont[key_stem]["recursive_flags" if recursive else "flags"].add(flag) + + def is_(self, key: Key, flag: int) -> bool: + if not key: + return False # document root has no flags + cont = self._flags + for k in key[:-1]: + if k not in cont: + return False + inner_cont = cont[k] + if flag in inner_cont["recursive_flags"]: + return True + cont = inner_cont["nested"] + key_stem = key[-1] + if key_stem in cont: + cont = cont[key_stem] + return flag in cont["flags"] or flag in cont["recursive_flags"] + return False + + +class NestedDict: + def __init__(self) -> None: + # The parsed content of the TOML document + self.dict: dict[str, Any] = {} + + def get_or_create_nest( + self, + key: Key, + *, + access_lists: bool = True, + ) -> dict: + cont: Any = self.dict + for k in key: + if k not in cont: + cont[k] = {} + cont = cont[k] + if access_lists and isinstance(cont, list): + cont = cont[-1] + if not isinstance(cont, dict): + raise KeyError("There is no nest behind this key") + return cont + + def append_nest_to_list(self, key: Key) -> None: + cont = self.get_or_create_nest(key[:-1]) + last_key = key[-1] + if last_key in cont: + list_ = cont[last_key] + if not isinstance(list_, list): + raise KeyError("An object other than list found behind this key") + list_.append({}) + else: + cont[last_key] = [{}] + + +class Output(NamedTuple): + data: NestedDict + flags: Flags + + +def skip_chars(src: str, pos: Pos, chars: Iterable[str]) -> Pos: + try: + while src[pos] in chars: + pos += 1 + except IndexError: + pass + return pos + + +def skip_until( + src: str, + pos: Pos, + expect: str, + *, + error_on: frozenset[str], + error_on_eof: bool, +) -> Pos: + try: + new_pos = src.index(expect, pos) + except ValueError: + new_pos = len(src) + if error_on_eof: + raise suffixed_err(src, new_pos, f"Expected {expect!r}") from None + + if not error_on.isdisjoint(src[pos:new_pos]): + while src[pos] not in error_on: + pos += 1 + raise suffixed_err(src, pos, f"Found invalid character {src[pos]!r}") + return new_pos + + +def skip_comment(src: str, pos: Pos) -> Pos: + try: + char: str | None = src[pos] + except IndexError: + char = None + if char == "#": + return skip_until( + src, pos + 1, "\n", error_on=ILLEGAL_COMMENT_CHARS, error_on_eof=False + ) + return pos + + +def skip_comments_and_array_ws(src: str, pos: Pos) -> Pos: + while True: + pos_before_skip = pos + pos = skip_chars(src, pos, TOML_WS_AND_NEWLINE) + pos = skip_comment(src, pos) + if pos == pos_before_skip: + return pos + + +def create_dict_rule(src: str, pos: Pos, out: Output) -> tuple[Pos, Key]: + pos += 1 # Skip "[" + pos = skip_chars(src, pos, TOML_WS) + pos, key = parse_key(src, pos) + + if out.flags.is_(key, Flags.EXPLICIT_NEST) or out.flags.is_(key, Flags.FROZEN): + raise suffixed_err(src, pos, f"Cannot declare {key} twice") + out.flags.set(key, Flags.EXPLICIT_NEST, recursive=False) + try: + out.data.get_or_create_nest(key) + except KeyError: + raise suffixed_err(src, pos, "Cannot overwrite a value") from None + + if not src.startswith("]", pos): + raise suffixed_err(src, pos, "Expected ']' at the end of a table declaration") + return pos + 1, key + + +def create_list_rule(src: str, pos: Pos, out: Output) -> tuple[Pos, Key]: + pos += 2 # Skip "[[" + pos = skip_chars(src, pos, TOML_WS) + pos, key = parse_key(src, pos) + + if out.flags.is_(key, Flags.FROZEN): + raise suffixed_err(src, pos, f"Cannot mutate immutable namespace {key}") + # Free the namespace now that it points to another empty list item... + out.flags.unset_all(key) + # ...but this key precisely is still prohibited from table declaration + out.flags.set(key, Flags.EXPLICIT_NEST, recursive=False) + try: + out.data.append_nest_to_list(key) + except KeyError: + raise suffixed_err(src, pos, "Cannot overwrite a value") from None + + if not src.startswith("]]", pos): + raise suffixed_err(src, pos, "Expected ']]' at the end of an array declaration") + return pos + 2, key + + +def key_value_rule( + src: str, pos: Pos, out: Output, header: Key, parse_float: ParseFloat +) -> Pos: + pos, key, value = parse_key_value_pair(src, pos, parse_float) + key_parent, key_stem = key[:-1], key[-1] + abs_key_parent = header + key_parent + + relative_path_cont_keys = (header + key[:i] for i in range(1, len(key))) + for cont_key in relative_path_cont_keys: + # Check that dotted key syntax does not redefine an existing table + if out.flags.is_(cont_key, Flags.EXPLICIT_NEST): + raise suffixed_err(src, pos, f"Cannot redefine namespace {cont_key}") + # Containers in the relative path can't be opened with the table syntax or + # dotted key/value syntax in following table sections. + out.flags.add_pending(cont_key, Flags.EXPLICIT_NEST) + + if out.flags.is_(abs_key_parent, Flags.FROZEN): + raise suffixed_err( + src, pos, f"Cannot mutate immutable namespace {abs_key_parent}" + ) + + try: + nest = out.data.get_or_create_nest(abs_key_parent) + except KeyError: + raise suffixed_err(src, pos, "Cannot overwrite a value") from None + if key_stem in nest: + raise suffixed_err(src, pos, "Cannot overwrite a value") + # Mark inline table and array namespaces recursively immutable + if isinstance(value, (dict, list)): + out.flags.set(header + key, Flags.FROZEN, recursive=True) + nest[key_stem] = value + return pos + + +def parse_key_value_pair( + src: str, pos: Pos, parse_float: ParseFloat +) -> tuple[Pos, Key, Any]: + pos, key = parse_key(src, pos) + try: + char: str | None = src[pos] + except IndexError: + char = None + if char != "=": + raise suffixed_err(src, pos, "Expected '=' after a key in a key/value pair") + pos += 1 + pos = skip_chars(src, pos, TOML_WS) + pos, value = parse_value(src, pos, parse_float) + return pos, key, value + + +def parse_key(src: str, pos: Pos) -> tuple[Pos, Key]: + pos, key_part = parse_key_part(src, pos) + key: Key = (key_part,) + pos = skip_chars(src, pos, TOML_WS) + while True: + try: + char: str | None = src[pos] + except IndexError: + char = None + if char != ".": + return pos, key + pos += 1 + pos = skip_chars(src, pos, TOML_WS) + pos, key_part = parse_key_part(src, pos) + key += (key_part,) + pos = skip_chars(src, pos, TOML_WS) + + +def parse_key_part(src: str, pos: Pos) -> tuple[Pos, str]: + try: + char: str | None = src[pos] + except IndexError: + char = None + if char in BARE_KEY_CHARS: + start_pos = pos + pos = skip_chars(src, pos, BARE_KEY_CHARS) + return pos, src[start_pos:pos] + if char == "'": + return parse_literal_str(src, pos) + if char == '"': + return parse_one_line_basic_str(src, pos) + raise suffixed_err(src, pos, "Invalid initial character for a key part") + + +def parse_one_line_basic_str(src: str, pos: Pos) -> tuple[Pos, str]: + pos += 1 + return parse_basic_str(src, pos, multiline=False) + + +def parse_array(src: str, pos: Pos, parse_float: ParseFloat) -> tuple[Pos, list]: + pos += 1 + array: list = [] + + pos = skip_comments_and_array_ws(src, pos) + if src.startswith("]", pos): + return pos + 1, array + while True: + pos, val = parse_value(src, pos, parse_float) + array.append(val) + pos = skip_comments_and_array_ws(src, pos) + + c = src[pos : pos + 1] + if c == "]": + return pos + 1, array + if c != ",": + raise suffixed_err(src, pos, "Unclosed array") + pos += 1 + + pos = skip_comments_and_array_ws(src, pos) + if src.startswith("]", pos): + return pos + 1, array + + +def parse_inline_table(src: str, pos: Pos, parse_float: ParseFloat) -> tuple[Pos, dict]: + pos += 1 + nested_dict = NestedDict() + flags = Flags() + + pos = skip_chars(src, pos, TOML_WS) + if src.startswith("}", pos): + return pos + 1, nested_dict.dict + while True: + pos, key, value = parse_key_value_pair(src, pos, parse_float) + key_parent, key_stem = key[:-1], key[-1] + if flags.is_(key, Flags.FROZEN): + raise suffixed_err(src, pos, f"Cannot mutate immutable namespace {key}") + try: + nest = nested_dict.get_or_create_nest(key_parent, access_lists=False) + except KeyError: + raise suffixed_err(src, pos, "Cannot overwrite a value") from None + if key_stem in nest: + raise suffixed_err(src, pos, f"Duplicate inline table key {key_stem!r}") + nest[key_stem] = value + pos = skip_chars(src, pos, TOML_WS) + c = src[pos : pos + 1] + if c == "}": + return pos + 1, nested_dict.dict + if c != ",": + raise suffixed_err(src, pos, "Unclosed inline table") + if isinstance(value, (dict, list)): + flags.set(key, Flags.FROZEN, recursive=True) + pos += 1 + pos = skip_chars(src, pos, TOML_WS) + + +def parse_basic_str_escape( + src: str, pos: Pos, *, multiline: bool = False +) -> tuple[Pos, str]: + escape_id = src[pos : pos + 2] + pos += 2 + if multiline and escape_id in {"\\ ", "\\\t", "\\\n"}: + # Skip whitespace until next non-whitespace character or end of + # the doc. Error if non-whitespace is found before newline. + if escape_id != "\\\n": + pos = skip_chars(src, pos, TOML_WS) + try: + char = src[pos] + except IndexError: + return pos, "" + if char != "\n": + raise suffixed_err(src, pos, "Unescaped '\\' in a string") + pos += 1 + pos = skip_chars(src, pos, TOML_WS_AND_NEWLINE) + return pos, "" + if escape_id == "\\u": + return parse_hex_char(src, pos, 4) + if escape_id == "\\U": + return parse_hex_char(src, pos, 8) + try: + return pos, BASIC_STR_ESCAPE_REPLACEMENTS[escape_id] + except KeyError: + raise suffixed_err(src, pos, "Unescaped '\\' in a string") from None + + +def parse_basic_str_escape_multiline(src: str, pos: Pos) -> tuple[Pos, str]: + return parse_basic_str_escape(src, pos, multiline=True) + + +def parse_hex_char(src: str, pos: Pos, hex_len: int) -> tuple[Pos, str]: + hex_str = src[pos : pos + hex_len] + if len(hex_str) != hex_len or not HEXDIGIT_CHARS.issuperset(hex_str): + raise suffixed_err(src, pos, "Invalid hex value") + pos += hex_len + hex_int = int(hex_str, 16) + if not is_unicode_scalar_value(hex_int): + raise suffixed_err(src, pos, "Escaped character is not a Unicode scalar value") + return pos, chr(hex_int) + + +def parse_literal_str(src: str, pos: Pos) -> tuple[Pos, str]: + pos += 1 # Skip starting apostrophe + start_pos = pos + pos = skip_until( + src, pos, "'", error_on=ILLEGAL_LITERAL_STR_CHARS, error_on_eof=True + ) + return pos + 1, src[start_pos:pos] # Skip ending apostrophe + + +def parse_multiline_str(src: str, pos: Pos, *, literal: bool) -> tuple[Pos, str]: + pos += 3 + if src.startswith("\n", pos): + pos += 1 + + if literal: + delim = "'" + end_pos = skip_until( + src, + pos, + "'''", + error_on=ILLEGAL_MULTILINE_LITERAL_STR_CHARS, + error_on_eof=True, + ) + result = src[pos:end_pos] + pos = end_pos + 3 + else: + delim = '"' + pos, result = parse_basic_str(src, pos, multiline=True) + + # Add at maximum two extra apostrophes/quotes if the end sequence + # is 4 or 5 chars long instead of just 3. + if not src.startswith(delim, pos): + return pos, result + pos += 1 + if not src.startswith(delim, pos): + return pos, result + delim + pos += 1 + return pos, result + (delim * 2) + + +def parse_basic_str(src: str, pos: Pos, *, multiline: bool) -> tuple[Pos, str]: + if multiline: + error_on = ILLEGAL_MULTILINE_BASIC_STR_CHARS + parse_escapes = parse_basic_str_escape_multiline + else: + error_on = ILLEGAL_BASIC_STR_CHARS + parse_escapes = parse_basic_str_escape + result = "" + start_pos = pos + while True: + try: + char = src[pos] + except IndexError: + raise suffixed_err(src, pos, "Unterminated string") from None + if char == '"': + if not multiline: + return pos + 1, result + src[start_pos:pos] + if src.startswith('"""', pos): + return pos + 3, result + src[start_pos:pos] + pos += 1 + continue + if char == "\\": + result += src[start_pos:pos] + pos, parsed_escape = parse_escapes(src, pos) + result += parsed_escape + start_pos = pos + continue + if char in error_on: + raise suffixed_err(src, pos, f"Illegal character {char!r}") + pos += 1 + + +def parse_value( # noqa: C901 + src: str, pos: Pos, parse_float: ParseFloat +) -> tuple[Pos, Any]: + try: + char: str | None = src[pos] + except IndexError: + char = None + + # IMPORTANT: order conditions based on speed of checking and likelihood + + # Basic strings + if char == '"': + if src.startswith('"""', pos): + return parse_multiline_str(src, pos, literal=False) + return parse_one_line_basic_str(src, pos) + + # Literal strings + if char == "'": + if src.startswith("'''", pos): + return parse_multiline_str(src, pos, literal=True) + return parse_literal_str(src, pos) + + # Booleans + if char == "t": + if src.startswith("true", pos): + return pos + 4, True + if char == "f": + if src.startswith("false", pos): + return pos + 5, False + + # Arrays + if char == "[": + return parse_array(src, pos, parse_float) + + # Inline tables + if char == "{": + return parse_inline_table(src, pos, parse_float) + + # Dates and times + datetime_match = RE_DATETIME.match(src, pos) + if datetime_match: + try: + datetime_obj = match_to_datetime(datetime_match) + except ValueError as e: + raise suffixed_err(src, pos, "Invalid date or datetime") from e + return datetime_match.end(), datetime_obj + localtime_match = RE_LOCALTIME.match(src, pos) + if localtime_match: + return localtime_match.end(), match_to_localtime(localtime_match) + + # Integers and "normal" floats. + # The regex will greedily match any type starting with a decimal + # char, so needs to be located after handling of dates and times. + number_match = RE_NUMBER.match(src, pos) + if number_match: + return number_match.end(), match_to_number(number_match, parse_float) + + # Special floats + first_three = src[pos : pos + 3] + if first_three in {"inf", "nan"}: + return pos + 3, parse_float(first_three) + first_four = src[pos : pos + 4] + if first_four in {"-inf", "+inf", "-nan", "+nan"}: + return pos + 4, parse_float(first_four) + + raise suffixed_err(src, pos, "Invalid value") + + +def suffixed_err(src: str, pos: Pos, msg: str) -> TOMLDecodeError: + """Return a `TOMLDecodeError` where error message is suffixed with + coordinates in source.""" + + def coord_repr(src: str, pos: Pos) -> str: + if pos >= len(src): + return "end of document" + line = src.count("\n", 0, pos) + 1 + if line == 1: + column = pos + 1 + else: + column = pos - src.rindex("\n", 0, pos) + return f"line {line}, column {column}" + + return TOMLDecodeError(f"{msg} (at {coord_repr(src, pos)})") + + +def is_unicode_scalar_value(codepoint: int) -> bool: + return (0 <= codepoint <= 55295) or (57344 <= codepoint <= 1114111) + + +def make_safe_parse_float(parse_float: ParseFloat) -> ParseFloat: + """A decorator to make `parse_float` safe. + + `parse_float` must not return dicts or lists, because these types + would be mixed with parsed TOML tables and arrays, thus confusing + the parser. The returned decorated callable raises `ValueError` + instead of returning illegal types. + """ + # The default `float` callable never returns illegal types. Optimize it. + if parse_float is float: # type: ignore[comparison-overlap] + return float + + def safe_parse_float(float_str: str) -> Any: + float_value = parse_float(float_str) + if isinstance(float_value, (dict, list)): + raise ValueError("parse_float must not return dicts or lists") + return float_value + + return safe_parse_float diff --git a/pygbag/support/cross/tomllib/_re.py b/pygbag/support/cross/tomllib/_re.py new file mode 100644 index 0000000..994bb74 --- /dev/null +++ b/pygbag/support/cross/tomllib/_re.py @@ -0,0 +1,107 @@ +# SPDX-License-Identifier: MIT +# SPDX-FileCopyrightText: 2021 Taneli Hukkinen +# Licensed to PSF under a Contributor Agreement. + +from __future__ import annotations + +from datetime import date, datetime, time, timedelta, timezone, tzinfo +from functools import lru_cache +import re +from typing import Any + +from ._types import ParseFloat + +# E.g. +# - 00:32:00.999999 +# - 00:32:00 +_TIME_RE_STR = r"([01][0-9]|2[0-3]):([0-5][0-9]):([0-5][0-9])(?:\.([0-9]{1,6})[0-9]*)?" + +RE_NUMBER = re.compile( + r""" +0 +(?: + x[0-9A-Fa-f](?:_?[0-9A-Fa-f])* # hex + | + b[01](?:_?[01])* # bin + | + o[0-7](?:_?[0-7])* # oct +) +| +[+-]?(?:0|[1-9](?:_?[0-9])*) # dec, integer part +(?P + (?:\.[0-9](?:_?[0-9])*)? # optional fractional part + (?:[eE][+-]?[0-9](?:_?[0-9])*)? # optional exponent part +) +""", + flags=re.VERBOSE, +) +RE_LOCALTIME = re.compile(_TIME_RE_STR) +RE_DATETIME = re.compile( + rf""" +([0-9]{{4}})-(0[1-9]|1[0-2])-(0[1-9]|[12][0-9]|3[01]) # date, e.g. 1988-10-27 +(?: + [Tt ] + {_TIME_RE_STR} + (?:([Zz])|([+-])([01][0-9]|2[0-3]):([0-5][0-9]))? # optional time offset +)? +""", + flags=re.VERBOSE, +) + + +def match_to_datetime(match: re.Match) -> datetime | date: + """Convert a `RE_DATETIME` match to `datetime.datetime` or `datetime.date`. + + Raises ValueError if the match does not correspond to a valid date + or datetime. + """ + ( + year_str, + month_str, + day_str, + hour_str, + minute_str, + sec_str, + micros_str, + zulu_time, + offset_sign_str, + offset_hour_str, + offset_minute_str, + ) = match.groups() + year, month, day = int(year_str), int(month_str), int(day_str) + if hour_str is None: + return date(year, month, day) + hour, minute, sec = int(hour_str), int(minute_str), int(sec_str) + micros = int(micros_str.ljust(6, "0")) if micros_str else 0 + if offset_sign_str: + tz: tzinfo | None = cached_tz( + offset_hour_str, offset_minute_str, offset_sign_str + ) + elif zulu_time: + tz = timezone.utc + else: # local date-time + tz = None + return datetime(year, month, day, hour, minute, sec, micros, tzinfo=tz) + + +@lru_cache(maxsize=None) +def cached_tz(hour_str: str, minute_str: str, sign_str: str) -> timezone: + sign = 1 if sign_str == "+" else -1 + return timezone( + timedelta( + hours=sign * int(hour_str), + minutes=sign * int(minute_str), + ) + ) + + +def match_to_localtime(match: re.Match) -> time: + hour_str, minute_str, sec_str, micros_str = match.groups() + micros = int(micros_str.ljust(6, "0")) if micros_str else 0 + return time(int(hour_str), int(minute_str), int(sec_str), micros) + + +def match_to_number(match: re.Match, parse_float: ParseFloat) -> Any: + if match.group("floatpart"): + return parse_float(match.group()) + return int(match.group(), 0) diff --git a/pygbag/support/cross/tomllib/_types.py b/pygbag/support/cross/tomllib/_types.py new file mode 100644 index 0000000..d949412 --- /dev/null +++ b/pygbag/support/cross/tomllib/_types.py @@ -0,0 +1,10 @@ +# SPDX-License-Identifier: MIT +# SPDX-FileCopyrightText: 2021 Taneli Hukkinen +# Licensed to PSF under a Contributor Agreement. + +from typing import Any, Callable, Tuple + +# Type annotations +ParseFloat = Callable[[str], Any] +Key = Tuple[str, ...] +Pos = int diff --git a/pygbag/support/pygbag_app.py b/pygbag/support/pygbag_app.py index 0209dce..7ad06d5 100644 --- a/pygbag/support/pygbag_app.py +++ b/pygbag/support/pygbag_app.py @@ -7,64 +7,48 @@ -from pygbag_ui import Tui, TTY -import pygbag_ui as ui -from pygbag_ux import * - -ux_dim(1280 // 2 , 720 // 2) +from pygbag_ui import Tui, TTY, clear +import pygbag_ui as ui +import pygbag_ux as ux -class vpad: - X = 0 - Z = 1 - Y = 2 - evx = [] - evy = [] - evz = [] - axis = [evx, evz, evy] +#ux.dim(1280 // 2 , 720 // 2) - LZ = 0.5 +class console: @classmethod - def get_axis(self, n): - if len(self.axis[n]): - return self.axis[n].pop(0) - return 0.0 - - @classmethod - def emit(self, axis, value): - import pygame - self.axis[axis].append(float(value)) - ev = pygame.event.Event(pygame.JOYAXISMOTION) - pygame.event.post(ev) - return False - - + def log(self, *argv, **kw): + import io + sio = io.StringIO() + kw['file']=sio + print(*argv,**kw) + ui.clog.append(sio.read()) -def console(): - import platform - try: - platform.window.pyconsole.hidden = False - platform.window.document.body.style.background = "#000000"; - except: - ... - - import os - _, LINES = os.get_terminal_size() - CONSOLE = os.get_console_size() - - # split the display - if sys.platform not in ('emscripten','wasi') or aio.cross.simulator: - LINES = LINES - CONSOLE - - ui.TTY.set_raw(1) - import select,os,platform - - platform.shell.is_interactive = False - platform.shell.interactive(True) - aio.toplevel.handler.muted = False - - ui.clear(LINES, CONSOLE, '>>> ') + @classmethod + def get(self): + import platform + try: + platform.window.pyconsole.hidden = False + platform.window.document.body.style.background = "#000000"; + except: + ... + + import os + _, LINES = os.get_terminal_size() + CONSOLE = os.get_console_size() + + # split the display + if sys.platform not in ('emscripten','wasi') or aio.cross.simulator: + LINES = LINES - CONSOLE + + TTY.set_raw(1) + + platform.shell.is_interactive = False + platform.shell.interactive(True) + aio.toplevel.handler.muted = False + + clear(LINES, CONSOLE) #, '>C> ') + return self diff --git a/pygbag/support/pygbag_fsm.py b/pygbag/support/pygbag_fsm.py index a5f08e9..82ca22a 100644 --- a/pygbag/support/pygbag_fsm.py +++ b/pygbag/support/pygbag_fsm.py @@ -67,7 +67,7 @@ def state_refs(*argv): def build(pkg, **kw): global story - steps = [] # INTRO, BORED, WANDERING, EXPLORATION, BOOTING, PLAY + steps = [] for k, v in vars(pkg).items(): if v in [State, Draft]: diff --git a/pygbag/support/pygbag_ui.py b/pygbag/support/pygbag_ui.py index eeea41d..f1fa228 100644 --- a/pygbag/support/pygbag_ui.py +++ b/pygbag/support/pygbag_ui.py @@ -158,7 +158,7 @@ def prompt(self): if len(self.prompts): self.prompts.clear() import platform - print("\r>+> ",end="") + print("\r>>> ",end="") self.flush() @@ -370,10 +370,8 @@ def clear(LINES=0, CONSOLE=0, prompt=""): break goto_xz(1, TTY.LINES + 1) - if prompt: - CSI("0J", "1J", f"{TTY.LINES+1};{TTY.LINES+TTY.CONSOLE}r", f"{TTY.LINES+TTY.CONSOLE-1};1H{prompt}") - else: - CSI("1J", f"{TTY.LINES+1};{TTY.LINES+TTY.CONSOLE}r", f"{TTY.LINES+1};1H{prompt}") + CSI("0J", "1J", f"{TTY.LINES+1};{TTY.LINES+TTY.CONSOLE}r", f"{TTY.LINES+TTY.CONSOLE-1};1H{prompt}") + elif CONSOLE < 0: goto_xz(1, TTY.LINES + 1) CSI(f"0J", f"{TTY.LINES+1};1H{prompt}") diff --git a/pygbag/support/pygbag_ux.py b/pygbag/support/pygbag_ux.py index 342df5b..e2cb479 100644 --- a/pygbag/support/pygbag_ux.py +++ b/pygbag/support/pygbag_ux.py @@ -3,7 +3,7 @@ WIDTH = 1280 # {{cookiecutter.width}} HEIGHT = 720 # {{cookiecutter.height}} -def ux_dim(w,h): +def dim(w,h): global WIDTH, HEIGHT WIDTH = int(w) HEIGHT = int(h) @@ -21,21 +21,21 @@ def u(real, ref, v): return result return int((real / ref) * v) -def ux(*argv): +def x(*argv): global WIDTH, REFX acc = 0 for v in argv: acc += u(WIDTH, REFX, v) return acc -def uy(*argv): +def y(*argv): global HEIGHT, REFY acc = 0 for v in argv: acc += u(HEIGHT, REFY, v) return acc -def ur(*argv): +def r(*argv): x = ux(argv[0]) y = uy(argv[1]) ret = [x, y] @@ -75,3 +75,29 @@ def pg_load(fn, resize=None, width=0, height=0, alpha=True): # offscreen case except: return media + + +class vpad: + X = 0 + Z = 1 + Y = 2 + evx = [] + evy = [] + evz = [] + axis = [evx, evz, evy] + + LZ = 0.5 + + @classmethod + def get_axis(self, n): + if len(self.axis[n]): + return self.axis[n].pop(0) + return 0.0 + + @classmethod + def emit(self, axis, value): + import pygame + self.axis[axis].append(float(value)) + ev = pygame.event.Event(pygame.JOYAXISMOTION) + pygame.event.post(ev) + return False diff --git a/pygbag/support/pythonrc.py b/pygbag/support/pythonrc.py index d17f4f6..52231de 100644 --- a/pygbag/support/pythonrc.py +++ b/pygbag/support/pythonrc.py @@ -1319,43 +1319,7 @@ def imports(cls, *mods, lvl=0): return wants -# # pygame must be early for plotting -# if ("matplotlib" in all) and ("pygame" not in sys.modules): -# await import_now("pygame") -# -# for req in all: -# if req == "pyyaml": -# req = "yaml" -# -# if req == "python-dateutil": -# req = "dateutil" -# -# if req == "pillow": -# req = "PIL" -# - @classmethod - async def pv(cls, track, prefix="", suffix="", decimals=1, length=70, fill="X", printEnd="\r"): - # Progress Bar Printing Function - def print_pg_bar(total, iteration): - if iteration > total: - iteration = total - percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) - filledLength = int(length * iteration // total) - bar = fill * filledLength + "-" * (length - filledLength) - print(f"\r{prefix} |{bar}| {percent}% {suffix}", end=printEnd) - - # Update Progress Bar - while True: - if track.pos < 0: - raise IOError(404) - print_pg_bar(track.len or 100, track.pos or 0) - if track.avail: - break - await asyncio.sleep(0.02) - - # Print New Line on Complete - print() # end TopLevel_async_handler diff --git a/scripts/build-rootfs.sh b/scripts/build-rootfs.sh index 8b0d057..11b6bab 100755 --- a/scripts/build-rootfs.sh +++ b/scripts/build-rootfs.sh @@ -49,6 +49,9 @@ import asyncio.selector_events import multiprocessing +# mypy +import typing_extensions + # for dom event subscriptions and js interface import webbrowser import platform diff --git a/static/pythons.js b/static/pythons.js index da1119c..df9c3ae 100644 --- a/static/pythons.js +++ b/static/pythons.js @@ -1145,17 +1145,21 @@ function feat_snd() { if (!MM.is_safari) MM.is_safari = navigator.userAgent.search("iPhone")>=0; - if (!MM.UME && !MM.is_safari) - MM_play( {auto:1, test:1, media: new Audio(config.cdn+"empty.ogg")} , 1) - - if (MM.is_safari) { - MM.is_safari = function unlock_ume() { - console.warn("safari ume unlocking") + if (!MM.UME) { + if (MM.is_safari) { + console.warn("safari ume unlocking") + MM.is_safari = function unlock_ume() { MM.UME = 1 window.removeEventListener("click", MM.is_safari) MM.is_safari = 1 } - window.addEventListener("click", MM.is_safari) + window.addEventListener("click", MM.is_safari) + } else { + console.warn("Auto ume unlocker safari==", MM.is_safari) + MM_play( {auto:1, test:1, media: new Audio(config.cdn+"empty.ogg")} , 1) + } + } else { + console.warn("NO ume unlocker, safari ==", MM.is_safari) } } @@ -2423,7 +2427,7 @@ config.interactive = config.interactive || (location.search.search("-i")>=0) //? "malloc_stats" : 0 , "platlibdir" : "lib", "prefix" : "/data/data/org.python/assets/site-packages", - "ps1" : ">>> ", + "ps1" : ">J> ", "ps2" : "... " }`) diff --git a/static/vtx.js b/static/vtx.js index 9f87d36..2afdfc6 100644 --- a/static/vtx.js +++ b/static/vtx.js @@ -144,7 +144,7 @@ export class WasmTerminal { case 0x5b: const cursor = readline.history.length + readline.index - var histo = ">>> " + var histo = ">h> " switch ( data.charCodeAt(2) ) { // "?" @@ -172,7 +172,7 @@ export class WasmTerminal { if ( cursor >0 ) { readline.index-- - histo = ">>> " +readline.history[cursor-1] + histo = ">h> " +readline.history[cursor-1] //console.log(__FILE__," histo-up :", readline.index, cursor, histo) this.ESC("[132D","[2K") diff --git a/support/__EMSCRIPTEN__.c b/support/__EMSCRIPTEN__.c index e02ab9b..cfaac28 100644 --- a/support/__EMSCRIPTEN__.c +++ b/support/__EMSCRIPTEN__.c @@ -328,7 +328,7 @@ embed_set_ps2(PyObject *self, PyObject *_null) { static PyObject * embed_prompt(PyObject *self, PyObject *_null) { if (sys_ps==1) - fprintf( stderr, ">>> "); + fprintf( stderr, ">=> "); else fprintf( stderr, "... "); embed_flush(self,_null); @@ -1114,6 +1114,8 @@ EM_ASM({ PyRun_SimpleString("import sys, os, json, builtins, time"); + PyRun_SimpleString("sys.ps1 = ''"); + //PyRun_SimpleString("import hpy;import hpy.universal;print('HPy init done')"); #if defined(FT) int error;