Skip to content

Commit

Permalink
Merge pull request #5 from delftdata/merge-to-collect
Browse files Browse the repository at this point in the history
Use collect operator in test programs
  • Loading branch information
lucasvanmol authored Jan 20, 2025
2 parents 1746feb + 89e86ab commit 8d8ed31
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 16 deletions.
18 changes: 14 additions & 4 deletions src/cascade/dataflow/test_dataflow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from typing import Any
from cascade.dataflow.dataflow import DataFlow, Edge, Event, EventResult, InvokeMethod, OpNode
from cascade.dataflow.dataflow import CollectNode, CollectTarget, DataFlow, Edge, Event, EventResult, InvokeMethod, OpNode
from cascade.dataflow.operator import StatefulOperator

class DummyUser:
Expand Down Expand Up @@ -79,9 +79,19 @@ def test_simple_df_propogation():
def test_merge_df_propogation():
df = DataFlow("user.buy_2_items")
n0 = OpNode(DummyUser, InvokeMethod("buy_2_items_0"))
n1 = OpNode(DummyItem, InvokeMethod("get_price"))
n2 = OpNode(DummyItem, InvokeMethod("get_price"))
n3 = MergeNode()
n3 = CollectNode(assign_result_to="item_prices", read_results_from="item_price")
n1 = OpNode(
DummyItem,
InvokeMethod("get_price"),
assign_result_to="item_price",
collect_target=CollectTarget(n3, 2, 0)
)
n2 = OpNode(
DummyItem,
InvokeMethod("get_price"),
assign_result_to="item_price",
collect_target=CollectTarget(n3, 2, 1)
)
n4 = OpNode(DummyUser, InvokeMethod("buy_2_items_1"))
df.add_edge(Edge(n0, n1))
df.add_edge(Edge(n0, n2))
Expand Down
80 changes: 68 additions & 12 deletions test_programs/expected/checkout_two_items.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
from typing import Any
# from ..target.checkout_item import User, Item
# from cascade.dataflow.dataflow import DataFlow, OpNode, InvokeMethod, Edge

from cascade.dataflow.operator import StatefulOperator
from ..target.checkout_two_items import User, Item
from cascade.dataflow.dataflow import DataFlow, OpNode, InvokeMethod, Edge, CollectNode, CollectTarget

def buy_two_items_0_compiled(variable_map: dict[str, Any], state: User, key_stack: list[str]) -> Any:
    """Run the first half of ``buy_two_items``: queue the keys of both items.

    Exactly one entry is pushed onto ``key_stack``: a two-element list holding
    the keys of the first and second item. ``buy_two_items_1_compiled`` removes
    a single entry with one ``pop()``, so pushing the keys individually as well
    would leave stale entries behind on the stack.

    Args:
        variable_map: Must contain ``"item1_key"`` and ``"item2_key"``.
        state: The user state; not read or modified here.
        key_stack: Mutated in place; gains one list entry.

    Returns:
        Always ``None``.

    Raises:
        KeyError: If either item key is missing from ``variable_map``.
    """
    # Look both keys up before touching the stack so a missing key leaves it unchanged.
    item_keys = [variable_map["item1_key"], variable_map["item2_key"]]
    key_stack.append(item_keys)
    return None


def buy_two_items_1_compiled(variable_map: dict[str, Any], state: User, key_stack: list[str]) -> Any:
    """Run the second half of ``buy_two_items``: charge the user for both items.

    Pops one entry from ``key_stack``, sums the two item prices found in
    ``variable_map`` and subtracts the total from ``state.balance``.

    Args:
        variable_map: Must contain ``'item_price_1'`` and ``'item_price_2'``,
            the names the item nodes of ``user_buy_two_items_df`` assign
            their results to.
        state: The user state; ``state.balance`` is decremented in place.
        key_stack: Mutated in place; loses its last entry.

    Returns:
        ``True`` if the balance is still non-negative after the deduction.
        The deduction is applied either way; it is not rolled back when the
        result is ``False``.

    Raises:
        IndexError: If ``key_stack`` is empty.
        KeyError: If a price is missing from ``variable_map``.
    """
    key_stack.pop()
    # Only the names actually produced by the dataflow are read; the older
    # 'item_price_1_0' / 'item_price_2_0' keys are never assigned by any node.
    item_price_1_0 = variable_map['item_price_1']
    item_price_2_0 = variable_map['item_price_2']
    total_price_0 = item_price_1_0 + item_price_2_0
    state.balance -= total_price_0
    return state.balance >= 0
Expand All @@ -22,13 +24,67 @@ def get_price_0_compiled(variable_map: dict[str, Any], state: Item, key_stack: l
return state.price


def user_buy_item_df():
df = DataFlow("user.buy_item")
n0 = OpNode(User, InvokeMethod("buy_item_0"))
n1 = OpNode(Item, InvokeMethod("get_price"), assign_result_to="item_price")
n2 = OpNode(User, InvokeMethod("buy_item_1"))
# Each operator pairs a state class with the compiled functions that may be
# called on it; the functions are looked up by the string keys given here.
_user_methods = {
    "buy_two_items_0": buy_two_items_0_compiled,
    "buy_two_items_1": buy_two_items_1_compiled,
}
# The third argument is left as None at construction time;
# ``user_op.dataflows`` is assigned separately at the bottom of this file.
user_op = StatefulOperator(User, _user_methods, None)

_item_methods = {"get_price": get_price_0_compiled}
item_op = StatefulOperator(Item, _item_methods, None)

def user_buy_two_items_df():
    """Build the sequential dataflow for ``buy_two_items`` on the user operator.

    Nodes:
        n0: user operator, first half of the method (``buy_two_items_0``).
        n1: item operator, ``get_price``; result stored as ``item_price_1``.
        n2: item operator, ``get_price``; result stored as ``item_price_2``.
        n3: user operator, second half of the method (``buy_two_items_1``).

    The invoked method names match the keys registered on ``user_op``
    (``buy_two_items_0`` / ``buy_two_items_1``), and the dataflow name follows
    the ``buy_two_items`` key it is stored under in ``user_op.dataflows``.

    Returns:
        The assembled ``DataFlow`` with ``entry`` set to ``n0``.
    """
    df = DataFlow("user.buy_two_items")

    n0 = OpNode(user_op, InvokeMethod("buy_two_items_0"))
    n1 = OpNode(
        item_op,
        InvokeMethod("get_price"),
        assign_result_to="item_price_1",
    )
    n2 = OpNode(
        item_op,
        InvokeMethod("get_price"),
        assign_result_to="item_price_2",
    )
    n3 = OpNode(user_op, InvokeMethod("buy_two_items_1"))

    df.add_edge(Edge(n0, n1))
    # NOTE(review): n2 is reachable both directly from n0 and via n1. If this
    # flow is meant to be a plain chain n0 -> n1 -> n2 -> n3, the n0 -> n2
    # edge looks redundant -- confirm against DataFlow's edge semantics
    # before removing it.
    df.add_edge(Edge(n0, n2))
    df.add_edge(Edge(n1, n2))
    df.add_edge(Edge(n2, n3))

    df.entry = n0
    return df


# For future optimizations (not used)
def user_buy_two_items_df_parallelized():
    """Build a fan-out/collect variant of the ``buy_two_items`` dataflow.

    Both ``get_price`` nodes hang directly off ``n0`` and feed a single
    ``CollectNode`` (``n3``), which reads ``item_price`` results and assigns
    them to ``item_prices`` before the second user step (``n4``) runs.

    The invoked user method names match the keys registered on ``user_op``
    (``buy_two_items_0`` / ``buy_two_items_1``).

    NOTE(review): ``buy_two_items_1_compiled`` reads ``item_price_1`` and
    ``item_price_2`` from the variable map, whereas this flow produces
    ``item_prices``; the compiled function would presumably need adapting
    before this variant can be registered.

    Returns:
        The assembled ``DataFlow`` with ``entry`` set to ``n0``.
    """
    df = DataFlow("user.buy_two_items")

    n0 = OpNode(user_op, InvokeMethod("buy_two_items_0"))
    # The collect node is created first so both price nodes can reference it.
    n3 = CollectNode(assign_result_to="item_prices", read_results_from="item_price")
    # CollectTarget(n3, 2, i): presumably "2 results expected, this node
    # fills slot i" -- TODO confirm against CollectTarget's definition.
    n1 = OpNode(
        item_op,
        InvokeMethod("get_price"),
        assign_result_to="item_price",
        collect_target=CollectTarget(n3, 2, 0),
    )
    n2 = OpNode(
        item_op,
        InvokeMethod("get_price"),
        assign_result_to="item_price",
        collect_target=CollectTarget(n3, 2, 1),
    )
    n4 = OpNode(user_op, InvokeMethod("buy_two_items_1"))

    # Fan out from n0 to both price lookups, then join on the collect node.
    df.add_edge(Edge(n0, n1))
    df.add_edge(Edge(n0, n2))
    df.add_edge(Edge(n1, n3))
    df.add_edge(Edge(n2, n3))
    df.add_edge(Edge(n3, n4))

    df.entry = n0
    return df

# Attach the sequential dataflow to the user operator, keyed by method name.
user_op.dataflows = dict(buy_two_items=user_buy_two_items_df())

0 comments on commit 8d8ed31

Please sign in to comment.