Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add generic dataflow framework + example + stack value analysis #118

Draft
wants to merge 4 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions tealer/analyses/dataflow/abstract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from abc import ABC, abstractmethod
from typing import List, Any, TypeVar, Generic, TYPE_CHECKING

from tealer.teal.basic_blocks import BasicBlock

# Type of the abstract state a concrete analysis computes for each basic block;
# AbstractDataflow is generic over it.
AbstractValues = TypeVar("AbstractValues")

if TYPE_CHECKING:
from tealer.teal.teal import Teal


class AbstractDataflow(ABC, Generic[AbstractValues]):
    """Skeleton of a dataflow analysis over the basic blocks of a Teal program.

    Subclasses provide the abstract domain through the ``_``-prefixed hooks;
    ``run_analysis`` drives them from the first basic block of ``teal`` until
    no block's incoming values change any more.
    """

    def __init__(self, teal: "Teal"):
        self.teal = teal

    @abstractmethod
    def _merge_predecessor(self, bb: BasicBlock) -> AbstractValues:
        """Return the values entering ``bb``, merged from its predecessors."""

    @abstractmethod
    def _is_fix_point(self, bb: BasicBlock, values: AbstractValues) -> bool:
        """Return True if ``values`` adds nothing new for ``bb`` (stop exploring it)."""

    @abstractmethod
    def _transfer_function(self, bb: BasicBlock) -> AbstractValues:
        """Return the values leaving ``bb``, computed from its stored incoming values."""

    @abstractmethod
    def _store_values_in(self, bb: BasicBlock, values: AbstractValues) -> None:
        """Record ``values`` as the values entering ``bb``."""

    @abstractmethod
    def _store_values_out(self, bb: BasicBlock, values: AbstractValues) -> None:
        """Record ``values`` as the values leaving ``bb``."""

    @abstractmethod
    def _filter_successors(self, bb: BasicBlock) -> List[BasicBlock]:
        """Return the successors of ``bb`` that must be explored next."""

    @abstractmethod
    def result(self) -> Any:
        """Return the outcome of the analysis."""

    def explore(self, bb: BasicBlock, is_entry_node: bool = False) -> None:
        """Propagate values depth-first from ``bb`` until a fix point is reached.

        Args:
            bb: block to start from.
            is_entry_node: when True, ``bb`` is processed without consulting
                ``_is_fix_point`` first.
        """
        # An explicit LIFO worklist replaces one recursive call per visited
        # block, so deep control-flow graphs cannot exhaust Python's recursion
        # limit. Blocks are still visited in the same depth-first order.
        pending: List[BasicBlock] = [bb]
        skip_fix_point_check = is_entry_node

        while pending:
            current = pending.pop()
            values = self._merge_predecessor(current)

            if not skip_fix_point_check and self._is_fix_point(current, values):
                continue
            # Only the very first block may bypass the fix point check.
            skip_fix_point_check = False

            self._store_values_in(current, values)
            values = self._transfer_function(current)
            self._store_values_out(current, values)

            # Pushed in reverse so the first successor is popped (and fully
            # explored) before the next one.
            pending.extend(reversed(list(self._filter_successors(current))))

    def run_analysis(self) -> None:
        """Run the analysis, using the first basic block of ``teal`` as entry point."""
        self.explore(self.teal.bbs[0], is_entry_node=True)
102 changes: 102 additions & 0 deletions tealer/analyses/dataflow/collect_instructions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from collections import defaultdict
from typing import Union, Set, Any, List, Dict, TYPE_CHECKING

from tealer.analyses.dataflow.abstract import AbstractDataflow
from tealer.teal.basic_blocks import BasicBlock
from tealer.teal.instructions.instructions import Instruction

if TYPE_CHECKING:
from tealer.teal.teal import Teal

# Largest set of instructions an InstructionSet tracks precisely; a bigger set
# is replaced by "TOP".
MAX_ELEMS = 35


class InstructionSet:
    """Abstract value holding a set of instructions, or the marker "TOP" / "BOTTOM".

    A set with more than MAX_ELEMS instructions is collapsed to "TOP".
    """

    def __init__(self, values: Union[str, Instruction, Set[Instruction]]) -> None:
        # The only string payloads allowed are the two markers.
        if isinstance(values, str):
            assert values in ["TOP", "BOTTOM"]

        too_large = isinstance(values, set) and len(values) > MAX_ELEMS
        if too_large:
            values = "TOP"
        # A lone instruction is stored as a singleton set.
        if isinstance(values, Instruction):
            values = {values}

        self.values: Union[str, Set[Instruction]] = values

    @property
    def is_top(self) -> bool:
        """True when this value is the "TOP" marker."""
        payload = self.values
        return isinstance(payload, str) and payload == "TOP"

    @property
    def is_bottom(self) -> bool:
        """True when this value is the "BOTTOM" marker."""
        payload = self.values
        return isinstance(payload, str) and payload == "BOTTOM"

    def union(self, instructions: "InstructionSet") -> "InstructionSet":
        """Return the join of both values.

        "TOP" absorbs everything, "BOTTOM" is neutral, two sets are united.
        """
        mine = self.values
        theirs = instructions.values

        if "TOP" in (mine, theirs):
            return InstructionSet("TOP")
        if mine == "BOTTOM":
            return InstructionSet(theirs)
        if theirs == "BOTTOM":
            return InstructionSet(mine)

        assert isinstance(mine, set)
        assert isinstance(theirs, set)
        return InstructionSet(mine | theirs)

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, InstructionSet):
            return False
        mine, theirs = self.values, other.values
        # Payloads are only comparable when they are of the same kind.
        for kind in (str, set):
            if isinstance(mine, kind) and isinstance(theirs, kind):
                return mine == theirs
        return False

    def __str__(self) -> str:
        payload = self.values
        if isinstance(payload, str):
            return payload
        rendered = ",".join(str(ins) for ins in payload)
        return "[" + rendered + "]"


class CollectInstructions(AbstractDataflow[InstructionSet]):
    """Example analysis: collect, per basic block, the instructions seen on the way to it.

    ``bb_in`` / ``bb_out`` map a block to the InstructionSet entering / leaving
    it; a block without a stored value reads as "BOTTOM".
    """

    def __init__(self, teal: "Teal") -> None:
        super().__init__(teal)

        def bottom() -> InstructionSet:
            return InstructionSet("BOTTOM")

        self.bb_in: Dict[BasicBlock, InstructionSet] = defaultdict(bottom)
        self.bb_out: Dict[BasicBlock, InstructionSet] = defaultdict(bottom)

    def _merge_predecessor(self, bb: BasicBlock) -> InstructionSet:
        """Join the outgoing sets of every predecessor of ``bb``."""
        merged = InstructionSet("BOTTOM")
        for predecessor in bb.prev:
            merged = merged.union(self.bb_out[predecessor])
        return merged

    def _is_fix_point(self, bb: BasicBlock, values: InstructionSet) -> bool:
        """True when ``values`` equals what is already stored as entering ``bb``."""
        return self.bb_in[bb] == values

    def _transfer_function(self, bb: BasicBlock) -> InstructionSet:
        """Add every instruction of ``bb`` to the set entering it."""
        collected = self.bb_in[bb]
        for instruction in bb.instructions:
            collected = collected.union(InstructionSet(instruction))
        return collected

    def _store_values_in(self, bb: BasicBlock, values: InstructionSet) -> None:
        self.bb_in[bb] = values

    def _store_values_out(self, bb: BasicBlock, values: InstructionSet) -> None:
        self.bb_out[bb] = values

    def _filter_successors(self, bb: BasicBlock) -> List[BasicBlock]:
        """Every successor is explored."""
        return bb.next

    def result(self) -> Dict[BasicBlock, InstructionSet]:
        """Return the mapping from basic block to the set leaving it."""
        return self.bb_out
229 changes: 229 additions & 0 deletions tealer/analyses/dataflow/stack_value.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
from collections import defaultdict
from typing import Union, Set, Any, List, Dict, TYPE_CHECKING, Type, Callable, Tuple

from tealer.analyses.dataflow.abstract import AbstractDataflow
from tealer.teal.basic_blocks import BasicBlock
from tealer.teal.global_field import GlobalField
from tealer.teal.instructions import instructions
from tealer.teal.instructions.instructions import Instruction
from tealer.teal.instructions.transaction_field import TransactionField

if TYPE_CHECKING:
from tealer.teal.teal import Teal

# Largest set of values a StackValue tracks precisely; a bigger set becomes TOP.
MAX_ELEMS_PER_STACK_VALUE = 35
# Bound on the number of elements an abstract Stack keeps.
MAX_STACK_DEPTH = 100


# pylint: disable=too-few-public-methods
class TOP:
    """Marker for the abstract value "anything / unknown".

    Every instance compares equal to every other TOP instance and unequal to
    any other object.
    """

    def __eq__(self, other: Any) -> bool:
        return isinstance(other, TOP)

    def __hash__(self) -> int:
        # Defining __eq__ alone makes instances unhashable. All TOP instances
        # are equal, so they must share a single hash.
        return hash(TOP)

    def __str__(self) -> str:
        return "TOP"


# pylint: disable=too-few-public-methods
class BOTTOM:
    """Marker for the abstract value "nothing yet".

    Every instance compares equal to every other BOTTOM instance and unequal
    to any other object.
    """

    def __eq__(self, other: Any) -> bool:
        return isinstance(other, BOTTOM)

    def __hash__(self) -> int:
        # Defining __eq__ alone makes instances unhashable. All BOTTOM
        # instances are equal, so they must share a single hash.
        return hash(BOTTOM)

    def __str__(self) -> str:
        return "BOTTOM"


# Payload of a StackValue: a set of concrete ints/strs/fields, or one of the
# TOP / BOTTOM markers.
VALUES_TRACKED = Union[Set[Union[GlobalField, TransactionField, int, str]], TOP, BOTTOM]


class StackValue:
    """
    Abstract value of one stack slot.

    The payload is either a set of int/str/fields, or TOP/BOTTOM.
    A set larger than MAX_ELEMS_PER_STACK_VALUE is replaced by TOP.
    """

    def __init__(self, values: VALUES_TRACKED):
        oversized = isinstance(values, set) and len(values) > MAX_ELEMS_PER_STACK_VALUE
        self.values = TOP() if oversized else values

    def union(self, other_stack_value: "StackValue") -> "StackValue":
        """Return the join of both values: TOP absorbs, BOTTOM is neutral, sets are united."""
        mine = self.values
        theirs = other_stack_value.values

        if isinstance(mine, TOP) or isinstance(theirs, TOP):
            return StackValue(TOP())
        if isinstance(mine, BOTTOM):
            return StackValue(theirs)
        if isinstance(theirs, BOTTOM):
            return StackValue(mine)

        assert isinstance(mine, set)
        assert isinstance(theirs, set)
        return StackValue(mine.union(theirs))

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, StackValue):
            return False
        return self.values == other.values

    def __str__(self) -> str:
        payload = self.values
        if isinstance(payload, (TOP, BOTTOM)):
            return str(payload)
        assert isinstance(payload, set)
        rendered = {str(element) for element in payload}
        return str(rendered)


class Stack:
    """
    Represent an abstract stack
    The length is limited by MAX_STACK_DEPTH
    self.values contains the abstract elements, ordered from the bottom of the
    stack (index 0, the "most left" element) to the top (last index)

    If two paths are merged, where one path pushes 1 (concrete stack [1])
    and the other pushes 3 (concrete stack [3]),
    then the abstract stack is
    [ [1;3] ]
    Ie: the top can either be 1 or 3

    If we pop beyond the known values, we return TOP().
    As a result, if the most left elements are TOP in the stack, we can stop tracking them

    If two paths are merged and their stacks have a different size, then
    we use TOP for the elements in the difference. Ex:
    - one path does push 2; push 1 (concrete stack [2;1])
    - and the other does push 3 (concrete stack [3])
    - then the abstract stack is
    - [ [TOP] ; [1;3] ]
    Ie: the top can either be 1 or 3, and the second one is TOP
    Because the most left element of the stack can be removed, this can be simplified as
    [ [1;3] ]
    """

    def __init__(self, values: List[StackValue]) -> None:
        # Work on a copy: callers hand over the list of another stack
        # (``Stack(other.values)``), which must not be modified here.
        values = list(values)

        # Keep only the MAX_STACK_DEPTH elements closest to the top.
        if len(values) > MAX_STACK_DEPTH:
            values = values[len(values) - MAX_STACK_DEPTH :]

        # TOP elements at the bottom carry no information (popping past the
        # tracked elements yields TOP anyway), so they are not stored.
        unknown = StackValue(TOP())
        first_tracked = 0
        while first_tracked < len(values) and values[first_tracked] == unknown:
            first_tracked += 1

        self.values: List[StackValue] = values[first_tracked:]

    def union(self, stack: "Stack") -> "Stack":
        """Return the element-wise join of both stacks, aligned on their tops.

        Only the depth tracked by both stacks is kept; anything deeper is
        implicitly TOP.
        """
        depth = min(len(self.values), len(stack.values))
        # Explicit start indexes: a ``[-depth:]`` slice would return the whole
        # list when depth is 0.
        mine = self.values[len(self.values) - depth :]
        theirs = stack.values[len(stack.values) - depth :]
        return Stack([left.union(right) for left, right in zip(mine, theirs)])

    def pop(self, number: int) -> Tuple["Stack", List[StackValue]]:
        """Remove the ``number`` top-most elements.

        Returns:
            The remaining stack and the removed elements, listed from the
            deepest to the top-most. Elements below the tracked part of the
            stack are reported as TOP. A ``number`` of 0 or less removes
            nothing.
        """
        if number <= 0:
            return Stack(self.values), []

        tracked = len(self.values)
        if tracked < number:
            untracked = [StackValue(TOP()) for _ in range(number - tracked)]
            return Stack([]), untracked + list(self.values)

        split = tracked - number
        return Stack(self.values[:split]), self.values[split:]

    def push(self, pushed_values: List[StackValue]) -> "Stack":
        """Return a new stack with ``pushed_values`` on top (the last one ends up top-most)."""
        return Stack(self.values + list(pushed_values))

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, Stack):
            return self.values == other.values
        return False

    def __str__(self) -> str:
        return str([str(x) for x in self.values])


# pylint: disable=unused-argument
def handle_int(ins: Instruction, stack: Stack) -> List[StackValue]:
    """Return the single value pushed by an ``Int`` instruction: its constant."""
    assert isinstance(ins, instructions.Int)
    return [StackValue({ins.value})]


# pylint: disable=unused-argument
def handle_pushint(ins: Instruction, stack: Stack) -> List[StackValue]:
    """Return the single value pushed by a ``PushInt`` instruction: its constant."""
    assert isinstance(ins, instructions.PushInt)
    return [StackValue({ins.value})]


# pylint: disable=unused-argument
def handle_txn(ins: Instruction, stack: Stack) -> List[StackValue]:
    """Return the single value pushed by a ``Txn`` instruction: the field it reads."""
    assert isinstance(ins, instructions.Txn)
    return [StackValue({ins.field})]


# pylint: disable=unused-argument
def handle_global(ins: Instruction, stack: Stack) -> List[StackValue]:
    """Return the single value pushed by a ``Global`` instruction: the field it reads."""
    assert isinstance(ins, instructions.Global)
    return [StackValue({ins.field})]


# Instructions whose pushed values are known precisely, keyed by exact
# instruction type. Any other instruction pushes TOP values.
special_handling: Dict[Type[Instruction], Callable[[Instruction, Stack], List[StackValue]]] = {
    instructions.Int: handle_int,
    instructions.PushInt: handle_pushint,
    instructions.Txn: handle_txn,
    instructions.Global: handle_global,
}


class StackValueAnalysis(AbstractDataflow[Stack]):
    """Track, for every basic block and instruction, the abstract content of the stack.

    ``bb_in`` / ``bb_out`` hold the stack entering / leaving a basic block,
    ``ins_in`` / ``ins_out`` the stack before / after a single instruction.
    """

    def __init__(self, teal: "Teal") -> None:
        super().__init__(teal)
        # The default "in" stack holds a BOTTOM element, which no handler
        # pushes, so a block that was never stored compares different from any
        # merged stack on its first visit.
        self.bb_in: Dict[BasicBlock, Stack] = defaultdict(lambda: Stack([StackValue(BOTTOM())]))
        self.bb_out: Dict[BasicBlock, Stack] = defaultdict(lambda: Stack([]))

        self.ins_in: Dict[Instruction, Stack] = defaultdict(lambda: Stack([]))
        self.ins_out: Dict[Instruction, Stack] = defaultdict(lambda: Stack([]))

    def _merge_predecessor(self, bb: BasicBlock) -> Stack:
        """Return the stack entering ``bb``: the union of its predecessors' outgoing stacks."""
        # run_analysis() starts from teal.bbs[0]. Assuming the program starts
        # with an empty stack, the union with that empty stack is empty
        # whatever a branch back to the entry block contributes.
        bbs = self.teal.bbs
        if bbs and bb is bbs[0]:
            return Stack([])

        # Only predecessors that already produced a stack take part. The
        # membership test (rather than indexing) keeps the defaultdict from
        # creating entries for blocks that were never explored.
        incoming = [self.bb_out[bb_prev] for bb_prev in bb.prev if bb_prev in self.bb_out]
        if not incoming:
            return Stack([])

        # Start from a real predecessor: a union with an empty stack is always
        # empty, so an empty stack cannot be the seed of the fold.
        merged = incoming[0]
        for other in incoming[1:]:
            merged = merged.union(other)
        return merged

    def _is_fix_point(self, bb: BasicBlock, values: Stack) -> bool:
        """True when ``values`` equals the stack already stored as entering ``bb``."""
        return self.bb_in[bb] == values

    def _transfer_function(self, bb: BasicBlock) -> Stack:
        """Apply every instruction of ``bb`` to its incoming stack and return the result.

        The stack before and after each instruction is recorded in
        ``ins_in`` / ``ins_out``.
        """
        stack = self.bb_in[bb]

        for ins in bb.instructions:
            self.ins_in[ins] = Stack(stack.values)

            handler = special_handling.get(type(ins))
            if handler is not None:
                pushed_elems = handler(ins, stack)
            else:
                # Unknown effect: the right number of values, all unknown.
                pushed_elems = [StackValue(TOP()) for _ in range(ins.stack_push_size)]

            stack, _ = stack.pop(ins.stack_pop_size)
            stack = stack.push(pushed_elems)
            self.ins_out[ins] = Stack(stack.values)

        return stack

    def _store_values_in(self, bb: BasicBlock, values: Stack) -> None:
        self.bb_in[bb] = values

    def _store_values_out(self, bb: BasicBlock, values: Stack) -> None:
        self.bb_out[bb] = values

    def _filter_successors(self, bb: BasicBlock) -> List[BasicBlock]:
        """Every successor is explored."""
        return bb.next

    def result(self) -> Dict[BasicBlock, Stack]:
        """Return the mapping from basic block to the stack leaving it."""
        return self.bb_out
Loading
Loading