-
Notifications
You must be signed in to change notification settings - Fork 23
/
xform_cfg.py
660 lines (530 loc) · 21.3 KB
/
xform_cfg.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
# Copyright (c) 2015-2018 Paul Sokolovsky
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Transformation passes on CFG"""
import logging
from collections import defaultdict
from core import *
from cfgutils import *
from dce import *
from xform_expr import *
from xform_inst import *
from xform_bblock import *
from utils import set_union
import arch
import progdb
import copy
_log = logging.getLogger(__name__)
def check_prop(cfg, prop_name, err_msg):
entry_addr = cfg.entry()
if prop_name not in cfg[entry_addr]:
assert 0, err_msg
def find_1st_def(cfg, reg, in_bblock=False):
if in_bblock:
for n, info in cfg.nodes_props():
if reg in info["val"].defs():
return info["val"].addr
else:
raise NotImplementedError
def number_postorder(cfg):
cfg.number_postorder()
def number_postorder_from_exit(cfg):
cfg.number_postorder_from_exit("_EXIT_")
def remove_trailing_jumps(cfg):
remove_returns = False
exits = cfg.exits()
if len(exits) == 1:
remove_returns = True
foreach_bblock(cfg, remove_trailing_jumps_bblock, remove_returns=remove_returns)
cfg.props["trailing_jumps"] = False
def remove_unreachable_entries(cfg):
# Remove disconnected graph nodes.
entries = cfg.entries()
if len(entries) == 1:
return
for e in entries:
if e == cfg.props["addr"]:
continue
def remove_component(e):
if cfg.pred(e):
return
succ = cfg.succ(e)
try:
cfg.remove_node(e)
except KeyError:
# Already removed
pass
for e in succ:
remove_component(e)
remove_component(e)
def remove_unreachable_nodes(cfg):
"Remove CFG nodes not reachable from entry."
assert "postno" in cfg[cfg.first_node], "Need number_postorder"
for node, info in list(cfg.nodes_props()):
if info["postno"] is None:
cfg.remove_node(node)
# Remove any jumps to jumps, replacing destination of first jump
# to be destination of 2nd.
# This "simplifies" graph, but may make it less structured. This is useful
# transformation for generating machine code, but for decompilation,
# it actually makes sense to add some extra jump landing sites to make
# loops more structured.
#
# http://en.wikipedia.org/wiki/Jump_threading
#
def remove_jump_over_jump(cfg):
for v in cfg.nodes():
# If node is not entry, has a single exit and empty
if cfg.degree_in(v) > 0 and cfg.degree_out(v) == 1 and not cfg.node(v)["val"].items:
cfg.move_pred(v, cfg.succ(v)[0])
cfg.remove_node(v)
_log.info("jump_over_jump: removed node: %s", v)
return True
# If possible, make a single back-edge to a loop header, by introducing
# intermediate jump landing nodes.
def loop_single_entry(cfg):
for v in cfg.nodes():
if cfg.degree_in(v) >= 2:
preds = cfg.pred(v)
back_preds = list(filter(lambda x: v <= x, preds))
if len(back_preds) < 2:
continue
_log.info("loop_single_entry: node: %s", v)
_log.info("back_preds: %s", back_preds)
back_jumps = list(filter(lambda x: cfg.degree_out(x) == 1, back_preds))
_log.info("back_jumps: %s", back_jumps)
# find existing landing site
landing_site = None
for p in back_jumps:
b = cfg.node(p)["val"]
if not b.items:
landing_site = p
if not landing_site:
farthest = max(back_preds)
_log.info("farthest: %s", farthest)
newb = BBlock(farthest + "_1")
cfg.add_node(newb.addr, val=newb)
cfg.add_edge(newb.addr, v)
landing_site = newb.addr
_log.info("landing_site: %s", landing_site)
for p in back_preds:
if p != landing_site:
e = cfg.edge(p, v).get("cond")
cfg.remove_edge(p, v)
cfg.add_edge(p, landing_site, cond=e)
return True
def cfg_single_entry(cfg):
first = cfg.first_node
if cfg.pred(first):
# First (== entry) node has a backedge
entryb = BBlock(".ENTRY")
entryb.cfg = cfg
cfg.add_node(entryb.addr, val=entryb)
cfg.add_edge(entryb.addr, first)
# Can still have multiple entries at this point
assert len(cfg.entries()) >= 1
# Unconditionally add a new empty entry node, to hold anything needed later
def cfg_preheader(cfg):
first = cfg.first_node
if 1: #cfg.pred(first):
entryb = BBlock(".ENTRY")
entryb.cfg = cfg
cfg.add_node(entryb.addr, val=entryb)
cfg.add_edge(entryb.addr, first)
cfg.first_node = entryb.addr
# Can still have multiple entries at this point
assert len(cfg.entries()) >= 1
# Make sure that CFG has a single exit, as required for some algorithms.
# Note that this doesn't do anything to former individual exit BBlocks,
# so they likely still end with "return" instructions.
def cfg_single_exit(cfg):
exits = cfg.exits()
exitb = BBlock("_EXIT_")
exitb.cfg = cfg
exitb.add(Inst(None, "return", [], addr=exitb.addr))
cfg.add_node(exitb.addr, val=exitb)
for e in exits:
cfg.add_edge(e, exitb.addr)
if not exits:
# Infinite loop
cfg.props["noreturn"] = True
# This pass finds infinite loops, and adds virtual exit edges from
# them (effectively turning infinite loops into "do {...} while (1)"
# loops) to a special "_DEADEND_" node, which in turn is connected to
# the single exit node. Using intermediate deadend node will allow
# later to remove these virtual edges easily, and also to have a
# single adhoc rule for live var analysis, that live-out of DEADEND
# is empty.
def cfg_infloops_exit(cfg):
deadend_nodes = []
for addr, info in cfg.nodes_props():
# We're interested only in nodes unreachable from exit
if info["postno_exit"]:
continue
succ = cfg.succ(addr)
if not succ:
continue
my_postno = info["postno"]
deadend = True
for s in succ:
if cfg[s]["postno"] < my_postno:
deadend = False
break
if deadend:
#print("deadend:", addr)
deadend_nodes.append(addr)
if deadend_nodes:
if not cfg.props.get("noreturn"):
cfg.props["has_infloops"] = True
# Add intermediate node, so later we can remove it, and all
# extra edges are gone
bb = BBlock("_DEADEND_")
bb.cfg = cfg
cfg.add_node("_DEADEND_", val=bb)
cfg.add_edge("_DEADEND_", "_EXIT_", cond=VALUE(0, 10))
for d in deadend_nodes:
cfg.add_edge(d, "_DEADEND_", cond=VALUE(0, 10))
# Graph was modified
return True
def split_bblock(cfg, n, suffix, only_non_empty=False, before=False):
# If a node is non-empty bblock, splits it in two, with 2nd one being
# empty, and having all out edges, and returns this 2nd one. If bblock
# is already empty, returns it directly.
if only_non_empty and not cfg[n]["val"].items:
return n
new_addr = n + suffix
new_bb = BBlock(new_addr)
cfg.add_node(new_addr, val=new_bb)
cfg.move_succ(n, new_addr)
cfg.add_edge(n, new_addr)
# Too keep addresses lexicographically sorted, we now just move insts
# to the new bblock, leaving original bblock empty.
if before:
new_bb.items = cfg[n]["val"].items
cfg[n]["val"].items = []
return new_addr
def split_crit_nodes(g):
"""Split "critical nodes".
Critical nodes are those which have more than one predecessor and more than
successor. We split them so original node becomes empty, and add new which
contains bblock content (this way it's compatible with trailing_jumps=True
CFGs).
"""
crit_nodes = [n for n in g.nodes() if g.degree_in(n) > 1 and g.degree_out(n) > 1]
for n in crit_nodes:
split_bblock(g, n, ".critn", before=True)
def split_crit_edges(g):
to_split = []
for from_n, to_n in g.edges():
if g.degree_out(from_n) > 1 and g.degree_in(to_n) > 1:
to_split.append((from_n, to_n))
for from_n, to_n in to_split:
new_addr = from_n + ".crite"
g.add_node(new_addr, val=BBlock(new_addr))
attrs = g[(from_n, to_n)]
g.remove_edge(from_n, to_n)
g.add_edge(from_n, new_addr, **attrs)
g.add_edge(new_addr, to_n)
def collect_state_in(cfg):
# This is pretty backwards actually. It uses ReachDef information,
# but post-processes it pretty heavily, and instead should be done
# as a dataflow analysis itself.
changed = False
for bblock_addr, node_props in cfg.iter_sorted_nodes():
org_state = node_props["val"].props.get("state_in", {})
state = org_state.copy()
by_var = defaultdict(set)
for reg, from_bblock in node_props["reachdef_in"]:
by_var[reg].add(from_bblock)
#print(bblock_addr, by_var)
for var, from_bblocks in by_var.items():
val_set = set()
for bb in from_bblocks:
val = None
if bb is not None:
val = cfg[bb]["val"].props["state_out"].get(var)
if val is None:
val_set = set()
break
val_set.add(val)
if len(val_set) == 1:
state[var] = val_set.pop()
elif len(val_set) > 1:
_log.debug("%s: in value set for %s are: %s" % (bblock_addr, var, val_set))
if state != org_state:
_log.debug("CHANGED: %s: %r ==VS== %r" % (node_props["val"], org_state, state))
node_props["val"].props["state_in"] = state
changed = True
return changed
def collect_cond_out(cfg):
"""Collect known conditions (constraints) on registers at the
exit (and entry) of basic blocks."""
# Just as collect_state_in, this implements adhoc partial dataflow
# analysis as needed for trivial cases of jumptable analysis.
# Rewrite to use real dataflow framework.
for bblock_addr, node_props in cfg.nodes_props():
succ = cfg.succ(bblock_addr)
if len(succ) < 2:
continue
last_cond = None
for s in succ:
cond = cfg.edge(bblock_addr, s).get("cond")
if cond is None:
cond = last_cond.neg()
# As cond's are updated in-place (later), make a copy
cond = copy.copy(cond)
succ_bblock = cfg[s]["val"]
succ_bblock.props.setdefault("cond_in", set()).add((bblock_addr, cond))
last_cond = cond
for bblock_addr, node_props in cfg.nodes_props():
bblock = node_props["val"]
cond_in = bblock.props.get("cond_in")
if not cond_in:
continue
# Less conditions than input edges, we definitely don't meet
# over all paths.
if len(cond_in) != len(cfg.pred(bblock_addr)):
del bblock.props["cond_in"]
continue
# This assumes each predecessor contributes only one condition (i.e.
# conditions are compounded in complex conditions if needed).
met_conds = {cond[1] for cond in cond_in}
if len(met_conds) != 1:
del bblock.props["cond_in"]
continue
bblock.props["cond_in"] = met_conds
for bblock_addr, node_props in cfg.nodes_props():
bblock = node_props["val"]
cond_in = bblock.props.get("cond_in")
if not cond_in:
continue
reachdef_gen_regs = {r[0] for r in node_props["reachdef_gen"]}
cond_out = set()
for c in cond_in:
killed = False
for r in c.regs():
if r in reachdef_gen_regs:
killed = True
if not killed:
cond_out.add(c)
bblock.props["cond_out"] = cond_out
def propagate(cfg, bblock_propagator):
check_prop(cfg, "reachdef_in", "This pass requires reaching defs information")
while True:
foreach_bblock(cfg, lambda bb: bblock_propagator(bb, False))
if not collect_state_in(cfg):
break
foreach_bblock(cfg, lambda bb: bblock_propagator(bb, True))
def const_propagation(cfg):
propagate(cfg, bblock_const_propagation)
def copy_propagation(cfg):
propagate(cfg, bblock_copy_propagation)
def mem_propagation(cfg):
propagate(cfg, bblock_mem_propagation)
def expr_propagation(cfg):
propagate(cfg, bblock_expr_propagation)
def insert_sp0(cfg):
entry_addr = cfg.entry()
first_bblock = cfg[entry_addr]["val"]
first_bblock.items.insert(0, Inst(REG("sp"), "=", [REG("sp_0")], addr=entry_addr + ".init0"))
# Generalization of insert_sp0
# For each register live on entry to function, insert to function pre-header
# assignment of $r = $r_0, where $r_0 represents an input register, whereas $r
# is a work register. Overall, this implements poor-man's (but very practical)
# SSA subset.
# Requires analyze_live_vars
def insert_initial_regs(cfg):
entry_addr = cfg.entry()
# used_regs = reversed(sorted([x[0] for x in cfg[entry_addr]["reachdef_in"]]))
check_prop(cfg, "live_in", "This pass requires live variable information")
used_regs = cfg[entry_addr]["live_in"]
first_bblock = cfg[entry_addr]["val"]
for i, r in enumerate(sorted(list(used_regs))):
first_bblock.items.insert(i, Inst(r, "=", [REG(r.name + "_0")], addr=entry_addr + ".init%d" % i))
def insert_params(cfg):
entry_addr = cfg.entry()
first_bblock = cfg[entry_addr]["val"]
for i, reg in enumerate(sorted(list(arch.call_params(cfg.props["name"])))):
first_bblock.items.insert(i, Inst(reg, "=", [REG("arg_" + reg.name)], addr=entry_addr + ".arg%d" % i))
# Requires insert_initial_regs pass followed by expr_propagation passes
# (normal, then stack vars propagation), and should be run before DCE.
def collect_preserveds(cfg):
exit_addr = cfg.exit()
exit_bblock = cfg[exit_addr]["val"]
state_out = exit_bblock.props["state_out"]
preserveds = set()
for k, v in state_out.items():
if is_reg(k) and is_reg(v):
if v.name == k.name + "_0":
preserveds.add(k)
progdb.update_cfg_prop(cfg, "preserveds", preserveds)
# Requires expr_propagation
def collect_calls(cfg):
calls = []
calls_indir = []
def collect(inst):
if inst.op == "call":
arg = inst.args[0]
if is_addr(arg):
calls.append(arg)
else:
calls_indir.append(arg)
foreach_inst(cfg, collect)
cfg.props["calls"] = calls
if calls_indir:
cfg.props["calls_indir"] = calls_indir
# While collect_calls collects direct calls, this pass
# collects function references by address. Requires
# funcdb to know what address is a function.
def collect_func_refs(cfg):
refs = []
def collect(inst):
if inst.op == "=":
arg = inst.args[0]
if is_addr(arg):
import progdb
if arg.addr in progdb.FUNC_DB:
refs.append(arg)
foreach_inst(cfg, collect)
cfg.props["func_refs"] = refs
def collect_mem_refs(cfg, pred, prop_name="mem_refs"):
"""Collect references to non-symbolic memory address (ones represented
by VALUE objects). This is useful e.g. to see which function accesses
which MMIO addresses. Takes a predicate telling which addresses should
be captured and property name to store summary.
"""
refs = []
def collect(inst):
def collect_mmio(expr):
if expr and is_mem(expr):
mem = expr.expr
if is_value(mem) and pred(mem.val):
refs.append(mem)
elif is_expr(mem):
if mem.op != "+":
#print(mem.op, expr)
return
# TODO: This means that MMIO accessed as array
for a in mem.args:
if is_value(a) and pred(a.val):
#refs.append(a)
refs.append(mem)
break
inst.foreach_subexpr(collect_mmio)
foreach_inst(cfg, collect)
if refs:
#cfg.props[prop_name] = refs
cfg.props[prop_name] = sorted(list(set(refs)))
def local_defines(cfg):
"""Locations defined by function's own instructions, disregarding
calls to other functions. This may be useful e.g. to assess
preserveds."""
defs = set()
def func(inst):
if inst.op != "call":
defs.update(inst.defs())
foreach_inst(cfg, func)
cfg.props["local_defines"] = defs
return defs
def collect_reach_exit(cfg):
all_defs = foreach_bblock(cfg, lambda b: b.defs(True), set_union)
exit = cfg.exit()
if "reachdef_out" in cfg.node(exit):
all_defs2 = set(x[0] for x in cfg.node(exit)["reachdef_out"] if x[1] is not None)
if "_DEADEND_" not in cfg:
assert all_defs == all_defs2, "%r vs %r" % (all_defs, all_defs2)
all_defs = all_defs2
progdb.update_cfg_prop(cfg, "reach_exit", all_defs)
# Collect registers which may be either defined or not on the exit.
# These registers often represent parasitic arguments (as they
# may be either modified or not, the way to return the original value
# of the reg is take it as a param).
# Requires reachdef on raw CFG (before insert_initial_regs).
def collect_reach_exit_maybe(cfg):
exit = cfg.exit()
vardict = {}
for var, addr in cfg.node(exit)["reachdef_out"]:
vardict.setdefault(var, set()).add(addr)
mod_maybe = set()
for var, addrs in vardict.items():
if len(addrs) > 1 and None in addrs:
mod_maybe.add(var)
if 1: #mod_maybe or "reach_exit_maybe" in cfg.props:
progdb.update_cfg_prop(cfg, "reach_exit_maybe", mod_maybe)
import dataflow
def analyze_live_vars(cfg, **kwargs):
ana = dataflow.LiveVarAnalysis(cfg, **kwargs)
ana.solve()
def analyze_reach_defs(cfg):
ana = dataflow.ReachDefAnalysis(cfg)
ana.solve()
def analyze_dom(cfg):
ana = dataflow.DominatorAnalysis(cfg)
ana.solve()
# Regs in live_in set on function entry are estimated params
# (may miss something, e.g. if value in a reg is passed thru
# (without referencing it) to another function).
def estimate_params(cfg):
#ana = dataflow.LiveVarAnalysis(cfg, skip_calls=True)
#ana.solve()
check_prop(cfg, "live_in", "This pass requires live variable information")
func_addr = cfg.entry()
assert func_addr == ".ENTRY", "cfg_preheader pass required"
real_entry = cfg.succ(func_addr)
assert len(real_entry) == 1
real_entry = real_entry[0]
e = cfg[real_entry]
args = set(REG(r.name[:-2] if r.name.endswith("_0") else r.name) for r in e["live_in"])
args -= set([REG("sp")])
progdb.update_cfg_prop(cfg, "estimated_params", args)
# Precisely compute func params
def collect_params(cfg):
func_addr = cfg.entry()
e = cfg[func_addr]
args = set(REG(r.name[:-2] if r.name.endswith("_0") else r.name) for r in e["live_in"])
args = arch.param_filter(args)
progdb.update_cfg_prop(cfg, "params", args)
# Kind of "AI" telling why this or that reg has got into "params" list.
# The explanations are approximate, e.g. a register may be "modified only
# along some paths" (especially if overestimated as modified in the
# presense of unknown functions), and still be a genuine param.
def annotate_params(cfg):
res = {}
for reg in cfg.props["params"]:
if reg in cfg.props.get("estimated_params", ()):
res[reg] = "100% genuine param"
elif reg == REG("sp"):
res[reg] = "Param because address of object on stack is taken"
elif reg in cfg.props.get("reach_exit_maybe", ()):
res[reg] = "Param because modified only along some paths (and maybe param to some callee)"
else:
res[reg] = "Likely param passed down to some callee"
cfg.props["params_why"] = res
#progdb.update_cfg_prop(cfg, "params_why", res)
# Collect regs which are live after each function call within current
# function. Triples of (bblock_addr, func, live_out) are stored in CFG's
# "calls_live_out" property (to be later further unioned and stored in
# funcdb). This corresponds to Van Emmerik's callsite use collector.
def collect_call_live_out(cfg):
calls_live_out = []
def collect(node):
bb = node["val"]
if bb.items and bb[-1].op == "call":
inst = bb[-1]
arg = inst.args[0]
# TODO: Perhaps filter in the real regs?
regs = {r for r in node["live_out"] if not r.name.endswith("_0")}
calls_live_out.append((bb.addr, arg, regs))
foreach_node(cfg, collect)
progdb.update_cfg_prop(cfg, "calls_live_out", calls_live_out)