Skip to content

Commit

Permalink
tracer: Move more code to pathlib and f-strings
Browse files Browse the repository at this point in the history
  • Loading branch information
oleavr committed Oct 15, 2024
1 parent 3a5996c commit 886b2d0
Showing 1 changed file with 28 additions and 32 deletions.
60 changes: 28 additions & 32 deletions frida_tools/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import gzip
import http
import mimetypes
import os
import re
import shlex
import subprocess
Expand Down Expand Up @@ -275,17 +274,17 @@ def on_trace_events(self, raw_events) -> None:
self._last_event_tid = thread_id
self._print("%6d ms %s%s%s%s" % (timestamp, attributes, indent, message, no_attributes))

def on_trace_handler_create(self, target: TraceTarget, handler: str, source: str) -> None:
def on_trace_handler_create(self, target: TraceTarget, handler: str, source: Path) -> None:
self._register_handler(target, source)
if self._quiet:
return
self._print('%s: Auto-generated handler at "%s"' % (target, source.replace("\\", "\\\\")))
self._print(f'{target}: Auto-generated handler at "{source}"')

def on_trace_handler_load(self, target: TraceTarget, handler: str, source: str) -> None:
def on_trace_handler_load(self, target: TraceTarget, handler: str, source: Path) -> None:
self._register_handler(target, source)
if self._quiet:
return
self._print('%s: Loaded handler at "%s"' % (target, source.replace("\\", "\\\\")))
self._print(f'{target}: Loaded handler at "{source}"')

def _register_handler(self, target: TraceTarget, source: str) -> None:
config = {"capture_backtraces": False}
Expand Down Expand Up @@ -599,9 +598,8 @@ def on_update(target, handler, source) -> None:
)

ui.on_trace_progress("initializing")
data_dir = os.path.dirname(__file__)
with codecs.open(os.path.join(data_dir, "tracer_agent.js"), "r", "utf-8") as f:
source = f.read()
data_dir = Path(__file__).parent
source = (data_dir / "tracer_agent.js").read_text(encoding="utf-8")
script = session.create_script(name="tracer", source=source, runtime=runtime)

self._script = script
Expand Down Expand Up @@ -832,7 +830,7 @@ def _create_stub_native_handler(self, target: TraceTarget, decorate: bool) -> st

def _create_cstyle_logging_code(self, target: TraceTarget, decorate: bool) -> str:
if decorate:
module_string = " [%s]" % os.path.basename(target.scope)
module_string = f" [{Path(target.scope).name}]"
else:
module_string = ""

Expand Down Expand Up @@ -865,7 +863,7 @@ def objc_arg(m):

def _create_swift_logging_code(self, target: TraceTarget, decorate: bool) -> str:
if decorate:
module_string = " [%s]" % os.path.basename(target.scope)
module_string = f" [{Path(target.scope).name}]"
else:
module_string = ""
return "'%(name)s()%(module_string)s'" % {"name": target.name, "module_string": module_string}
Expand Down Expand Up @@ -1092,7 +1090,7 @@ def __init__(self, reactor: Reactor, decorate: bool) -> None:
self._handler_by_file = {}
self._changed_files = set()
self._last_change_id = 0
self._repo_dir = os.path.join(os.getcwd(), "__handlers__")
self._repo_dir = Path.cwd() / "__handlers__"
self._repo_monitors = {}
self._decorate = decorate

Expand All @@ -1116,24 +1114,19 @@ def ensure_handler(self, target: TraceTarget) -> str:

scope = target.scope
if len(scope) > 0:
handler_file = os.path.join(
self._repo_dir, to_filename(os.path.basename(scope)), to_handler_filename(target.name)
)
handler_file = self._repo_dir / to_filename(Path(scope).name) / to_handler_filename(target.name)
else:
handler_file = os.path.join(self._repo_dir, to_handler_filename(target.name))
handler_file = self._repo_dir / to_handler_filename(target.name)

if os.path.isfile(handler_file):
with codecs.open(handler_file, "r", "utf-8") as f:
handler = f.read()
if handler_file.is_file():
handler = self._load_handler(handler_file)
self._notify_load(target, handler, handler_file)

if handler is None:
handler = self._create_stub_handler(target, self._decorate)
handler_dir = os.path.dirname(handler_file)
if not os.path.isdir(handler_dir):
os.makedirs(handler_dir)
with codecs.open(handler_file, "w", "utf-8") as f:
f.write(handler)
handler_dir = handler_file.parent
handler_dir.mkdir(parents=True, exist_ok=True)
handler_file.write_text(handler, encoding="utf-8")
self._notify_create(target, handler, handler_file)

entry = (target, handler, handler_file)
Expand All @@ -1144,28 +1137,33 @@ def ensure_handler(self, target: TraceTarget) -> str:

return handler

def _load_handler(self, file: Path) -> None:
handler = file.read_text(encoding="utf-8")
return handler

def update_handler(self, target: TraceTarget, handler: str) -> None:
_, _, handler_file = self._handler_by_id.get(target.identifier)
entry = (target, handler, handler_file)
self._handler_by_id[target.identifier] = entry
self._handler_by_file[handler_file] = entry
self._notify_update(target, handler, handler_file)

Path(handler_file).write_text(handler, encoding="utf-8")
handler_file.write_text(handler, encoding="utf-8")

def _ensure_monitor(self, handler_file) -> None:
handler_dir = os.path.dirname(handler_file)
def _ensure_monitor(self, handler_file: Path) -> None:
handler_dir = handler_file.parent
monitor = self._repo_monitors.get(handler_dir)
if monitor is None:
monitor = frida.FileMonitor(handler_dir)
monitor = frida.FileMonitor(str(handler_dir))
monitor.on("change", self._on_change)
self._repo_monitors[handler_dir] = monitor

def commit_handlers(self) -> None:
for monitor in self._repo_monitors.values():
monitor.enable()

def _on_change(self, changed_file, other_file, event_type) -> None:
def _on_change(self, raw_changed_file: str, other_file: str, event_type: str) -> None:
changed_file = Path(raw_changed_file)
if changed_file not in self._handler_by_file or event_type == "changes-done-hint":
return
self._changed_files.add(changed_file)
Expand All @@ -1180,10 +1178,8 @@ def _sync_handlers(self, change_id) -> None:
self._changed_files.clear()
for changed_handler_file in changes:
(target, old_handler, handler_file) = self._handler_by_file[changed_handler_file]
with codecs.open(handler_file, "r", "utf-8") as f:
new_handler = f.read()
changed = new_handler != old_handler
if changed:
new_handler = self._load_handler(handler_file)
if new_handler != old_handler:
entry = (target, new_handler, handler_file)
self._handler_by_id[target.identifier] = entry
self._handler_by_file[handler_file] = entry
Expand Down

0 comments on commit 886b2d0

Please sign in to comment.