From 6cb88504dcde36bc2d54d60aea8d91227d020132 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ole=20Andr=C3=A9=20Vadla=20Ravn=C3=A5s?= Date: Mon, 9 Sep 2024 15:29:05 +0200 Subject: [PATCH] [WIP] tracer: Optimize manpage lookups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Håvard Sørbø --- frida_tools/tracer.py | 293 +++++++++++++++++++++++++++++------------- 1 file changed, 207 insertions(+), 86 deletions(-) diff --git a/frida_tools/tracer.py b/frida_tools/tracer.py index 49299ec3..250ea762 100644 --- a/frida_tools/tracer.py +++ b/frida_tools/tracer.py @@ -1,16 +1,24 @@ +from __future__ import annotations import argparse import binascii import codecs +from dataclasses import dataclass +import gzip import os +from pathlib import Path import platform import re +import shlex import subprocess -from typing import Callable, List, Optional, Union +from typing import Callable, Generator, List, Optional, Union import frida from frida_tools.reactor import Reactor +MANPAGE_CONTROL_CHARS = re.compile(r"\.[a-zA-Z]*(\s|$)|\s?\"") +MANPAGE_FUNCTION_PROTOTYPE = re.compile(r"([a-zA-Z_]\w+)\(([^\)]+)") + def main() -> None: import json @@ -476,6 +484,7 @@ def __init__(self) -> None: self._on_load_callback: Optional[Callable[[TraceTarget, str, str], None]] = None self._on_update_callback: Optional[Callable[[TraceTarget, str, str], None]] = None self._decorate = False + self._manpages = None def ensure_handler(self, target: TraceTarget): raise NotImplementedError("not implemented") @@ -512,93 +521,11 @@ def _create_stub_handler(self, target: TraceTarget, decorate: bool) -> str: def _create_stub_native_handler(self, target: TraceTarget, decorate: bool) -> str: if target.flavor == "objc": - state = {"index": 2} - - def objc_arg(m): - index = state["index"] - r = ":${args[%d]} " % index - state["index"] = index + 1 - return r - - log_str = "`" + re.sub(r":", objc_arg, target.display_name) + "`" - if log_str.endswith("} ]`"): - log_str = log_str[:-3] + "]`" + log_str = self._create_objc_logging_code(target) elif target.flavor == "swift": - if decorate: - module_string = " [%s]" % os.path.basename(target.scope) - else: - module_string = "" - log_str = "'%(name)s()%(module_string)s'" % {"name": target.name, "module_string": module_string} + log_str = self._create_swift_logging_code(target, decorate) else: - for man_section in (2, 3): - args = [] - try: - with open(os.devnull, "w") as devnull: - man_argv = ["man"] - if platform.system() != "Darwin": - man_argv.extend(["-E", "UTF-8"]) - man_argv.extend(["-P", "col -b", str(man_section), target.name]) - output = subprocess.check_output(man_argv, stderr=devnull) - match = re.search( - r"^SYNOPSIS(?:.|\n)*?((?:^.+$\n)* {5}\w+[ \*\n]*" - + target.name - + r"\((?:.+\,\s*?$\n)*?(?:.+\;$\n))(?:.|\n)*^DESCRIPTION", - output.decode("UTF-8", errors="replace"), - re.MULTILINE, - ) - if match: - decl = match.group(1) - - for argm in re.finditer(r"[\(,]\s*(.+?)\s*\b(\w+)(?=[,\)])", decl): - typ = argm.group(1) - arg = argm.group(2) - if arg == "void": - continue - if arg == "...": - args.append('", ..." +') - continue - - read_ops = "" - annotate_pre = "" - annotate_post = "" - - normalized_type = re.sub(r"\s+", "", typ) - if normalized_type.endswith("*restrict"): - normalized_type = normalized_type[:-8] - if normalized_type in ("char*", "constchar*"): - read_ops = ".readUtf8String()" - annotate_pre = '"' - annotate_post = '"' - - arg_index = len(args) - - args.append( - "%(arg_name)s=%(annotate_pre)s${args[%(arg_index)s]%(read_ops)s}%(annotate_post)s" - % { - "arg_name": arg, - "arg_index": arg_index, - "read_ops": read_ops, - "annotate_pre": annotate_pre, - "annotate_post": annotate_post, - } - ) - break - except Exception: - pass - - if decorate: - module_string = " [%s]" % os.path.basename(target.scope) - else: - module_string = "" - - if len(args) == 0: - log_str = "'%(name)s()%(module_string)s'" % {"name": target.name, "module_string": module_string} - else: - log_str = "`%(name)s(%(args)s)%(module_string)s`" % { - "name": target.name, - "args": ", ".join(args), - "module_string": module_string, - } + log_str = self._create_cstyle_logging_code(target, decorate) return """\ /* @@ -644,6 +571,46 @@ def objc_arg(m): "log_str": log_str, } + def _create_cstyle_logging_code(self, target: TraceTarget, decorate: bool) -> str: + if decorate: + module_string = " [%s]" % os.path.basename(target.scope) + else: + module_string = "" + + args = self._generate_cstyle_argument_logging_code(target) + if len(args) == 0: + code = "'%(name)s()%(module_string)s'" % {"name": target.name, "module_string": module_string} + else: + code = "`%(name)s(%(args)s)%(module_string)s`" % { + "name": target.name, + "args": ", ".join(args), + "module_string": module_string, + } + + return code + + def _create_objc_logging_code(self, target: TraceTarget) -> str: + state = {"index": 2} + + def objc_arg(m): + index = state["index"] + r = ":${args[%d]} " % index + state["index"] = index + 1 + return r + + code = "`" + re.sub(r":", objc_arg, target.display_name) + "`" + if code.endswith("} ]`"): + code = code[:-3] + "]`" + + return code + + def _create_swift_logging_code(self, target: TraceTarget, decorate: bool) -> str: + if decorate: + module_string = " [%s]" % os.path.basename(target.scope) + else: + module_string = "" + return "'%(name)s()%(module_string)s'" % {"name": target.name, "module_string": module_string} + def _create_stub_java_handler(self, target: TraceTarget, decorate) -> str: return """\ /* @@ -685,6 +652,160 @@ def _create_stub_java_handler(self, target: TraceTarget, decorate) -> str: "display_name": target.display_name } + def _generate_cstyle_argument_logging_code(self, target: TraceTarget) -> List[str]: + if self._manpages is None: + self._manpages = {} + try: + manroots = [Path(d) for d in subprocess.run(["manpath"], + stdout=subprocess.PIPE, + encoding="utf-8", + check=True).stdout.strip().split(":")] + for section in (2, 3): + for manroot in manroots: + mandir = (manroot / f"man{section}") + if not mandir.exists(): + continue + raw_section = str(section) + for entry in mandir.iterdir(): + tokens = entry.name.split(".") + if len(tokens) < 2: + continue + if not tokens[1].startswith(raw_section): + continue + name = tokens[0] + if name in self._manpages: + continue + self._manpages[name] = (entry, section) + except Exception as e: + return [] + + man_entry = self._manpages.get(target.name) + if man_entry is None: + return [] + man_location, man_section = man_entry + + try: + args = [] + cfunc = next(f for f in self._read_manpage(man_location) if f.name == target.name) + for arg in cfunc.arguments: + if arg == "void": + continue + if arg.startswith("..."): + args.append("...") + continue + + tokens = arg.split(" ") + + arg_type = "".join(tokens[:-1]) + + arg_name = tokens[-1] + if arg_name.startswith("*"): + arg_type += "*" + arg_name = arg_name[1:] + elif arg_name.endswith("]"): + arg_type += "*" + arg_name = arg_name[:arg_name.index("[")] + + read_ops = "" + annotate_pre = "" + annotate_post = "" + + if arg_type.endswith("*restrict"): + arg_type = arg_type[:-8] + if arg_type in ("char*", "constchar*"): + read_ops = ".readUtf8String()" + annotate_pre = '"' + annotate_post = '"' + + arg_index = len(args) + + args.append( + "%(arg_name)s=%(annotate_pre)s${args[%(arg_index)s]%(read_ops)s}%(annotate_post)s" + % { + "arg_name": arg_name, + "arg_index": arg_index, + "read_ops": read_ops, + "annotate_pre": annotate_pre, + "annotate_post": annotate_post, + } + ) + return args + except Exception: + import traceback + traceback.print_exc() + return [] + + def _read_manpage(self, man_location: Path) -> Generator[CFuncSpec]: + if man_location.suffix == ".gz": + man_file = gzip.open(man_location, "rt", encoding="utf-8", errors="replace") + else: + man_file = open(man_location, "r", encoding="utf-8", errors="replace") + with man_file: + man_data = man_file.read() + + manpage_format = "gnu" + synopsis_lines = [] + found_synopsis = False + in_multiline = False + for raw_line in man_data.split("\n"): + line = raw_line.strip() + if line.startswith(".so "): + redirected_location = man_location.parent.parent / Path(line[4:]) + if not redirected_location.exists(): + redirected_location = redirected_location.parent / (redirected_location.name + ".gz") + yield from self._read_manpage(redirected_location) + return + if not found_synopsis and "SYNOPSIS" in line: + found_synopsis = True + continue + elif found_synopsis and line.endswith("DESCRIPTION"): + break + elif not found_synopsis: + continue + if line.startswith(".Fn ") or line.startswith(".Fo "): + manpage_format = "bsd" + escaped_newline = line.endswith("\\") + if escaped_newline: + line = line[:-1] + if in_multiline: + synopsis_lines[-1] += line + else: + synopsis_lines.append(line) + in_multiline = escaped_newline + + if manpage_format == "gnu": + raw_synopsis = "\n".join(synopsis_lines) + synopsis = MANPAGE_CONTROL_CHARS.sub("", raw_synopsis).replace("\n", " ").replace(" [", "[").replace(" ]", "]") + + for m in MANPAGE_FUNCTION_PROTOTYPE.finditer(synopsis): + name = protom.group(1) + signature = protom.group(2) + args = [a.strip() for a in signature.split(",")] + yield CFuncSpec(name, args) + else: + name = None + args = None + for line in synopsis_lines: + tokens = line.split(" ", maxsplit=1) + directive = tokens[0] + data = tokens[1] if len(tokens) == 2 else None + if directive == ".Fn": + argv = shlex.split(data) + yield CFuncSpec(argv[0], argv[1:]) + elif directive == ".Fo": + name = data + args = [] + elif directive == ".Fa": + args.append(shlex.split(data)[0]) + elif directive == ".Fc": + yield CFuncSpec(name, args) + + +@dataclass +class CFuncSpec: + name: str + arguments: List[str] + class MemoryRepository(Repository): def __init__(self) -> None: