diff --git a/README.rst b/README.rst
index 53fa9c2..a5646e6 100644
--- a/README.rst
+++ b/README.rst
@@ -1,14 +1,99 @@
-*Note: the branch ``post-dominator-refactor`` branch README.rst https://github.com/rocky/python-control-flow/blob/post-dominator-refactor/README.rst has the more information.*
+Introduction
+------------
-This is a Toolkit for getting control flow informaion from Python bytecode
+This is a Toolkit for getting control flow information from Python bytecode.
Specifically:
-* creates basic blocks from Python bytecode
-* creates control-flow graph from the basic blocks
-* creates a dominator tree
-* Graphs via dot the control-flow graph and dominator tree
+* Creates basic blocks from Python bytecode.
+* Creates control-flow graph from the basic blocks.
+* Creates dominator trees and dominator regions for the control flow.
+* Graphs via `dot <https://graphviz.org/>`_ the control-flow graph and dominator tree.
-I've used some routines from Romain Gaucher's equip as a starting point.
-equip is (c) 2014 by Romain Gaucher
+I've used some routines from Romain Gaucher's `equip <https://github.com/neuroo/equip>`_ as a starting point.
+
+Example
+-------
+
+For now, the Python in ``test/test_bb2.py`` shows what's up the best.
+
+Consider this simple Python program taken from my `BlackHat Asia 2024 talk `_:
+
+.. code-block:: python
+
+ # Program to count the number of bits in the integer 6.
+ i: int = 6
+ zero_bits = 0
+ one_bits = 0
+ while i > 0: # loop point
+ # loop alternative
+ if i % 2:
+ # first alternative
+ one_bits += 1
+ else:
+ # second alternative
+ zero_bits += 1
+ # join point
+ i >>= 1
+ # loop-end join point
+
+You can find this byte-compiled to Python 3.8 bytecode in `doc-example/count-bits.cpython-38.pyc `_.
+We can get control flow information for this program using::
+
+ python ./test/test-bb2.py doc-example/count-bits.cpython-38.pyc
+
+After running, in ``/tmp`` you'll find some ``.dot`` files and some ``.png`` images generated for the main routine.
+
+``flow-3.8--count-bits.cpython-38-module.png`` is a PNG image for the control flow.
+
+.. image:: doc-example/flow-3.8--count-bits.cpython-38-module.png
+
+Here is what the colors on the arrows indicate:
+
+red
+ the first alternative of a group of two alternatives
+
+blue
+ the second alternative of a group of two alternatives
+
+green
+ a looping (backwards) jump
+
+Here is what the line styles on the arrows indicate:
+
+solid
+ an unconditional (and forward) jump
+
+dashed
+ the fallthrough path of a conditional jump
+
+dotted
+ the jump path of a conditional jump
+
+If there is no arrow head on an arrow, then the block follows the
+previous block in the bytecode although there is no control flow to
+it. We align blocks linearly using the offset addresses. You can find
+the offset ranges listed inside the block. The entry block is
+marked with an additional border. We also show the basic block number
+and block flags.
+
+Control-Flow with Dominator Regions
++++++++++++++++++++++++++++++++++++
+
+In addition to the basic control flow, we also mark and color boxes with dominator regions.
+
+.. image:: doc-example/flow+dom-3.8--count-bits.cpython-38-module.png
+
+
+Regions with the same nesting level have the same color. So basic blocks 3 and 7 are at the same nesting level. Blocks 4 and 5 are at the same nesting level and are the same color. However, even though Block 6 is the same color, it is not at the same nesting level, although it *is* inside the same dominator region.
+
+Colors get darker as the region is more nested.
+
+Here the additional border indicates that a block is part of some non-trivial dominator region. (A "trivial" dominator region is where the block just dominates itself.)
+
+In addition, if a jump or fallthrough jumps out of its dominator region, that is shown in brown. If any basic block is jumped to using a jump-out (or end scope) kind of edge, then the box has a brown outline.
+
+Inside the block text we now add the dominator region number for a block in parentheses. For example, basic blocks 4 and 5 are in dominator region 3 and so are marked "(3)" after their basic block number. The dominator number for a basic block is the same as its basic block number. So Basic Block 3 is also Dominator Region 3.
+
+Note that even though basic blocks 4 and 5 are at the same indentation level, they are in different *scopes* under basic block 3.
diff --git a/control_flow/augment_disasm.py b/control_flow/augment_disasm.py
index 6453b14..a785a5e 100644
--- a/control_flow/augment_disasm.py
+++ b/control_flow/augment_disasm.py
@@ -19,7 +19,14 @@
from control_flow.bb import BBMgr, BasicBlock
from control_flow.cfg import ControlFlowGraph
-from control_flow.graph import Node, BB_FOR, BB_LOOP, BB_NOFOLLOW
+from control_flow.graph import (
+ Node,
+ BB_FOR,
+ BB_JOIN_POINT,
+ BB_LOOP,
+ BB_NOFOLLOW,
+ ScopeEdgeKind,
+)
class JumpTarget(IntEnum):
@@ -210,6 +217,18 @@ class _ExtendedInstruction(NamedTuple):
dominator: Optional[Node] = None
+EXTENDED_OPMAP = {
+ "BB_END": 1001,
+ "BB_START": 1002,
+ "BREAK_FOR": 1003,
+ "BREAK_LOOP": 1004,
+ "BLOCK_END_FALLTHROUGH_JOIN": 1005,
+ "BLOCK_END_JUMP_JOIN": 1006,
+ "JUMP_FOR": 1007,
+ "JUMP_LOOP": 1008,
+}
+
+
class ExtendedInstruction(_ExtendedInstruction, Instruction):
"""Details for an extended bytecode operation
@@ -372,10 +391,11 @@ def augment_instructions(
"""Augment instructions in fn_or_code with dominator information"""
current_block = cfg.entry_node
- dom_tree = cfg.dom_tree
- bb2dom_node = {node.bb: node for node in dom_tree.nodes}
- version_tuple = opc.version_tuple
- # block_stack = [current_block]
+ # Create a mapping from a basic block, which has dominator information, to a graph node.
+ # Note: unreachable basic blocks do not have a "doms" field.
+ bb2dom_node = {
+ bb: next(iter(bb.doms - bb.dom_set)) for bb in cfg.blocks if hasattr(bb, "doms")
+ }
starts = {current_block.start_offset: current_block}
dom_reach_ends = {}
@@ -405,6 +425,8 @@ def augment_instructions(
# These are done for basic blocks, dominators,
# and jump target locations.
offset = inst.offset
+ opname = inst.opname
+ opcode = inst.opcode
new_bb = starts.get(offset, None)
if new_bb:
@@ -414,21 +436,47 @@ def augment_instructions(
new_dom = bb2dom_node.get(bb, dom)
if new_dom is not None:
dom = new_dom
- dom_number = dom.bb.number
+ # dom_number = dom.bb.number
reach_ends = dom_reach_ends.get(dom.reach_offset, [])
reach_ends.append(dom)
dom_reach_ends[dom.reach_offset] = reach_ends
- if inst.opcode in bb_mgr.FOR_INSTRUCTIONS or BB_LOOP in bb.flags:
+ if opcode in bb_mgr.FOR_INSTRUCTIONS or BB_LOOP in bb.flags:
# Use the basic block of the block loop successor,
# this is the main body of the loop, as the block to
# check for leaving the loop.
loop_block_dom_set = tuple(dom.bb.successors)[0].doms
loop_stack.append((dom, loop_block_dom_set, inst))
+ # For now we will assume that edges are sorted in outermost-to-innermost nesting order.
+ # Add any pseudo-token join markers
+ if offset in cfg.offset2edges:
+ for edge in reversed(cfg.offset2edges[offset]):
+ if edge.scoping_kind == ScopeEdgeKind.Join:
+ from_bb_number = edge.source.bb.number
+ op_name = "BLOCK_END_FALLTHROUGH_JOIN" if edge.kind == "fallthrough" else "BLOCK_END_JUMP_JOIN"
+ pseudo_inst = ExtendedInstruction(
+ opname=op_name,
+ opcode=EXTENDED_OPMAP[op_name],
+ optype="pseudo",
+ inst_size=0,
+ arg=from_bb_number,
+ argval=edge,
+ argrepr=f"from basic block #{from_bb_number}",
+ has_arg=True,
+ offset=offset,
+ starts_line=None,
+ is_jump_target=False,
+ has_extended_arg=False,
+ positions=None,
+ basic_block=bb,
+ dominator=dom,
+ )
+ augmented_instrs.append(pseudo_inst)
+
pseudo_inst = ExtendedInstruction(
opname="BB_START",
- opcode=1001,
+ opcode=EXTENDED_OPMAP["BB_START"],
optype="pseudo",
inst_size=0,
arg=bb.number,
@@ -457,41 +505,41 @@ def augment_instructions(
# FIXME: this shouldn't be needed
bb = dom.bb
- if inst.opcode in opc.JUMP_OPS:
+ if opcode in opc.JUMP_OPS:
jump_target = inst.argval
target_inst = instructions[offset2inst_index[jump_target]]
target_bb = offset2bb[target_inst.offset]
target_dom_set = target_bb.dom_set
if inst.argval < offset:
- # Classify backward loop jumps
- pseudo_op_name = (
- "JUMP_FOR"
- if target_inst.opcode in bb_mgr.FOR_INSTRUCTIONS
- else "JUMP_LOOP"
- )
- pseudo_inst = ExtendedInstruction(
- opname=pseudo_op_name,
- opcode=1001,
- optype="pseudo",
- inst_size=0,
- arg=target_dom_set,
- argval=target_dom_set,
- argrepr=f"{target_dom_set}",
- has_arg=True,
- offset=offset,
- starts_line=None,
- is_jump_target=False,
- has_extended_arg=False,
- positions=None,
- basic_block=bb,
- dominator=dom,
- )
- augmented_instrs.append(pseudo_inst)
+ if opcode in bb_mgr.JUMP_UNCONDITIONAL:
+ # Classify backward loop jumps
+ pseudo_op_name = (
+ "JUMP_FOR"
+ if target_inst.opcode in bb_mgr.FOR_INSTRUCTIONS
+ else "JUMP_LOOP"
+ )
+ pseudo_inst = ExtendedInstruction(
+ opname=pseudo_op_name,
+ opcode=EXTENDED_OPMAP[pseudo_op_name],
+ optype="pseudo",
+ inst_size=0,
+ arg=target_dom_set,
+ argval=target_dom_set,
+ argrepr=f"{target_dom_set}",
+ has_arg=True,
+ offset=offset,
+ starts_line=None,
+ is_jump_target=False,
+ has_extended_arg=False,
+ positions=None,
+ basic_block=bb,
+ dominator=dom,
+ )
+ augmented_instrs.append(pseudo_inst)
else:
# Not backward jump, Note: if jump == offset, then we have an
# infinite loop. We won't check for that here though.
# Check for jump break out of a loop
- loop_related_jump = False
if len(loop_stack) > 0:
# Check for loop-related jumps such as those that
# can occur from break, continue. Note: we also
@@ -511,7 +559,7 @@ def augment_instructions(
pseudo_op_name = "BREAK_LOOP"
pseudo_inst = ExtendedInstruction(
opname=pseudo_op_name,
- opcode=1002,
+ opcode=EXTENDED_OPMAP[pseudo_op_name],
optype="pseudo",
inst_size=0,
arg=target_dom_set,
@@ -528,33 +576,7 @@ def augment_instructions(
dominator=dom,
)
augmented_instrs.append(pseudo_inst)
- loop_related_jump = True
pass
- if not loop_related_jump:
- # Classify jumps that jump to the join of some
- # high-level Python block
- # We find the join offset using reverse dominators?
- # FIXME: complete...
-
- # if jump_target == follow_bb_offset:
- # pseudo_inst = ExtendedInstruction(
- # "JUMP_END_BLOCK",
- # 1002,
- # "pseudo",
- # 0,
- # target_dom_set,
- # target_dom_set,
- # f"{target_dom_set}",
- # True,
- # offset,
- # None,
- # False,
- # False,
- # bb,
- # dom,
- # )
- # augmented_instrs.append(pseudo_inst)
- pass
block_kind = jump_target_kind.get(offset)
if block_kind is not None:
@@ -580,8 +602,8 @@ def augment_instructions(
augmented_instrs.append(pseudo_inst)
extended_inst = ExtendedInstruction(
- opname=inst.opname,
- opcode=inst.opcode,
+ opname=opname,
+ opcode=opcode,
optype=inst.optype,
inst_size=inst.inst_size,
arg=inst.arg,
@@ -604,7 +626,7 @@ def augment_instructions(
if bb:
pseudo_inst = ExtendedInstruction(
opname="BB_END",
- opcode=1002,
+ opcode=EXTENDED_OPMAP["BB_END"],
optype="pseudo",
inst_size=0,
arg=bb.number,
@@ -624,72 +646,45 @@ def augment_instructions(
if bb.flags in [BB_FOR, BB_LOOP]:
loop_stack.pop()
- dom_list = dom_reach_ends.get(offset, None)
- if dom_list is not None:
- for dom in reversed(dom_list):
- dom_number = dom.bb.number
- post_end_set = post_ends(dom.bb)
- if post_end_set:
- pseudo_inst = ExtendedInstruction(
- opname="BLOCK_END_JOIN",
- opcode=1003,
- optype="pseudo",
- inst_size=0,
- arg=dom_number,
- argval=dom_number,
- argrepr=f"Basic Block {post_end_set}",
- has_arg=True,
- offset=offset,
- starts_line=None,
- is_jump_target=False,
- has_extended_arg=False,
- positions=None,
- start_offset=None,
- basic_block=dom.bb,
- dominator=dom,
- )
- augmented_instrs.append(pseudo_inst)
- pass
- pass
-
- # We have a dummy bb at the end+1.
- # Add the end dominator info for that which should exist
- if version_tuple >= (3, 6):
- offset += 2
- else:
- offset += 1
- # FIXME: DRY with above
- dom_list = dom_reach_ends.get(offset, None)
- if dom_list is not None:
- block_end_join_added = False
- for dom in reversed(dom_list):
- dom_number = dom.bb.number
- post_end_set = post_ends(dom.bb)
- if post_end_set and not block_end_join_added:
- pseudo_inst = ExtendedInstruction(
- opname="BLOCK_END_JOIN_NO_ARG",
- opcode=1003,
- optype="pseudo",
- inst_size=0,
- arg=dom_number,
- argval=dom_number,
- argrepr=f"Basic Block {post_end_set}",
- has_arg=False,
- offset=offset,
- starts_line=None,
- is_jump_target=False,
- has_extended_arg=False,
- positions=None,
- basic_block=dom.bb,
- dominator=dom,
- start_offset=None,
- )
- augmented_instrs.append(pseudo_inst)
- block_end_join_added = True
- pass
+ # # We have a dummy bb at the end+1.
+ # # Add the end dominator info for that which should exist
+ # if version_tuple >= (3, 6):
+ # offset += 2
+ # else:
+ # offset += 1
+ # # FIXME: DRY with above
+ # dom_list = dom_reach_ends.get(offset, None)
+ # if dom_list is not None:
+ # block_end_join_added = False
+ # for dom in reversed(dom_list):
+ # dom_number = dom.bb.number
+ # post_end_set = post_ends(dom.bb)
+ # if post_end_set and not block_end_join_added:
+ # pseudo_inst = ExtendedInstruction(
+ # opname="BLOCK_END_JOIN_NO_ARG",
+ # opcode=1003,
+ # optype="pseudo",
+ # inst_size=0,
+ # arg=dom_number,
+ # argval=dom_number,
+ # argrepr=f"Basic Block {post_end_set}",
+ # has_arg=False,
+ # offset=offset,
+ # starts_line=None,
+ # is_jump_target=False,
+ # has_extended_arg=False,
+ # positions=None,
+ # basic_block=dom.bb,
+ # dominator=dom,
+ # start_offset=None,
+ # )
+ # augmented_instrs.append(pseudo_inst)
+ # block_end_join_added = True
+ # pass
# for inst in augmented_instrs:
# print(inst)
+
return augmented_instrs
diff --git a/control_flow/bb.py b/control_flow/bb.py
index 3ca00c2..f22db7a 100644
--- a/control_flow/bb.py
+++ b/control_flow/bb.py
@@ -1,30 +1,30 @@
# Copyright (c) 2021, 2023-2024 by Rocky Bernstein
import sys
-
from typing import Optional
from xdis import next_offset
-from xdis.version_info import PYTHON_VERSION_TRIPLE, IS_PYPY
from xdis.bytecode import get_instructions_bytes
from xdis.op_imports import get_opcode_module
+from xdis.version_info import IS_PYPY, PYTHON_VERSION_TRIPLE
+
from control_flow.graph import (
- BB_POP_BLOCK,
- BB_SINGLE_POP_BLOCK,
- BB_STARTS_POP_BLOCK,
- BB_EXCEPT,
+ BB_BREAK,
+ BB_END_FINALLY,
BB_ENTRY,
- BB_TRY,
+ BB_EXCEPT,
BB_EXIT,
BB_FINALLY,
- BB_END_FINALLY,
BB_FOR,
- BB_BREAK,
BB_JUMP_CONDITIONAL,
- BB_JUMP_UNCONDITIONAL,
BB_JUMP_TO_FALLTHROUGH,
+ BB_JUMP_UNCONDITIONAL,
BB_LOOP,
BB_NOFOLLOW,
+ BB_POP_BLOCK,
BB_RETURN,
+ BB_SINGLE_POP_BLOCK,
+ BB_STARTS_POP_BLOCK,
+ BB_TRY,
FLAG2NAME,
)
@@ -51,7 +51,7 @@ def get_jump_val(jump_arg: int, version: tuple) -> int:
return jump_arg * 2 if version[:2] >= (3, 10) else jump_arg
-class BasicBlock(object):
+class BasicBlock:
"""Extended Basic block from the bytecode.
An extended basic block has a single entry. It can have multiple exits though,
@@ -73,12 +73,13 @@ class BasicBlock(object):
def __init__(
self,
- start_offset,
- end_offset,
- follow_offset,
- loop_offset,
+ start_offset: int,
+ end_offset: int,
+ follow_offset: int,
+ loop_offset: int,
flags=set(),
jump_offsets=set(),
+ starts_line=None,
):
global end_bb
@@ -110,6 +111,8 @@ def __init__(
# "Flags" is a set of interesting bits about the basic block.
# Elements of the bits are BB_... constants
self.flags = flags
+
+ self.starts_line = starts_line
self.index = (start_offset, end_offset)
# Lists of predecessor and successor basic blocks.
@@ -156,7 +159,8 @@ def __repr__(self):
flag_text = ", flags={%s}" % flag_str
else:
flag_text = ""
- return "BasicBlock(#%d range: %s%s, follow_offset=%s, edge_count=%d%s%s)" % (
+ line_text = "" if self.starts_line is None else f", line {self.starts_line}"
+ return "BasicBlock(#%d range: %s%s, follow_offset=%s, edge_count=%d%s%s%s)" % (
self.number,
self.index,
flag_text,
@@ -164,6 +168,7 @@ def __repr__(self):
self.edge_count,
jump_text,
exception_text,
+ line_text,
)
def __str__(self):
@@ -175,11 +180,13 @@ def __str__(self):
exception_text = f", exceptions={sorted(self.exception_offsets)}"
else:
exception_text = ""
- return "BasicBlock(#%d range: %s, %s%s)" % (
+ line_text = "" if self.starts_line is None else f", line {self.starts_line}"
+ return "BasicBlock(#%d range: %s%s%s%s)" % (
self.number,
self.index,
jump_text,
exception_text,
+ line_text,
)
# Define "<" so we can compare and sort basic blocks.
@@ -199,7 +206,14 @@ def __init__(self, version=PYTHON_VERSION_TRIPLE, is_pypy=IS_PYPY):
self.opcode = opcode = get_opcode_module(version)
- self.EXCEPT_INSTRUCTIONS = {opcode.opmap["POP_TOP"]}
+ # FIXME: why is POP_TOP *ever* an except instruction?
+ # If it can start an except instruction, then we need
+ # something more to determine this.
+ if version < (3, 10):
+ self.EXCEPT_INSTRUCTIONS = {opcode.opmap["POP_TOP"]}
+ else:
+ self.EXCEPT_INSTRUCTIONS = set()
+
if "SETUP_FINALLY" in opcode.opmap:
self.FINALLY_INSTRUCTIONS = {opcode.opmap["SETUP_FINALLY"]}
self.FOR_INSTRUCTIONS = {opcode.opmap["FOR_ITER"]}
@@ -207,7 +221,10 @@ def __init__(self, version=PYTHON_VERSION_TRIPLE, is_pypy=IS_PYPY):
self.JREL_INSTRUCTIONS = set(opcode.hasjrel)
self.JUMP_INSTRUCTIONS = self.JABS_INSTRUCTIONS | self.JREL_INSTRUCTIONS
if "JUMP_ABSOLUTE" in opcode.opmap:
- self.JUMP_UNCONDITIONAL = {opcode.opmap["JUMP_ABSOLUTE"], opcode.opmap["JUMP_FORWARD"]}
+ self.JUMP_UNCONDITIONAL = {
+ opcode.opmap["JUMP_ABSOLUTE"],
+ opcode.opmap["JUMP_FORWARD"],
+ }
self.POP_BLOCK_INSTRUCTIONS = set()
if "POP_BLOCK" in opcode.opmap:
@@ -222,8 +239,6 @@ def __init__(self, version=PYTHON_VERSION_TRIPLE, is_pypy=IS_PYPY):
self.LOOP_INSTRUCTIONS = set()
self.TRY_INSTRUCTIONS = set()
self.END_FINALLY_INSTRUCTIONS = set()
- self.LOOP_INSTRUCTIONS = set()
- self.TRY_INSTRUCTIONS = set()
if version < (3, 10):
if version < (3, 8):
@@ -234,7 +249,6 @@ def __init__(self, version=PYTHON_VERSION_TRIPLE, is_pypy=IS_PYPY):
# FIXME: add WITH_EXCEPT_START
self.END_FINALLY_INSTRUCTIONS = {opcode.opmap["END_FINALLY"]}
pass
-
else:
self.EXCEPT_INSTRUCTIONS.add(opcode.opmap["RAISE_VARARGS"])
@@ -248,8 +262,11 @@ def __init__(self, version=PYTHON_VERSION_TRIPLE, is_pypy=IS_PYPY):
if opname in opcode.opmap:
self.JUMP_CONDITIONAL.add(opcode.opmap[opname])
- self.NOFOLLOW_INSTRUCTIONS = {opcode.opmap["RETURN_VALUE"], opcode.opmap["YIELD_VALUE"],
- opcode.opmap["RAISE_VARARGS"]}
+ self.NOFOLLOW_INSTRUCTIONS = {
+ opcode.opmap["RETURN_VALUE"],
+ opcode.opmap["YIELD_VALUE"],
+ opcode.opmap["RAISE_VARARGS"],
+ }
if "RERAISE" in opcode.opmap:
self.NOFOLLOW_INSTRUCTIONS.add(opcode.opmap["RAISE_VARARGS"])
@@ -266,7 +283,14 @@ def __init__(self, version=PYTHON_VERSION_TRIPLE, is_pypy=IS_PYPY):
self.JUMP_UNCONDITIONAL.add(opcode.opmap[opname])
def add_bb(
- self, start_offset, end_offset, loop_offset, follow_offset, flags, jump_offsets
+ self,
+ start_offset: int,
+ end_offset: int,
+ loop_offset: int,
+ follow_offset: int,
+ flags: int,
+ jump_offsets: set,
+ starts_line: Optional[int] = None,
):
if BB_STARTS_POP_BLOCK in flags and start_offset == end_offset:
@@ -280,6 +304,7 @@ def add_bb(
flags=flags,
jump_offsets=jump_offsets,
loop_offset=loop_offset,
+ starts_line=starts_line,
)
self.bb_list.append(block)
@@ -295,6 +320,7 @@ def add_bb(
def basic_blocks(
code,
+ linestarts: dict,
offset2inst_index,
version_tuple=PYTHON_VERSION_TRIPLE,
is_pypy=IS_PYPY,
@@ -314,12 +340,13 @@ def basic_blocks(
loop_targets = set()
instructions = list(
get_instructions_bytes(
- code.co_code,
- bb.opcode,
- code.co_varnames,
- code.co_names,
- code.co_consts,
- code.co_cellvars,
+ bytecode=code.co_code,
+ opc=bb.opcode,
+ varnames=code.co_varnames,
+ names=code.co_names,
+ constants=code.co_consts,
+ cells=code.co_cellvars,
+ linestarts=linestarts
)
)
for i, inst in enumerate(instructions):
@@ -351,9 +378,7 @@ def basic_blocks(
else:
end_bb_offset = end_offset + 1
- end_block, _, _ = bb.add_bb(
- end_bb_offset, end_bb_offset, None, None, {BB_EXIT}, []
- )
+ end_block, _, _ = bb.add_bb(end_bb_offset, end_bb_offset, None, None, {BB_EXIT}, [])
start_offset = 0
end_offset = -1
@@ -389,7 +414,6 @@ def basic_blocks(
loop_offset = offset
elif offset == endloop_offsets[-1]:
endloop_offsets.pop()
- pass
if op in bb.LOOP_INSTRUCTIONS:
flags.add(BB_LOOP)
@@ -403,6 +427,7 @@ def basic_blocks(
follow_offset,
flags,
jump_offsets,
+ inst.starts_line,
)
loop_offset = None
if BB_TRY in block.flags:
@@ -421,6 +446,7 @@ def basic_blocks(
end_offset,
flags,
jump_offsets,
+ inst.starts_line,
)
loop_offset = None
if BB_TRY in block.flags:
@@ -473,6 +499,7 @@ def basic_blocks(
follow_offset,
flags,
jump_offsets,
+ inst.starts_line,
)
loop_offset = None
start_offset = follow_offset
@@ -491,7 +518,10 @@ def basic_blocks(
flags.add(BB_JUMP_UNCONDITIONAL)
if jump_offset == follow_offset:
flags.add(BB_JUMP_TO_FALLTHROUGH)
- pass
+ else:
+ # Also note that the edge does not
+ # fall through to the next block.
+ flags.add(BB_NOFOLLOW)
block, flags, jump_offsets = bb.add_bb(
start_offset,
end_offset,
@@ -499,6 +529,7 @@ def basic_blocks(
follow_offset,
flags,
jump_offsets,
+ inst.starts_line,
)
loop_offset = None
if BB_TRY in block.flags:
@@ -517,6 +548,7 @@ def basic_blocks(
follow_offset,
flags,
jump_offsets,
+ inst.starts_line,
)
loop_offset = None
if BB_TRY in block.flags:
@@ -535,6 +567,7 @@ def basic_blocks(
follow_offset,
flags,
jump_offsets,
+ inst.starts_line,
)
loop_offset = None
start_offset = follow_offset
diff --git a/control_flow/build_control_flow.py b/control_flow/build_control_flow.py
index 6b5d052..aa3cdb3 100644
--- a/control_flow/build_control_flow.py
+++ b/control_flow/build_control_flow.py
@@ -2,7 +2,6 @@
import sys
from xdis.codetype.base import iscode
-from xdis.disasm import disco
from xdis.op_imports import get_opcode_module
from xdis.version_info import IS_PYPY, PYTHON_VERSION_TRIPLE
@@ -22,6 +21,8 @@ def build_and_analyze_control_flow(
code_version_tuple=PYTHON_VERSION_TRIPLE[:2],
func_or_code_timestamp=None,
func_or_code_name: str = "",
+ debug: dict = {},
+ file_part: str = "",
):
"""
Compute control-flow graph, dominator information, and
@@ -52,7 +53,8 @@ def build_and_analyze_control_flow(
opc = get_opcode_module(code_version_tuple, VARIANT)
offset2inst_index = {}
- bb_mgr = basic_blocks(code, offset2inst_index, code_version_tuple)
+ linestarts = dict(opc.findlinestarts(code, dup_lines=True))
+ bb_mgr = basic_blocks(code, linestarts, offset2inst_index, code_version_tuple)
# for bb in bb_mgr.bb_list:
# print("\t", bb)
@@ -63,15 +65,16 @@ def build_and_analyze_control_flow(
version = ".".join((str(n) for n in code_version_tuple[:2]))
if graph_options in ("all", "control-flow"):
write_dot(
- func_or_code_name,
+ f"{file_part}{func_or_code_name}",
f"/tmp/flow-{version}-",
cfg.graph,
write_png=True,
exit_node=cfg.exit_node,
)
+ assert cfg.graph is not None
try:
- DominatorTree.compute_dominators_in_cfg(cfg, debug_dict.get("dom", False))
+ cfg.dom_tree = DominatorTree.compute_dominators_in_cfg(cfg, debug_dict.get("dom", False))
for node in cfg.graph.nodes:
if node.bb.nesting_depth < 0:
node.is_dead_code = True
@@ -83,16 +86,17 @@ def build_and_analyze_control_flow(
if graph_options in ("all", "dominators"):
write_dot(
- func_or_code_name,
+ f"{file_part}{func_or_code_name}",
f"/tmp/flow-dom-{version}-",
- cfg.dom_tree,
+ cfg.dom_forest,
write_png=True,
exit_node=cfg.exit_node,
)
+ cfg.classify_edges()
if graph_options in ("all",):
write_dot(
- func_or_code_name,
+ f"{file_part}{func_or_code_name}",
f"/tmp/flow+dom-{version}-",
cfg.graph,
write_png=True,
@@ -102,12 +106,14 @@ def build_and_analyze_control_flow(
assert cfg.graph
- # print("=" * 30)
augmented_instrs = augment_instructions(
func_or_code, cfg, opc, offset2inst_index, bb_mgr
)
- # for inst in augmented_instrs:
- # print(inst.disassemble(opc))
+ if graph_options in ("all", "augmented-instructions"):
+ print("=" * 30)
+ print("Augmented Instructions:")
+ for inst in augmented_instrs:
+ print(inst.disassemble(opc))
# return cs_str
except Exception:
diff --git a/control_flow/cfg.py b/control_flow/cfg.py
index 24810d0..a85c3bb 100644
--- a/control_flow/cfg.py
+++ b/control_flow/cfg.py
@@ -1,12 +1,15 @@
# Copyright (c) 2021, 2024 by Rocky Bernstein
#
from operator import attrgetter
-from typing import Dict, Optional, Tuple
+from typing import Dict, List, Optional, Tuple
from control_flow.graph import (
DiGraph,
+ Edge,
Node,
+ ScopeEdgeKind,
TreeGraph,
jump_flags,
+ BB_JOIN_POINT,
BB_JUMP_CONDITIONAL,
BB_LOOP,
BB_NOFOLLOW,
@@ -29,16 +32,21 @@ def __init__(self, bb_mgr):
self.blocks = bb_mgr.bb_list
self.offset2block: Dict[int, Node] = {}
self.offset2block_sorted: Tuple[int, Node] = tuple()
+ self.offset2edges: Dict[int, List[Edge]] = {}
self.block_nodes = {}
self.graph = None
self.entry_node = None
self.exit_node = bb_mgr.exit_block
#
- self.dom_tree: Optional[TreeGraph] = None
- # Maximum nesting in control flow grapy. -1 means this hasn't been
+ # Maximum nesting in control flow graph. -1 means this hasn't been
# computed. It is computed when self.dom_tree is computed and also is
# stored in there.
+
+ # Result from running dfs_forest.
+ # FIXME: organize this better.
+ self.dom_forest: Optional[TreeGraph] = None
+
self.max_nesting_depth: int = -1
self.analyze(self.blocks, bb_mgr.exit_block)
@@ -55,8 +63,22 @@ def analyze(self, blocks, exit_block):
self.build_flowgraph(blocks, exit_block)
def build_flowgraph(self, blocks, exit_block):
+ """
+ Build a control-flow graph from basic blocks `blocks`.
+ The exit block is `exit_block`.
+ """
+
g = DiGraph()
+ def add_edge(source_node, dest_node, edge_kind: str) -> Edge:
+ new_edge = g.make_add_edge(source_node, dest_node, edge_kind)
+ target_offset = new_edge.dest.bb.start_offset
+ if target_offset not in self.offset2edges:
+ self.offset2edges[target_offset] = [new_edge]
+ else:
+ self.offset2edges[target_offset].append(new_edge)
+ return new_edge
+
self.block_nodes = {}
# Add nodes
@@ -132,25 +154,26 @@ def build_flowgraph(self, blocks, exit_block):
# Is this dead code? (Remove self loops in calculation)
# Entry node, blocks[0] is never unreachable
- if not block.predecessors - {block} and block != blocks[0]:
+ if not (block.predecessors - {block} and block != blocks[0]
+ or BB_ENTRY in block.flags):
block.unreachable = True
block = sorted_blocks[i]
if block.follow_offset:
if BB_NOFOLLOW in block.flags:
kind = "no fallthrough"
- g.make_add_edge(
+ add_edge(
self.block_nodes[block], self.exit_block, "exit edge"
)
else:
kind = "fallthrough"
- g.make_add_edge(
+ add_edge(
self.block_nodes[block],
self.block_nodes[self.block_offsets[block.follow_offset]],
kind,
)
elif BB_EXIT not in block.flags:
- g.make_add_edge(self.block_nodes[block], self.exit_block, "exit edge")
+ add_edge(self.block_nodes[block], self.exit_block, "exit edge")
# Connect the current block to its jump targets
for jump_index in block.jump_offsets:
@@ -160,31 +183,31 @@ def build_flowgraph(self, blocks, exit_block):
target_block = self.block_offsets[jump_index]
if jump_index > block.start_offset:
if BB_LOOP in block.flags:
- edge_type = "forward-scope"
+ edge_kind = "for-finish"
elif BB_JUMP_CONDITIONAL in self.block_nodes[block].flags:
- edge_type = "forward-conditional"
+ edge_kind = "forward-conditional"
else:
- edge_type = "forward"
+ edge_kind = "forward"
else:
- edge_type = "looping"
+ edge_kind = "looping"
pass
if self.block_nodes[target_block] == self.block_nodes[block]:
- edge_type = "self-loop"
+ edge_kind = "self-loop"
- g.make_add_edge(
+ add_edge(
self.block_nodes[block],
self.block_nodes[target_block],
- edge_type,
+ edge_kind,
)
pass
pass
for jump_index in block.exception_offsets:
source_block = self.block_offsets[jump_index]
assert jump_index <= source_block.start_offset
- edge_type = "exception"
- g.make_add_edge(
- self.block_nodes[source_block], self.block_nodes[block], edge_type
+ edge_kind = "exception"
+ add_edge(
+ self.block_nodes[source_block], self.block_nodes[block], edge_kind
)
pass
pass
@@ -192,6 +215,57 @@ def build_flowgraph(self, blocks, exit_block):
self.graph = g
return
+ def classify_edges(self):
+ """
+ Classify edges into alternate edges, looping edges, or join edges.
+ There is a lower-level classification going on in edge.kind.
+ """
+
+ for edge in self.graph.edges:
+
+ if edge.kind == "no fallthrough":
+ # Edge is not to be followed.
+ continue
+
+ # If the immediate dominator of the source and destination
+ # node is the same, then we have an alternate edge.
+ # If the edge is a backwards jump, then it is a looping edge
+ # If the edge is not looping and the immediate dominator is
+ # not the same, then we have a join edge.
+
+ # Looping edges have already been classified, so use those when
+ # we can.
+ if edge.kind in ("looping", "self-loop"):
+ edge.scoping_kind = ScopeEdgeKind.Looping
+ continue
+ source_block = edge.source.bb
+ target_block = edge.dest.bb
+
+ if source_block.unreachable:
+ continue
+
+ # print(f"Block #{source_block.number} -> Block #{target_block.number}")
+ # if (source_block.number, target_block.number) == (2, 4):
+ # from trepan.api import debug; debug()
+
+ if source_block.number == self.dom_tree.doms[target_block].number:
+ # Jump to target starts a new scope.
+ # Example:
+ # if then ... end
+ edge.scoping_kind = ScopeEdgeKind.Alternate
+ elif (self.dom_tree.doms[source_block] > self.dom_tree.doms[target_block]
+ or self.dom_tree.doms[source_block] == self.dom_tree.doms[target_block]):
+ # The source block is jumping or falling out of a scope: its
+ # `dom` or `scope number` is more nested than the target scope.
+ # Examples:
+ # "if ... else ... end" or
+ # "if ... end" or
+ # "while ... break ... end"
+ edge.scoping_kind = ScopeEdgeKind.Join
+ target_block.flags.add(BB_JOIN_POINT)
+ pass
+ return
+
def get_node(self, offset: int) -> Node:
block = self.offset2block.get(offset, None)
if block is not None:
diff --git a/control_flow/dominators.py b/control_flow/dominators.py
index b79d854..3639141 100644
--- a/control_flow/dominators.py
+++ b/control_flow/dominators.py
@@ -20,7 +20,7 @@ def __str__(self) -> str:
class DominatorTree:
"""Handles the dominator trees, dominator, post-dominator
- releation, and the computation of the dominance/post-dominance
+ relation, and the computation of the dominance/post-dominance
frontier.
"""
@@ -37,7 +37,7 @@ def __init__(self, cfg, debug=False):
@classmethod
def compute_dominators_in_cfg(cls, cfg, debug):
- DominatorTree(cfg, debug)
+ return DominatorTree(cfg, debug)
def build(self):
entry = self.cfg.entry_node
diff --git a/control_flow/dotio.py b/control_flow/dotio.py
index 8fc29cf..11b8f1f 100644
--- a/control_flow/dotio.py
+++ b/control_flow/dotio.py
@@ -14,24 +14,16 @@
BB_ENTRY,
BB_EXIT,
BB_END_FINALLY,
+ BB_JOIN_POINT,
BB_JUMP_TO_FALLTHROUGH,
BB_JUMP_UNCONDITIONAL,
BB_NOFOLLOW,
- Node,
+ ScopeEdgeKind,
format_flags_with_width,
)
-DOT_STYLE: Final = """
- graph[fontsize=10 fontname="DejaVu Sans Mono"];
-
- mclimit=1.5;
- rankdir=TD; ordering=out;
- color="#efefef";
-
- node[shape=box style=filled fontsize=10 fontname="DejaVu Sans Mono"
- fillcolor="#efefef", width=2];
- edge[fontsize=10 fontname="Verdana"];
-"""
+DARK_GREEN = "#006400"
+GRAY92 = "#ededed"
BB_LEVEL_BACKGROUNDS = (
{"name": "DodgerBlue4", "hex": "#104e8b", "bg": "white"},
@@ -46,15 +38,27 @@
{"name": "LightSteelBlue1", "hex": "#cae1ff", "bg": "black"},
)
+DOT_STYLE: Final = f"""
+ graph[fontsize=10 fontname="DejaVu Sans Mono"];
+
+ mclimit=1.5;
+ rankdir=TD; ordering=out;
+ color="{GRAY92}";
+
+ node[shape=box style=filled fontsize=10 fontname="DejaVu Sans Mono"
+ fillcolor="{GRAY92}", width=2];
+ edge[fontsize=10 fontname="Verdana"];
+"""
+
+
MAX_COLOR_LEVELS: Final = len(BB_LEVEL_BACKGROUNDS) - 1
flags_prefix: Final = "flags="
FEL: Final = len(flags_prefix)
NODE_TEXT_WIDTH = 26 + FEL
-
-class DotConverter(object):
- def __init__(self, graph, exit_node: Optional[Node] = None):
+class DotConverter:
+ def __init__(self, graph):
self.g = graph
self.exit_node = graph
self.buffer = ""
@@ -74,8 +78,8 @@ def get_node_colors(self, nesting_depth: int) -> Tuple[str, str]:
return color_info["hex"], color_info["bg"]
@staticmethod
- def process(graph, exit_node: Optional[BasicBlock], is_dominator_format: bool):
- converter = DotConverter(graph, exit_node)
+ def process(graph, exit_node: BasicBlock, is_dominator_format: bool):
+ converter = DotConverter(graph)
converter.run(exit_node, is_dominator_format)
return converter.buffer
@@ -131,30 +135,31 @@ def add_edge(self, edge, exit_node: BasicBlock, edge_seen):
dest_port = ""
weight = 1
- if edge.is_join:
+ if edge.scoping_kind == ScopeEdgeKind.Join:
arrow_color = ":brown;0.01"
else:
arrow_color = ""
- color = f'[color="blue:{arrow_color}"]' if edge.is_conditional_jump() else ""
+ color = f'[color="blue{arrow_color}"]' if edge.is_conditional_jump() else ""
if edge.kind in (
"fallthrough",
"no fallthrough",
- "follow",
"exit edge",
"dom-edge",
"pdom-edge",
):
- if edge.kind == "follow":
- style = '[style="invis"]'
+ if edge.kind == "no fallthrough":
+ style = '[style="dashed"] [arrowhead="none"]'
elif edge.kind == "fallthrough":
color = f'[color="red{arrow_color}"]'
+ if BB_NOFOLLOW in edge.source.flags:
+ style = '[style="dashed"] [arrowhead="none"]'
pass
if edge.kind != "exit edge":
weight = 10
elif edge.kind == "exception":
- style = '[color="red"]'
+ style = f'[color="red{arrow_color}"]'
if edge.source.bb.number + 1 == edge.dest.bb.number:
weight = 10
else:
@@ -167,8 +172,9 @@ def add_edge(self, edge, exit_node: BasicBlock, edge_seen):
# edge_port = '[headport=nw] [tailport=sw]';
# edge_port = '[headport=_] [tailport=_]';
else:
- if edge.kind == "forward-scope":
+ if edge.kind == "for-finish":
style = '[style="dotted"]'
+ color = '[color="MediumBlue"]'
if edge.source.bb.number + 1 == edge.dest.bb.number:
weight = 10
source_port = ":c"
@@ -179,16 +185,16 @@ def add_edge(self, edge, exit_node: BasicBlock, edge_seen):
dest_port = ":ne"
pass
elif edge.kind == "self-loop":
- edge_port = '[headport=ne, tailport=se, color="#006400"]'
+ edge_port = f'[headport=ne, tailport=se, color="{DARK_GREEN}{arrow_color}"]'
pass
elif edge.kind == "looping":
+ color = f'[color="{DARK_GREEN}{arrow_color}"]'
if edge.dest.bb.number + 1 == edge.source.bb.number:
# For a loop to the immediate predecessor we use
# a somewhat straight centered backward arrow.
source_port = ":c"
dest_port = ":c"
else:
- color = f'[color="#006400{arrow_color}"]'
source_port = ":nw"
dest_port = ":sw"
pass
@@ -215,9 +221,6 @@ def add_edge(self, edge, exit_node: BasicBlock, edge_seen):
source_port = ":se"
dest_port = ":ne"
pass
- elif BB_NOFOLLOW in edge.source.flags:
- style = '[style="dashed"] [arrowhead="none"]'
- weight = 10
if style == "" and edge.source.bb.unreachable:
style = '[style="dashed"] [arrowhead="empty"]'
@@ -256,37 +259,36 @@ def node_repr(self, node, align, is_exit, is_dominator_format: bool):
jump_text = ""
reach_offset_text = ""
flag_text = ""
- if not is_dominator_format:
- if not is_exit and len(node.jump_offsets) > 0:
- jump_text = f"\\ljumps={sorted(node.jump_offsets)}"
- pass
+ if not is_exit and len(node.jump_offsets) > 0:
+ jump_text = f"\\ljumps={sorted(node.jump_offsets)}"
+ pass
- if node.flags:
- flag_text = "%s%s%s" % (
- align,
- flags_prefix,
- format_flags_with_width(
- node.flags,
- NODE_TEXT_WIDTH - FEL,
- align + (" " * (len("flags="))),
- ),
- )
- else:
- flag_text = ""
- pass
+ if node.flags:
+ flag_text = "%s%s%s" % (
+ align,
+ flags_prefix,
+ format_flags_with_width(
+ node.flags,
+ NODE_TEXT_WIDTH - FEL,
+ align + (" " * (len("flags="))),
+ ),
+ )
+ else:
+ flag_text = ""
+ pass
- if hasattr(node, "reach_offset"):
- reach_offset_text = "\\lreach_offset=%d" % node.reach_offset
- pass
+ if hasattr(node, "reach_offset"):
+ reach_offset_text = "\\lreach_offset=%d" % node.reach_offset
pass
+ pass
if is_exit:
return "flags=exit"
- offset_text = "offset: %d..%d" % (node.start_offset, node.end_offset)
- l = len(offset_text)
- if l < NODE_TEXT_WIDTH:
- offset_text += " " * (NODE_TEXT_WIDTH - l)
+ offset_text = f"offset: {node.start_offset}..{node.end_offset}"
+ text_len = len(offset_text)
+ if text_len < NODE_TEXT_WIDTH:
+ offset_text += " " * (NODE_TEXT_WIDTH - text_len)
return f"{offset_text}{flag_text}{jump_text}{reach_offset_text}"
@@ -306,9 +308,9 @@ def add_node(
if exit_node in {node.bb for node in node.bb.dom_set}:
dom_set_len -= 1
if BB_ENTRY in node.bb.flags or dom_set_len > 0:
- style = '[shape = "box3d"]'
+ style = '[shape = "box", peripheries=2]'
elif BB_EXIT in node.bb.flags:
- style = '[shape = "diamond"]'
+ style = '[style = "rounded"]'
align = "\n"
is_exit = True
elif not node.bb.predecessors:
@@ -318,10 +320,14 @@ def add_node(
if is_dominator_format:
fillcolor, fontcolor = self.get_node_colors(node.bb.nesting_depth)
# print("XXX", node.bb, node.bb.nesting_depth, fillcolor, fontcolor)
- style += f'[fontcolor = "{fontcolor}", fillcolor = "{fillcolor}"]'
+ color = 'color=brown, ' if BB_JOIN_POINT in node.bb.flags else ""
+ style += f'[{color}fontcolor = "{fontcolor}", fillcolor = "{fillcolor}"]'
level = " (%d)" % (node.bb.nesting_depth) if node.bb.nesting_depth >= 0 else ""
+ if node.bb.starts_line is not None:
+ level += f", Line {node.bb.starts_line} "
+
label = '[label="Basic Block %d%s%s%s%s"]' % (
node.number,
level,
diff --git a/control_flow/graph.py b/control_flow/graph.py
index 00790ea..66d49a6 100644
--- a/control_flow/graph.py
+++ b/control_flow/graph.py
@@ -8,13 +8,16 @@
"""
from typing import Optional, Set
+from enum import Enum
# First or Basic block that we entered on. Usually
# at offset 0.
# Does this need to be a set?
BB_ENTRY = 0
-# Block is at the end and doesn't have a following instruction.
+# Block is at the end, and doesn't have a following instruction.
+# We do, however, have an edge to the successor *instruction* to assist in displaying
+# the control-flow graph the way the program was written.
BB_NOFOLLOW = 1
# a SETUP_LOOP instruction marking the beginning of a loop.
@@ -77,9 +80,12 @@
# sure the jump arrow points straight down.
BB_JUMP_TO_FALLTHROUGH = 15
+# The beginning of the basic block is a join.
+BB_JOIN_POINT = 16
+
# Basic block ends in a return or an raise that is not inside
# a "try" block.
-BB_RETURN = 16
+BB_RETURN = 17
# Unreachable block
-BB_DEAD_CODE = 17
+BB_DEAD_CODE = 18
@@ -94,6 +100,7 @@
BB_SINGLE_POP_BLOCK: "single pop block",
BB_STARTS_POP_BLOCK: "starts with pop block",
BB_EXCEPT: "except",
+ BB_JOIN_POINT: "join block",
BB_JUMP_UNCONDITIONAL: "unconditional",
BB_JUMP_CONDITIONAL: "conditional jump",
BB_JUMP_TO_FALLTHROUGH: "jump to fallthough",
@@ -104,9 +111,31 @@
BB_RETURN: "return",
}
+# FIXME: some of the classifications may be overkill.
+ScopeEdgeKind = Enum(
+ "ScopeEdgeKind",
+ [
+ # Edge hasn't been computed yet:
+ "Unknown",
+ # Edge starts a new scope.
+ # Example:
+ # if then ... end
+ "NewScope",
+ # Edge jumps from one alternate to the next one
+ # Example:
+ # if ... elif ... end
+ "Alternate",
+ # Edge joins from an inner scope to an outer one, e.g.
+ # "if ... else ... end" or
+ # "if ... end" or
+ # "while ... break ... end"
+ "Join",
+ # Edge jumps to a loop head
+ "Looping",
+ ],
+)
jump_flags = set([BB_JUMP_UNCONDITIONAL, BB_BREAK])
-nofollow_flags = set([BB_NOFOLLOW])
def format_flags(flags):
@@ -200,15 +229,10 @@ def __init__(self, source, dest, kind, data):
self.source = source
self.dest = dest
self.kind = kind
+ self.scoping_kind = ScopeEdgeKind.Unknown
self.flags = set()
self.data = data
- # True edge is a "join" edge. Note that a "join" edge
- # can be an implicit fallthrough edge.
- # Join edges are a non-loop edges where the source
- # node's nesting depth jumps to a target of lesser depth.
- self.is_join = False
-
@classmethod
def reset(self):
self.GLOBAL_COUNTER = 0
@@ -308,7 +332,7 @@ def make_add_node(self, bb):
self.add_node(node)
return node
- def make_add_edge(self, source=None, dest=None, kind=None, data=None):
+ def make_add_edge(self, source=None, dest=None, kind=None, data=None) -> Edge:
edge = DiGraph.make_edge(source=source, dest=dest, kind=kind, data=data)
self.add_edge(edge)
return edge
@@ -380,14 +404,14 @@ def write_dot(
return
path_safe = name.translate(name.maketrans(" <>", "_[]"))
- dot_path = f"{prefix}{path_safe}.dot"
+ dot_path = f"{prefix}-{path_safe}.dot"
open(dot_path, "w").write(graph.to_dot(exit_node, is_dominator_format))
if debug:
print(f"{dot_path} written")
if write_png:
import os
- png_path = f"{prefix}{path_safe}.png"
+ png_path = f"{prefix}-{path_safe}.png"
os.system(f"dot -Tpng {dot_path} > {png_path}")
if debug:
print(f"{png_path} written")
diff --git a/doc-example/count-bits.cpython-38.pyc b/doc-example/count-bits.cpython-38.pyc
new file mode 100644
index 0000000..3325ebd
Binary files /dev/null and b/doc-example/count-bits.cpython-38.pyc differ
diff --git a/doc-example/flow+dom-3.8--count-bits.cpython-38-module.dot b/doc-example/flow+dom-3.8--count-bits.cpython-38-module.dot
new file mode 100644
index 0000000..9c8a516
--- /dev/null
+++ b/doc-example/flow+dom-3.8--count-bits.cpython-38-module.dot
@@ -0,0 +1,38 @@
+digraph G {
+ graph[fontsize=10 fontname="DejaVu Sans Mono"];
+
+ mclimit=1.5;
+ rankdir=TD; ordering=out;
+ color="#efefef";
+
+ node[shape=box style=filled fontsize=10 fontname="DejaVu Sans Mono"
+ fillcolor="#efefef", width=2];
+ edge[fontsize=10 fontname="Verdana"];
+
+ # basic blocks:
+ block_1 [shape = "box", peripheries=2][fontcolor = "black", fillcolor = "#cae1ff"][label="Basic Block 1 (0)\loffset: 0..20 \lflags=entry \lreach_offset=70\l"];
+ block_2 [shape = "box", peripheries=2][fontcolor = "black", fillcolor = "#bcd2ee"][label="Basic Block 2 (1)\loffset: 22..28 \lflags=loop, conditional jump \ljumps=[66]\lreach_offset=70\l"];
+ block_3 [shape = "box", peripheries=2][fontcolor = "black", fillcolor = "#a2b5cd"][label="Basic Block 3 (2)\loffset: 30..36 \lflags=conditional jump \ljumps=[48]\lreach_offset=64\l"];
+ block_4 [fontcolor = "black", fillcolor = "#63b8ff"][label="Basic Block 4 (3)\loffset: 38..46 \lflags=no fallthrough,\l unconditional\ljumps=[56]\lreach_offset=46\l"];
+ block_5 [fontcolor = "black", fillcolor = "#63b8ff"][label="Basic Block 5 (3)\loffset: 48..54 \lreach_offset=54\l"];
+ block_6 [color=brown, fontcolor = "black", fillcolor = "#63b8ff"][label="Basic Block 6 (3)\loffset: 56..64 \lflags=no fallthrough, except,\l join block,\l unconditional\ljumps=[22]\lreach_offset=64\l"];
+ block_7 [fontcolor = "black", fillcolor = "#a2b5cd"][label="Basic Block 7 (2)\loffset: 66..68 \lflags=no fallthrough, return \lreach_offset=68\l"];
+
+ # Edges should be ordered from innermost block edges to outmost.
+ # If layout gives ugly edge crossing, change the order or the edges
+ # and/or add port directions on nodes For example:
+ # block_1:sw -> block_4:nw or
+ # block_0 -> block_3:ne
+ # See https://stackoverflow.com/questions/53468814/how-can-i-influence-graphviz-dot-to-prefer-which-edges-can-cross/53472852#53472852
+
+ block_6:nw -> block_2:sw [weight=1][color="#006400"];
+ block_6 -> block_7 [weight=10][style="dashed"] [arrowhead="none"];
+ block_5 -> block_6 [weight=10][color="red:brown;0.01"][style="dashed"];
+ block_4 -> block_5 [weight=10][style="dashed"] [arrowhead="none"];
+ block_4 -> block_6 [weight=1];
+ block_3 -> block_4 [weight=10][color="red"][style="dashed"];
+ block_3 -> block_5 [weight=1][color="blue"][style="dotted"];
+ block_2 -> block_3 [weight=10][color="red"][style="dashed"];
+ block_2:se -> block_7:ne [weight=1][color="MediumBlue"][style="dotted"];
+ block_1 -> block_2 [weight=10][color="red"][style="dashed"];
+}
diff --git a/doc-example/flow+dom-3.8--count-bits.cpython-38-module.png b/doc-example/flow+dom-3.8--count-bits.cpython-38-module.png
new file mode 100644
index 0000000..f975479
Binary files /dev/null and b/doc-example/flow+dom-3.8--count-bits.cpython-38-module.png differ
diff --git a/doc-example/flow-3.8--count-bits.cpython-38-module.png b/doc-example/flow-3.8--count-bits.cpython-38-module.png
new file mode 100644
index 0000000..430b41e
Binary files /dev/null and b/doc-example/flow-3.8--count-bits.cpython-38-module.png differ
diff --git a/doc-example/flow-dom-3.8--count-bits.cpython-38-module.dot b/doc-example/flow-dom-3.8--count-bits.cpython-38-module.dot
new file mode 100644
index 0000000..10d79fc
--- /dev/null
+++ b/doc-example/flow-dom-3.8--count-bits.cpython-38-module.dot
@@ -0,0 +1,34 @@
+digraph G {
+ graph[fontsize=10 fontname="DejaVu Sans Mono"];
+
+ mclimit=1.5;
+ rankdir=TD; ordering=out;
+ color="#efefef";
+
+ node[shape=box style=filled fontsize=10 fontname="DejaVu Sans Mono"
+ fillcolor="#efefef", width=2];
+ edge[fontsize=10 fontname="Verdana"];
+
+ # basic blocks:
+ block_1 [shape = "box", peripheries=2][label="Basic Block 1 (0)\loffset: 0..20 \lflags=entry \lreach_offset=70\l"];
+ block_2 [shape = "box", peripheries=2][label="Basic Block 2 (1)\loffset: 22..28 \lflags=loop, conditional jump \ljumps=[66]\lreach_offset=70\l"];
+ block_3 [shape = "box", peripheries=2][label="Basic Block 3 (2)\loffset: 30..36 \lflags=conditional jump \ljumps=[48]\lreach_offset=64\l"];
+ block_4 [label="Basic Block 4 (3)\loffset: 38..46 \lflags=no fallthrough,\l unconditional\ljumps=[56]\lreach_offset=46\l"];
+ block_5 [label="Basic Block 5 (3)\loffset: 48..54 \lreach_offset=54\l"];
+ block_6 [label="Basic Block 6 (3)\loffset: 56..64 \lflags=no fallthrough, except,\l unconditional\ljumps=[22]\lreach_offset=64\l"];
+ block_7 [label="Basic Block 7 (2)\loffset: 66..68 \lflags=no fallthrough, return \lreach_offset=68\l"];
+
+ # Edges should be ordered from innermost block edges to outmost.
+ # If layout gives ugly edge crossing, change the order or the edges
+ # and/or add port directions on nodes For example:
+ # block_1:sw -> block_4:nw or
+ # block_0 -> block_3:ne
+ # See https://stackoverflow.com/questions/53468814/how-can-i-influence-graphviz-dot-to-prefer-which-edges-can-cross/53472852#53472852
+
+ block_3 -> block_4 [weight=10];
+ block_3 -> block_5 [weight=10][color="blue"];
+ block_3 -> block_6 [weight=10];
+ block_2 -> block_3 [weight=10];
+ block_2 -> block_7 [weight=10][color="blue"];
+ block_1 -> block_2 [weight=10];
+}
diff --git a/pytest/test_bb.py b/pytest/test_bb.py
index 85fa849..458d3fa 100644
--- a/pytest/test_bb.py
+++ b/pytest/test_bb.py
@@ -45,7 +45,8 @@ def test_basic():
print(f"{fn_name}: ")
dis.dis(fn)
print()
- bb_mgr = basic_blocks(fn.__code__, offset2inst_index)
+ # FIXME: add linestarts instead of None below
+ bb_mgr = basic_blocks(fn.__code__, None, offset2inst_index)
check_blocks(bb_mgr.bb_list, fn_name)
diff --git a/pytest/test_cfg.py b/pytest/test_cfg.py
index ef41cf9..3644904 100644
--- a/pytest/test_cfg.py
+++ b/pytest/test_cfg.py
@@ -78,7 +78,8 @@ def test_basic():
print(fn.__name__)
dis.dis(fn)
print()
- bb_mgr = basic_blocks(fn.__code__, offset2inst_index)
+ # FIXME: add linestarts instead of None below
+ bb_mgr = basic_blocks(fn.__code__, None, offset2inst_index)
cfg = ControlFlowGraph(bb_mgr)
if DEBUG:
write_dot(fn.__name__, f"/tmp/test_cfg-{version}-", cfg.graph, write_png=True)
diff --git a/pytest/test_dom.py b/pytest/test_dom.py
index e063a87..b99291b 100755
--- a/pytest/test_dom.py
+++ b/pytest/test_dom.py
@@ -60,7 +60,8 @@ def test_basic():
print(name)
dis.dis(fn)
print()
- bb_mgr = basic_blocks(fn.__code__, offset2inst_index)
+ # FIXME: add linestarts instead of None below
+ bb_mgr = basic_blocks(fn.__code__, None, offset2inst_index)
cfg = ControlFlowGraph(bb_mgr)
if DEBUG:
write_dot(name, f"/tmp/test_dom-{version}-", cfg.graph, write_png=True)
diff --git a/test/test-all-examples.py b/test/test-all-examples.py
index 97e0a89..efccf18 100755
--- a/test/test-all-examples.py
+++ b/test/test-all-examples.py
@@ -9,7 +9,7 @@ def testing():
assert (
False
), (
- "This should have been replaced via read-in python script with a function called"
+ "This should have been replaced via a read-in Python script with a function called"
" testing"
)
diff --git a/test/test-bb.py b/test/test-bb.py
index 315678a..036683e 100755
--- a/test/test-bb.py
+++ b/test/test-bb.py
@@ -15,6 +15,21 @@ def trivial_or(a, b):
return a or b
+def and3(a, b, c):
+ return a and b and c
+
+
+def and4(a, b, c, d):
+ return a and b and c and d
+
+
+def for_simple(a):
+ x = 0
+ for i in a:
+ x += i
+ return x
+
+
def if_else(a):
if a:
a += 1
@@ -86,6 +101,18 @@ def foo2(a):
return a
+def or2(a, b):
+ return a or b
+
+
+def or3(a, b, c):
+ return a or b or c
+
+
+def or4(a, b, c, d):
+ return a or b or c or d
+
+
def while_if_continue(a):
a += 1
while a > 5:
@@ -131,7 +158,7 @@ def for_break():
def try_except():
try:
a = 1
- except:
+ except Exception:
a = 2
return a
@@ -140,7 +167,7 @@ def try_finally():
x = 1
except RuntimeError:
x = 2
- except:
+ except Exception:
x = 3
finally:
x = 4
diff --git a/test/test-bb2.py b/test/test-bb2.py
index 9cbde3a..72bd344 100755
--- a/test/test-bb2.py
+++ b/test/test-bb2.py
@@ -1,7 +1,11 @@
#!/usr/bin/env python
import sys
+import os
import os.path as osp
-from control_flow.__main__ import main
+from control_flow.build_control_flow import build_and_analyze_control_flow
+from types import CodeType
+from xdis.load import check_object_path, load_module
+from xdis.version_info import PYTHON_VERSION_TRIPLE
def testing():
@@ -18,10 +22,56 @@ def testing():
filename = sys.argv[1]
short = ""
+stat = os.stat(filename)
if filename.endswith(".py"):
exec(open(filename).read())
short = osp.basename(filename)[0:-3]
+ source = open(filename, "r").read()
+ co = compile(source, filename, "exec")
+ timestamp = stat.st_mtime
+ version_tuple = PYTHON_VERSION_TRIPLE
+
+ name = co.co_name
+ if name.startswith("<"):
+ name = name[1:]
+ if name.endswith(">"):
+ name = name[:-1]
+
elif filename.endswith(".pyc"):
+ timestamp = stat.st_mtime
short = osp.basename(filename)[0:-4]
+ pyc_filename = check_object_path(filename)
+ (
+ version_tuple,
+ timestamp,
+ _, # magic_int,
+ co,
+ _, # is_pypy,
+ _, # source_size,
+ _ # sip_hash,
+ ) = load_module(pyc_filename)
+
+func_name=""
+if len(sys.argv) == 3:
+ func_name = sys.argv[2]
+ func_codes = [const for const in co.co_consts if isinstance(const, CodeType) and const.co_name == func_name]
+ len_func_codes = len(func_codes)
+ if len_func_codes == 0:
+ print(f"Did not find a code object named {func_name}")
+ sys.exit(1)
+ elif len_func_codes == 1:
+ co = func_codes[0]
+ elif len_func_codes > 1:
+ print(f"Found too many code objects named {func_name}:\n{func_codes}")
+ sys.exit(1)
+
-main(testing, short)
+build_and_analyze_control_flow(
+ co,
+ graph_options="all",
+ code_version_tuple=version_tuple,
+ func_or_code_timestamp=timestamp,
+ func_or_code_name=func_name,
+ debug={},
+ file_part=f"{short}-"
+)