From c1002d27b2c6b7aab39e2c74fbb20a255037f407 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ole=20Andr=C3=A9=20Vadla=20Ravn=C3=A5s?= Date: Wed, 17 Jul 2024 00:19:37 +0200 Subject: [PATCH] lsd: Rework UI Fetching system parameters may take a while with some devices, so we should render the list without OS details first, and update the UI as the fetching completes for the different devices. --- frida_tools/lsd.py | 150 +++++++++++++++++++++++++++++---------------- 1 file changed, 98 insertions(+), 52 deletions(-) diff --git a/frida_tools/lsd.py b/frida_tools/lsd.py index 1c0ec5ba..5358ae4d 100644 --- a/frida_tools/lsd.py +++ b/frida_tools/lsd.py @@ -1,80 +1,126 @@ def main() -> None: import functools + import threading import frida + from prompt_toolkit.application import Application + from prompt_toolkit.key_binding import KeyBindings + from prompt_toolkit.layout.containers import HSplit, VSplit + from prompt_toolkit.layout.layout import Layout + from prompt_toolkit.widgets import Label from frida_tools.application import ConsoleApplication + from frida_tools.reactor import Reactor class LSDApplication(ConsoleApplication): + def __init__(self) -> None: + super().__init__(self._process_input, self._on_stop) + self._ui_app = None + self._pending_labels = set() + self._spinner_frames = ["v", "<", "^", ">"] + self._spinner_offset = 0 + self._lock = threading.Lock() + def _usage(self) -> str: return "%(prog)s [options]" def _needs_device(self) -> bool: return False - def _start(self) -> None: + def _process_input(self, reactor: Reactor) -> None: try: devices = frida.enumerate_devices() except Exception as e: self._update_status(f"Failed to enumerate devices: {e}") self._exit(1) return - device_name = {} - device_os = {} - for device in devices: - device_name[device.id] = device.name - try: + + bindings = KeyBindings() + + @bindings.add("") + def _(event): + self._reactor.io_cancellable.cancel() + + self._ui_app = Application(key_bindings=bindings, full_screen=False) + + id_rows = [] + type_rows = [] + name_rows = [] + os_rows = [] + for device in sorted(devices, key=functools.cmp_to_key(compare_devices)): + id_rows.append(Label(device.id, dont_extend_width=True)) + type_rows.append(Label(device.type, dont_extend_width=True)) + name_rows.append(Label(device.name, dont_extend_width=True)) + os_label = Label("", dont_extend_width=True) + os_rows.append(os_label) + + with self._lock: + self._pending_labels.add(os_label) + worker = threading.Thread(target=self._fetch_parameters, args=(device, os_label)) + worker.start() + + status_label = Label(" ") + body = HSplit( + [ + VSplit( + [ + HSplit([Label("Id", dont_extend_width=True), HSplit(id_rows)], padding_char="-", padding=1), + HSplit( + [Label("Type", dont_extend_width=True), HSplit(type_rows)], padding_char="-", padding=1 + ), + HSplit( + [Label("Name", dont_extend_width=True), HSplit(name_rows)], padding_char="-", padding=1 + ), + HSplit([Label("OS", dont_extend_width=True), HSplit(os_rows)], padding_char="-", padding=1), + ], + padding=2, + ), + status_label, + ] + ) + + self._ui_app.layout = Layout(body, focused_element=status_label) + + self._reactor.schedule(self._update_progress) + self._ui_app.run() + self._ui_app._redraw() + + def _on_stop(self): + if self._ui_app is not None: + self._ui_app.exit() + + def _update_progress(self): + with self._lock: + if not self._pending_labels: + self._exit(0) + return + + glyph = self._spinner_frames[self._spinner_offset % len(self._spinner_frames)] + self._spinner_offset += 1 + for label in self._pending_labels: + label.text = glyph + self._ui_app.invalidate() + + self._reactor.schedule(self._update_progress, delay=0.1) + + def _fetch_parameters(self, device, os_label): + try: + with self._reactor.io_cancellable: params = device.query_system_parameters() - except: - continue - device_name[device.id] = params.get("name", device.name) os = params["os"] version = os.get("version") if version is not None: - device_os[device.id] = os["name"] + " " + version + text = os["name"] + " " + version else: - device_os[device.id] = os["name"] - id_column_width = max(map(lambda device: len(device.id) if device.id is not None else 0, devices)) - type_column_width = max(map(lambda device: len(device.type) if device.type is not None else 0, devices)) - name_column_width = max(map(lambda name: len(name) if name is not None else 0, device_name.values())) - os_column_width = max(map(lambda os: len(os) if os is not None else 0, device_os.values())) - header_format = ( - "%-" - + str(id_column_width) - + "s " - + "%-" - + str(type_column_width) - + "s " - + "%-" - + str(name_column_width) - + "s " - + "%-" - + str(os_column_width) - + "s" - ) - self._print(header_format % ("Id", "Type", "Name", "OS")) - self._print( - f"{id_column_width * '-'} {type_column_width * '-'} {name_column_width * '-'} {os_column_width * '-'}" - ) - line_format = ( - "%-" - + str(id_column_width) - + "s " - + "%-" - + str(type_column_width) - + "s " - + "%-" - + str(name_column_width) - + "s " - + "%-" - + str(os_column_width) - + "s" - ) - for device in sorted(devices, key=functools.cmp_to_key(compare_devices)): - self._print( - line_format % (device.id, device.type, device_name.get(device.id), device_os.get(device.id, "")) - ) - self._exit(0) + text = os["name"] + except: + text = "" + + with self._lock: + os_label.text = text + self._pending_labels.remove(os_label) + + self._ui_app.invalidate() def compare_devices(a: frida.core.Device, b: frida.core.Device) -> int: a_score = score(a)