-
-
Notifications
You must be signed in to change notification settings - Fork 250
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Co-authored-by: Håvard Sørbø <[email protected]>
- Loading branch information
Showing
3 changed files
with
182 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,29 +1,80 @@ | ||
import frida | ||
from frida_tools.application import Reactor | ||
from pathlib import Path | ||
import subprocess | ||
import sys | ||
import threading | ||
import time | ||
|
||
|
||
def on_message(message, data): | ||
print("on_message:", message) | ||
class Controller: | ||
def __init__(self): | ||
self._stop_requested = threading.Event() | ||
self._reactor = Reactor(run_until_return=lambda reactor: self._stop_requested.wait()) | ||
|
||
runner_src_dir = Path(__file__).parent | ||
self._runner_js = runner_src_dir / "runner.js" | ||
self._runner_dylib = runner_src_dir.parent.parent.parent.parent / "build" / "tmp-macos-arm64" / "frida-gum" / "tests" / "core" / "swiftapiresolver" / "libtestswiftapiresolver.dylib" | ||
|
||
runner_src_dir = Path(__file__).parent | ||
runner_path = runner_src_dir.parent.parent.parent.parent / "build" / "tmp-macos-arm64" / "frida-gum" / "tests" / "core" / "swiftapiresolver" / "libtestswiftapiresolver.dylib" | ||
self._device = None | ||
self._session = None | ||
self._script = None | ||
|
||
device = frida.get_remote_device() | ||
def run(self): | ||
self._reactor.schedule(lambda: self._start()) | ||
self._reactor.run() | ||
|
||
session = device.attach("Xcode") | ||
def _start(self): | ||
device = frida.get_remote_device() | ||
self._device = device | ||
|
||
script = session.create_script((runner_src_dir / "runner.js").read_text(encoding="utf-8")) | ||
script.on("message", on_message) | ||
script.load() | ||
session = device.attach("Xcode") | ||
session.on("detached", lambda reason: self._reactor.schedule(lambda: self._on_detached(reason))) | ||
self._session = session | ||
|
||
script.post({ "type": "start" }, runner_path.read_bytes()) | ||
script = session.create_script(self._runner_js.read_text(encoding="utf-8")) | ||
script.on("message", lambda message, data: self._reactor.schedule(lambda: self._on_message(message, data))) | ||
script.load() | ||
self._script = script | ||
|
||
print("Running...") | ||
t1 = time.time() | ||
num_matches = script.exports_sync.run("*!*") | ||
t2 = time.time() | ||
duration = int((t2 - t1) * 1000) | ||
print(f"Got {num_matches} matches in {duration} ms.") | ||
script.post({ "type": "start" }, self._runner_dylib.read_bytes()) | ||
|
||
worker = threading.Thread(target=self._run_tests) | ||
worker.start() | ||
|
||
def _run_tests(self): | ||
print("Running...") | ||
t1 = time.time() | ||
num_matches = self._script.exports_sync.run("*!*") | ||
t2 = time.time() | ||
duration = int((t2 - t1) * 1000) | ||
print(f"Got {num_matches} matches in {duration} ms.") | ||
self._stop_requested.set() | ||
|
||
def _on_detached(self, reason): | ||
print(f"⚡ detached: reason='{reason}'") | ||
self._script = None | ||
self._session = None | ||
self._stop_requested.set() | ||
|
||
def _on_message(self, message, data): | ||
handled = False | ||
if message["type"] == "send": | ||
payload = message["payload"] | ||
if payload["type"] == "ready": | ||
self._on_ready(payload["symbols"]) | ||
handled = True | ||
if not handled: | ||
print(f"⚡ message: payload={message['payload']}") | ||
|
||
def _on_ready(self, symbols): | ||
for line in subprocess.run(["nm", self._runner_dylib], capture_output=True, encoding="utf-8").stdout.split("\n"): | ||
if line.endswith(" T _init"): | ||
tokens = line.split(" ") | ||
init_rva = int(tokens[0], 16) | ||
runner_base = int(symbols["init"], 16) - init_rva | ||
print(f"Runner is loaded at 0x{runner_base:x}") | ||
|
||
|
||
controller = Controller() | ||
controller.run() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import argparse | ||
from collections.abc import Iterable, Mapping | ||
from dataclasses import dataclass | ||
from pathlib import Path | ||
import re | ||
import subprocess | ||
|
||
|
||
RAW_ADDRESS_PATTERN = re.compile(r"\b(0x[0-9a-f]+)\b") | ||
|
||
@dataclass | ||
class DeclaredModule: | ||
path: Path | ||
start: int | ||
end: int | ||
|
||
def __hash__(self): | ||
return self.path.__hash__() | ||
|
||
PendingAddresses = Mapping[DeclaredModule, set[int]] | ||
|
||
|
||
def main(): | ||
parser = argparse.ArgumentParser(description="Symbolicate stack traces.") | ||
parser.add_argument("--input", dest="input", required=True, | ||
help="the file to symbolicate") | ||
parser.add_argument("--output", dest="output", required=True, | ||
help="where the symbolicated file will be written") | ||
parser.add_argument("--declare-module", dest="modules", required=True, action="append", | ||
help="declare a module at path:base") | ||
args = parser.parse_args() | ||
|
||
modules = [] | ||
for mod in args.modules: | ||
raw_path, raw_base = mod.split(":", maxsplit=1) | ||
path = Path(raw_path) | ||
base = int(raw_base, 16) | ||
size = compute_module_size(path) | ||
modules.append(DeclaredModule(path, base, base + size)) | ||
|
||
with Path(args.input).open(encoding="utf-8") as input_file: | ||
addresses = compute_pending_addresses(input_file, modules) | ||
|
||
symbols = symbolicate_pending_addresses(addresses) | ||
|
||
def symbolicate(m): | ||
raw_address = m.group(1) | ||
address = int(raw_address, 16) | ||
|
||
name = symbols.get(address, None) | ||
if name is not None: | ||
return name | ||
|
||
return raw_address | ||
|
||
with Path(args.input).open(encoding="utf-8") as input_file, \ | ||
Path(args.output).open("w", encoding="utf-8") as output_file: | ||
for line_raw in input_file: | ||
line_symbolicated = RAW_ADDRESS_PATTERN.sub(symbolicate, line_raw) | ||
output_file.write(line_symbolicated) | ||
|
||
|
||
def compute_pending_addresses(data: Iterable[str], modules: Iterable[DeclaredModule]) -> PendingAddresses: | ||
addresses = {} | ||
for raw_line in data: | ||
for match in RAW_ADDRESS_PATTERN.finditer(raw_line): | ||
address = int(match.group(1), 16) | ||
module = find_declared_module_by_address(address, modules) | ||
if module is not None: | ||
pending = addresses.get(module, None) | ||
if pending is None: | ||
pending = set() | ||
addresses[module] = pending | ||
pending.add(address) | ||
return addresses | ||
|
||
|
||
def symbolicate_pending_addresses(addresses: PendingAddresses) -> Mapping[int, str]: | ||
symbols = {} | ||
for module, pending in addresses.items(): | ||
pending = list(pending) | ||
pending.sort() | ||
query = subprocess.run([ | ||
"atos", | ||
"-o", module.path, | ||
"-l", hex(module.start), | ||
] + [hex(address) for address in pending], | ||
capture_output=True, | ||
encoding="utf-8", | ||
check=True) | ||
symbols.update(dict(zip(pending, query.stdout.split("\n")))) | ||
return symbols | ||
|
||
|
||
def find_declared_module_by_address(address, modules): | ||
for m in modules: | ||
if address >= m.start and address < m.end: | ||
return m | ||
return None | ||
|
||
|
||
def compute_module_size(path: Path) -> int: | ||
for raw_line in subprocess.run(["otool", "-l", path], capture_output=True, encoding="utf-8").stdout.split("\n"): | ||
line = raw_line.lstrip() | ||
if line.startswith("vmsize"): | ||
tokens = line.split(" ", maxsplit=1) | ||
return int(tokens[1], 16) | ||
assert False | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |