Skip to content

Commit

Permalink
[WIP] tracer: Optimize manpage lookups
Browse files Browse the repository at this point in the history
Co-authored-by: Håvard Sørbø <[email protected]>
  • Loading branch information
oleavr and hsorbo committed Sep 9, 2024
1 parent 704131c commit 9eb80ea
Showing 1 changed file with 199 additions and 86 deletions.
285 changes: 199 additions & 86 deletions frida_tools/tracer.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
from __future__ import annotations

Check failure on line 1 in frida_tools/tracer.py

View workflow job for this annotation

GitHub Actions / isort

Imports are incorrectly sorted and/or formatted.
import argparse
import binascii
import codecs
from dataclasses import dataclass
import gzip
import os
from pathlib import Path
import platform
import re
import shlex
import subprocess
from typing import Callable, List, Optional, Union
from typing import Callable, Generator, List, Optional, Union

import frida

from frida_tools.reactor import Reactor

MANPAGE_CONTROL_CHARS = re.compile(r"\.[a-zA-Z]*(\s|$)|\s?\"")
MANPAGE_FUNCTION_PROTOTYPE = re.compile(r"([a-zA-Z_]\w+)\(([^\)]+)")


def main() -> None:
import json
Expand Down Expand Up @@ -476,6 +484,7 @@ def __init__(self) -> None:
self._on_load_callback: Optional[Callable[[TraceTarget, str, str], None]] = None
self._on_update_callback: Optional[Callable[[TraceTarget, str, str], None]] = None
self._decorate = False
self._manpages = None

def ensure_handler(self, target: TraceTarget):
raise NotImplementedError("not implemented")
Expand Down Expand Up @@ -512,93 +521,11 @@ def _create_stub_handler(self, target: TraceTarget, decorate: bool) -> str:

def _create_stub_native_handler(self, target: TraceTarget, decorate: bool) -> str:
if target.flavor == "objc":
state = {"index": 2}

def objc_arg(m):
index = state["index"]
r = ":${args[%d]} " % index
state["index"] = index + 1
return r

log_str = "`" + re.sub(r":", objc_arg, target.display_name) + "`"
if log_str.endswith("} ]`"):
log_str = log_str[:-3] + "]`"
log_str = self._create_objc_logging_code(target)
elif target.flavor == "swift":
if decorate:
module_string = " [%s]" % os.path.basename(target.scope)
else:
module_string = ""
log_str = "'%(name)s()%(module_string)s'" % {"name": target.name, "module_string": module_string}
log_str = self._create_swift_logging_code(target, decorate)
else:
for man_section in (2, 3):
args = []
try:
with open(os.devnull, "w") as devnull:
man_argv = ["man"]
if platform.system() != "Darwin":
man_argv.extend(["-E", "UTF-8"])
man_argv.extend(["-P", "col -b", str(man_section), target.name])
output = subprocess.check_output(man_argv, stderr=devnull)
match = re.search(
r"^SYNOPSIS(?:.|\n)*?((?:^.+$\n)* {5}\w+[ \*\n]*"
+ target.name
+ r"\((?:.+\,\s*?$\n)*?(?:.+\;$\n))(?:.|\n)*^DESCRIPTION",
output.decode("UTF-8", errors="replace"),
re.MULTILINE,
)
if match:
decl = match.group(1)

for argm in re.finditer(r"[\(,]\s*(.+?)\s*\b(\w+)(?=[,\)])", decl):
typ = argm.group(1)
arg = argm.group(2)
if arg == "void":
continue
if arg == "...":
args.append('", ..." +')
continue

read_ops = ""
annotate_pre = ""
annotate_post = ""

normalized_type = re.sub(r"\s+", "", typ)
if normalized_type.endswith("*restrict"):
normalized_type = normalized_type[:-8]
if normalized_type in ("char*", "constchar*"):
read_ops = ".readUtf8String()"
annotate_pre = '"'
annotate_post = '"'

arg_index = len(args)

args.append(
"%(arg_name)s=%(annotate_pre)s${args[%(arg_index)s]%(read_ops)s}%(annotate_post)s"
% {
"arg_name": arg,
"arg_index": arg_index,
"read_ops": read_ops,
"annotate_pre": annotate_pre,
"annotate_post": annotate_post,
}
)
break
except Exception:
pass

if decorate:
module_string = " [%s]" % os.path.basename(target.scope)
else:
module_string = ""

if len(args) == 0:
log_str = "'%(name)s()%(module_string)s'" % {"name": target.name, "module_string": module_string}
else:
log_str = "`%(name)s(%(args)s)%(module_string)s`" % {
"name": target.name,
"args": ", ".join(args),
"module_string": module_string,
}
log_str = self._create_cstyle_logging_code(target, decorate)

return """\
/*
Expand Down Expand Up @@ -644,6 +571,46 @@ def objc_arg(m):
"log_str": log_str,
}

def _create_cstyle_logging_code(self, target: TraceTarget, decorate: bool) -> str:
if decorate:
module_string = " [%s]" % os.path.basename(target.scope)
else:
module_string = ""

args = self._generate_cstyle_argument_logging_code(target)
if len(args) == 0:
code = "'%(name)s()%(module_string)s'" % {"name": target.name, "module_string": module_string}
else:
code = "`%(name)s(%(args)s)%(module_string)s`" % {
"name": target.name,
"args": ", ".join(args),
"module_string": module_string,
}

return code

def _create_objc_logging_code(self, target: TraceTarget) -> str:
state = {"index": 2}

def objc_arg(m):
index = state["index"]
r = ":${args[%d]} " % index
state["index"] = index + 1
return r

code = "`" + re.sub(r":", objc_arg, target.display_name) + "`"
if code.endswith("} ]`"):
code = code[:-3] + "]`"

return code

def _create_swift_logging_code(self, target: TraceTarget, decorate: bool) -> str:
if decorate:
module_string = " [%s]" % os.path.basename(target.scope)
else:
module_string = ""
return "'%(name)s()%(module_string)s'" % {"name": target.name, "module_string": module_string}

def _create_stub_java_handler(self, target: TraceTarget, decorate) -> str:
return """\
/*
Expand Down Expand Up @@ -685,6 +652,152 @@ def _create_stub_java_handler(self, target: TraceTarget, decorate) -> str:
"display_name": target.display_name
}

def _generate_cstyle_argument_logging_code(self, target: TraceTarget) -> List[str]:
if self._manpages is None:
self._manpages = {}
try:
manroots = [Path(d) for d in subprocess.run(["manpath"],
stdout=subprocess.PIPE,
encoding="utf-8",
check=True).stdout.strip().split(":")]
for section in (2, 3):
for manroot in manroots:
mandir = (manroot / f"man{section}")
if not mandir.exists():
continue
raw_section = str(section)
for entry in mandir.iterdir():
tokens = entry.name.split(".")
if len(tokens) < 2:
continue
if not tokens[1].startswith(raw_section):
continue
name = tokens[0]
if name in self._manpages:
continue
self._manpages[name] = (entry, section)
except Exception as e:
return []

man_entry = self._manpages.get(target.name)
if man_entry is None:
return []
man_location, man_section = man_entry

try:
args = []
cfunc = next(f for f in self._read_manpage(man_location) if f.name == target.name)
for arg in cfunc.arguments:
if arg == "void":
continue
if arg.startswith("..."):
args.append("...")
continue

tokens = arg.split(" ")

arg_type = "".join(tokens[:-1])

arg_name = tokens[-1]
if arg_name.startswith("*"):
arg_type += "*"
arg_name = arg_name[1:]
elif arg_name.endswith("]"):
arg_type += "*"
arg_name = arg_name[:arg_name.index("[")]

read_ops = ""
annotate_pre = ""
annotate_post = ""

if arg_type.endswith("*restrict"):
arg_type = arg_type[:-8]
if arg_type in ("char*", "constchar*"):
read_ops = ".readUtf8String()"
annotate_pre = '"'
annotate_post = '"'

arg_index = len(args)

args.append(
"%(arg_name)s=%(annotate_pre)s${args[%(arg_index)s]%(read_ops)s}%(annotate_post)s"
% {
"arg_name": arg_name,
"arg_index": arg_index,
"read_ops": read_ops,
"annotate_pre": annotate_pre,
"annotate_post": annotate_post,
}
)
return args
except Exception:
import traceback
traceback.print_exc()
return []

def _read_manpage(self, man_location: Path) -> Generator[CFuncSpec]:
if man_location.suffix == ".gz":
man_file = gzip.open(man_location, "rt", encoding="utf-8", errors="replace")
else:
man_file = open(man_location, "r", encoding="utf-8", errors="replace")
with man_file:
man_data = man_file.read()

manpage_format = "gnu"
synopsis_lines = []
found_synopsis = False
for raw_line in man_data.split("\n"):
line = raw_line.strip()
if line.startswith(".so "):
redirected_location = man_location.parent.parent / Path(line[4:])
if not redirected_location.exists():
redirected_location = redirected_location.parent / (redirected_location.name + ".gz")
yield from self._read_manpage(redirected_location)
return
if not found_synopsis and "SYNOPSIS" in line:
found_synopsis = True
continue
elif found_synopsis and line.endswith("DESCRIPTION"):
break
elif not found_synopsis:
continue
if line.startswith(".Fn ") or line.startswith(".Fo "):
manpage_format = "bsd"
synopsis_lines.append(line)

if manpage_format == "gnu":
raw_synopsis = "\n".join(synopsis_lines)
synopsis = MANPAGE_CONTROL_CHARS.sub("", raw_synopsis).replace("\n", " ").replace(" [", "[").replace(" ]", "]")

for m in MANPAGE_FUNCTION_PROTOTYPE.finditer(synopsis):
name = protom.group(1)
signature = protom.group(2)
args = [a.strip() for a in signature.split(",")]
yield CFuncSpec(name, args)
else:
name = None
args = None
for line in synopsis_lines:
tokens = line.split(" ", maxsplit=1)
directive = tokens[0]
data = tokens[1] if len(tokens) == 2 else None
if directive == ".Fn":
argv = shlex.split(data)
yield CFuncSpec(argv[0], argv[1:])
elif directive == ".Fo":
name = data
args = []
elif directive == ".Fa":
args.append(shlex.split(data)[0])
elif directive == ".Fc":
yield CFuncSpec(name, args)


@dataclass
class CFuncSpec:
name: str
arguments: List[str]


class MemoryRepository(Repository):
def __init__(self) -> None:
Expand Down

0 comments on commit 9eb80ea

Please sign in to comment.