From c23a33b409c50a24e3ae218db8688ed05eb10bf4 Mon Sep 17 00:00:00 2001 From: lucasvanmol Date: Wed, 18 Dec 2024 12:24:29 +0100 Subject: [PATCH 1/9] Basic example running --- src/cascade/dataflow/dataflow.py | 50 ++++-- src/cascade/runtime/flink_runtime.py | 26 ++- src/cascade/runtime/test_global_state.py | 199 +++++++++++++++++++++++ 3 files changed, 261 insertions(+), 14 deletions(-) create mode 100644 src/cascade/runtime/test_global_state.py diff --git a/src/cascade/dataflow/dataflow.py b/src/cascade/dataflow/dataflow.py index 3546196..446a3e8 100644 --- a/src/cascade/dataflow/dataflow.py +++ b/src/cascade/dataflow/dataflow.py @@ -1,6 +1,6 @@ from abc import ABC from dataclasses import dataclass, field -from typing import Any, List, Optional, Type, Union +from typing import Any, Callable, List, Optional, Type, Union @dataclass @@ -13,6 +13,11 @@ class InvokeMethod: """A method invocation of the underlying method indentifier.""" method_name: str +@dataclass +class Filter: + """Filter by this function""" + filter_fn: Callable + @dataclass class Node(ABC): """Base class for Nodes.""" @@ -33,9 +38,17 @@ class OpNode(Node): A `Dataflow` may reference the same `StatefulOperator` multiple times. The `StatefulOperator` that this node belongs to is referenced by `cls`.""" cls: Type - method_type: Union[InitClass, InvokeMethod] + method_type: Union[InitClass, InvokeMethod, Filter] assign_result_to: Optional[str] = None +@dataclass +class SelectAllNode(Node): + """A node type that will yield all items of an entity filtered by + some function. + + Think of this as executing `SELECT * FROM cls`""" + cls: Type + @dataclass class MergeNode(Node): """A node in a `Dataflow` corresponding to a merge operator. @@ -155,18 +168,33 @@ def propogate(self, key_stack, result) -> Union['EventResult', list['Event']]: if len(targets) == 0: return EventResult(self._id, result) else: - # An event with multiple targets should have the same number of keys in a list on top of its key stack keys = key_stack.pop() if not isinstance(keys, list): keys = [keys] - return [Event( - target, - key_stack + [key], - self.variable_map, - self.dataflow, - _id=self._id) - - for target, key in zip(targets, keys)] + + if len(targets) == 1: + # We assume that all keys need to go to the same target + # this is only used for SelectAll propogation + return [Event( + targets[0], + key_stack + [key], + self.variable_map, + self.dataflow, + _id=self._id) + + for key in keys] + else: + # An event with multiple targets should have the same number of + # keys in a list on top of its key stack + assert len(targets) == len(keys) + return [Event( + target, + key_stack + [key], + self.variable_map, + self.dataflow, + _id=self._id) + + for target, key in zip(targets, keys)] @dataclass class EventResult(): diff --git a/src/cascade/runtime/flink_runtime.py b/src/cascade/runtime/flink_runtime.py index 86c71c8..a236f7d 100644 --- a/src/cascade/runtime/flink_runtime.py +++ b/src/cascade/runtime/flink_runtime.py @@ -1,14 +1,14 @@ import os -from typing import Optional, Union +from typing import Optional, Type, Union from pyflink.common.typeinfo import Types, get_gateway from pyflink.common import Configuration, DeserializationSchema, SerializationSchema, WatermarkStrategy from pyflink.datastream.connectors import DeliveryGuarantee from pyflink.datastream.data_stream import CloseableIterator from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext, ValueState, ValueStateDescriptor from pyflink.datastream.connectors.kafka import 
KafkaOffsetsInitializer, KafkaRecordSerializationSchema, KafkaSource, KafkaSink -from pyflink.datastream import StreamExecutionEnvironment +from pyflink.datastream import ProcessFunction, StreamExecutionEnvironment import pickle -from cascade.dataflow.dataflow import Event, EventResult, InitClass, InvokeMethod, MergeNode, OpNode +from cascade.dataflow.dataflow import Event, EventResult, Filter, InitClass, InvokeMethod, MergeNode, OpNode, SelectAllNode from cascade.dataflow.operator import StatefulOperator from confluent_kafka import Producer import logging @@ -51,6 +51,12 @@ def process_element(self, event: Event, ctx: KeyedProcessFunction.Context): # TODO: check if state actually needs to be updated if state is not None: self.state.update(pickle.dumps(state)) + elif isinstance(event.target.method_type, Filter): + state = pickle.loads(self.state.value()) + result = event.target.method_type.filter_fn(event.variable_map, state) + if not result: + return + result = event.key_stack[-1] if event.target.assign_result_to is not None: event.variable_map[event.target.assign_result_to] = result @@ -63,6 +69,20 @@ def process_element(self, event: Event, ctx: KeyedProcessFunction.Context): logger.debug(f"FlinkOperator {event.target.cls.__name__}[{ctx.get_current_key()}]: Propogated {len(new_events)} new Events") yield from new_events +class SelectAllOperator(ProcessFunction): + """A process function that yields all keys of a certain class""" + def __init__(self, ids: dict[Type, list[str]]): + self.ids = ids + + def process_element(self, event: Event, ctx: 'ProcessFunction.Context'): + assert isinstance(event.target, SelectAllNode) + logger.debug(f"SelectAllOperator {event.target.cls.__name__}: Processing: {event}") + + # yield all the hotel_ids we know about + event.key_stack.append(self.ids[event.target.cls]) + new_events = event.propogate(event.key_stack, None) + logger.debug(f"SelectAll [{event.target.cls}]: Propogated {len(new_events)} events") + yield from new_events class FlinkMergeOperator(KeyedProcessFunction): """Flink implementation of a merge operator.""" diff --git a/src/cascade/runtime/test_global_state.py b/src/cascade/runtime/test_global_state.py new file mode 100644 index 0000000..0d87044 --- /dev/null +++ b/src/cascade/runtime/test_global_state.py @@ -0,0 +1,199 @@ +""" +Basically we need a way to search through all state. +""" +import math +import random +from dataclasses import dataclass +from typing import Any, Type + +from pyflink.common import Configuration +from pyflink.datastream import ProcessFunction, StreamExecutionEnvironment +from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer, FlinkKafkaProducer + +from cascade.dataflow.dataflow import DataFlow, Edge, Event, EventResult, Filter, InitClass, OpNode, SelectAllNode +from cascade.dataflow.operator import StatefulOperator +from cascade.runtime.flink_runtime import ByteSerializer, FlinkOperator, SelectAllOperator +from confluent_kafka import Producer +import os +import pickle # problems with pickling functions (e.g. lambdas)? 
use cloudpickle +import logging +import time + +def add_kafka_source(env: StreamExecutionEnvironment, topics, broker="localhost:9092"): + kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)), + 'bin/flink-sql-connector-kafka-3.3.0-1.20.jar') + serializer_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'bin/flink-kafka-bytes-serializer.jar') + + if os.name == 'nt': + env.add_jars(f"file:///{kafka_jar}",f"file://{serializer_jar}") + else: + env.add_jars(f"file://{kafka_jar}",f"file://{serializer_jar}") + + deserialization_schema = ByteSerializer() + properties: dict = { + "bootstrap.servers": broker, + "auto.offset.reset": "earliest", + "group.id": "test_group_1", + } + kafka_consumer = FlinkKafkaConsumer(topics, deserialization_schema, properties) + return env.add_source(kafka_consumer) + +def dbg(e): + # print(e) + return e + +@dataclass +class Geo: + x: int + y: int + +class Hotel: + def __init__(self, name: str, loc: Geo): + self.name = name + self.loc = loc + + def distance(self, loc: Geo) -> float: + return math.sqrt((self.loc.x - loc.x) ** 2 + (self.loc.y - loc.y) ** 2) + + def __repr__(self) -> str: + return f"Hotel({self.name}, {self.loc})" + +def distance_compiled(variable_map: dict[str, Any], state: Hotel, key_stack: list[str]) -> Any: + loc = variable_map["loc"] + return math.sqrt((state.loc.x - loc.x) ** 2 + (state.loc.y - loc.y) ** 2) + + + +# We compile just the predicate, the select is implemented using a selectall node +def get_nearby_predicate_compiled(variable_map: dict[str, Any], state: Hotel) -> bool: + return state.distance(variable_map["loc"]) < variable_map["dist"] + + + + +def test_yeeter(): + + hotel_op = StatefulOperator(Hotel, {"distance": distance_compiled}, {}) + hotel_op = FlinkOperator(hotel_op) + + hotels = [] + random.seed(42) + for i in range(100): + coord_x = random.randint(-10, 10) + coord_y = random.randint(-10, 10) + hotels.append(Hotel(f"h_{i}", Geo(coord_x, coord_y))) + + def get_nearby(loc: Geo, dist: int) -> list[Hotel]: + return [hotel for hotel in hotels if hotel.distance(loc) < dist] + + # Configure the local Flink instance with the ui at http://localhost:8081 + config = Configuration() # type: ignore + config.set_string("rest.port", "8081") + env = StreamExecutionEnvironment.get_execution_environment(config) + + # Add the kafka producer and consumers + topic = "input-topic" + broker = "localhost:9092" + ds = add_kafka_source(env, topic) + producer = Producer({"bootstrap.servers": 'localhost:9092'}) + deserialization_schema = ByteSerializer() + properties: dict = { + "bootstrap.servers": broker, + "group.id": "test_group_1", + } + kafka_external_sink = FlinkKafkaProducer("out-topic", deserialization_schema, properties) + kafka_internal_sink = FlinkKafkaProducer(topic, deserialization_schema, properties) + + # Create the datastream that will handle + # - simple (single node) dataflows and, + # - init classes + stream = ( + ds.map(lambda x: pickle.loads(x)) + ) + + + select_all_op = SelectAllOperator({Hotel: [hotel.name for hotel in hotels]}) + + select_all_stream = ( + stream.filter(lambda e: isinstance(e.target, SelectAllNode)) + .process(select_all_op) # yield all the hotel_ids + ) + + op_stream = ( + stream.union(select_all_stream).filter(lambda e: isinstance(e.target, OpNode)) + ) + + + hotel_stream = ( + op_stream + .filter(lambda e: e.target.cls == Hotel) + .key_by(lambda e: e.key_stack[-1]) + .process(hotel_op) + ) + + full_stream = hotel_stream #.union... 
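The `Filter` handling added to `FlinkOperator.process_element` in this patch expects a predicate of the form `filter_fn(variable_map, state) -> bool`: when it returns `False` the event is dropped, and when it returns `True` the current key (`key_stack[-1]`) becomes the node's result. A minimal sketch of that contract, reusing the `Hotel` entity defined in this test (illustrative only; in patch 1 `OpNode` still takes the entity class directly, later patches switch this argument to the operator instance):

```python
from typing import Any
from cascade.dataflow.dataflow import Filter, OpNode

# Same shape as get_nearby_predicate_compiled above: FlinkOperator calls
# filter_fn(event.variable_map, state) and drops the event on False.
def within_dist(variable_map: dict[str, Any], state: Hotel) -> bool:
    return state.distance(variable_map["loc"]) < variable_map["dist"]

# A dataflow node that filters Hotel entities with this predicate.
filter_node = OpNode(Hotel, Filter(within_dist))
```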
+ + full_stream_filtered = ( + full_stream + .filter(lambda e: isinstance(e, Event)) + .filter(lambda e: isinstance(e.target, Filter)) + .filter(lambda e: e.target.filter_fn()) + ) + + full_stream_unfiltered = ( + full_stream + .filter(lambda e: not isinstance(e, Event) or not isinstance(e.target, Filter)) + ) + + # have to remove items from full_stream as well?? + ds = full_stream_unfiltered.union(full_stream_filtered) + + # INIT HOTELS + init_hotel = OpNode(Hotel, InitClass()) + for hotel in hotels: + event = Event(init_hotel, [hotel.name], {"name": hotel.name, "loc": hotel.loc}, None) + producer.produce( + topic, + value=pickle.dumps(event), + ) + + + + ds_external = ds.map(lambda e: dbg(e)).filter(lambda e: isinstance(e, EventResult)).filter(lambda e: e.event_id > 99).print() #.add_sink(kafka_external_sink) + ds_internal = ds.map(lambda e: dbg(e)).filter(lambda e: isinstance(e, Event)).map(lambda e: pickle.dumps(e)).add_sink(kafka_internal_sink) + producer.flush() + + env.execute_async() + + print("sleepin") + time.sleep(2) + + # GET NEARBY + # dataflow for getting all hotels within region + df = DataFlow("get_nearby_hotels") + n0 = SelectAllNode(Hotel) + n1 = OpNode(Hotel, Filter(get_nearby_predicate_compiled)) + df.add_edge(Edge(n0, n1)) + + dist = 5 + loc = Geo(0, 0) + event = Event(n0, [], {"loc": loc, "dist": dist}, df) + producer.produce( + topic, + value=pickle.dumps(event), + ) + + nearby = [] + for hotel in hotels: + if hotel.distance(loc) < dist: + nearby.append(hotel.name) + print(nearby) + # ok thats pretty good. But now we need to solve the problem of merging + # an arbitray number of nodes. but like we naturally want to merge as late + # as possible, right? ideally we want to process results in a streaming + # fashion + + # I want another example that does something after filtering, + # for example buying all items less than 10 price + input() + From d398b1764da1328b5ecc84e7ebbae7d894ace762 Mon Sep 17 00:00:00 2001 From: lucasvanmol Date: Wed, 18 Dec 2024 15:47:38 +0100 Subject: [PATCH 2/9] Add FlinkRuntime test --- src/cascade/runtime/flink_runtime.py | 102 +++++++++++++--- src/cascade/runtime/test_global_state.py | 142 ++++++----------------- 2 files changed, 121 insertions(+), 123 deletions(-) diff --git a/src/cascade/runtime/flink_runtime.py b/src/cascade/runtime/flink_runtime.py index a236f7d..34f0c36 100644 --- a/src/cascade/runtime/flink_runtime.py +++ b/src/cascade/runtime/flink_runtime.py @@ -1,3 +1,4 @@ +from dataclasses import dataclass import os from typing import Optional, Type, Union from pyflink.common.typeinfo import Types, get_gateway @@ -8,7 +9,7 @@ from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaRecordSerializationSchema, KafkaSource, KafkaSink from pyflink.datastream import ProcessFunction, StreamExecutionEnvironment import pickle -from cascade.dataflow.dataflow import Event, EventResult, Filter, InitClass, InvokeMethod, MergeNode, OpNode, SelectAllNode +from cascade.dataflow.dataflow import Event, EventResult, Filter, InitClass, InvokeMethod, MergeNode, Node, OpNode, SelectAllNode from cascade.dataflow.operator import StatefulOperator from confluent_kafka import Producer import logging @@ -20,6 +21,18 @@ console_handler.setFormatter(formatter) logger.addHandler(console_handler) +@dataclass +class FlinkRegisterKeyNode(Node): + """A node that will register a key with the SelectAll operator. + + This node is specific to Flink, and will be automatically generated. + It should not be used in a `DataFlow`. 
+ + @private + """ + key: str + cls: Type + class FlinkOperator(KeyedProcessFunction): """Wraps an `cascade.dataflow.datflow.StatefulOperator` in a KeyedProcessFunction so that it can run in Flink. """ @@ -41,6 +54,17 @@ def process_element(self, event: Event, ctx: KeyedProcessFunction.Context): # otherwise, order of variable_map matters for variable assignment result = self.operator.handle_init_class(*event.variable_map.values()) + # Register the created key in FlinkSelectAllOperator + register_key_event = Event( + FlinkRegisterKeyNode(key_stack[-1], self.operator._cls), # problem is that this id goes up when we don't rly watn it + [], + {}, + None, + _id = event._id + ) + logger.debug(f"FlinkOperator {event.target.cls.__name__}[{ctx.get_current_key()}]: Registering key: {register_key_event}") + yield register_key_event + # Pop this key from the key stack so that we exit key_stack.pop() self.state.update(pickle.dumps(result)) @@ -69,20 +93,36 @@ def process_element(self, event: Event, ctx: KeyedProcessFunction.Context): logger.debug(f"FlinkOperator {event.target.cls.__name__}[{ctx.get_current_key()}]: Propogated {len(new_events)} new Events") yield from new_events -class SelectAllOperator(ProcessFunction): +class FlinkSelectAllOperator(KeyedProcessFunction): """A process function that yields all keys of a certain class""" - def __init__(self, ids: dict[Type, list[str]]): - self.ids = ids + def __init__(self): + self.state: ValueState = None # type: ignore (expect state to be initialised on .open()) + + def open(self, runtime_context: RuntimeContext): + descriptor = ValueStateDescriptor("entity-keys", Types.PICKLED_BYTE_ARRAY()) #,Types.OBJECT_ARRAY(Types.STRING())) + self.state: ValueState = runtime_context.get_state(descriptor) def process_element(self, event: Event, ctx: 'ProcessFunction.Context'): - assert isinstance(event.target, SelectAllNode) - logger.debug(f"SelectAllOperator {event.target.cls.__name__}: Processing: {event}") + state: list[str] = self.state.value() + if state is None: + state = [] + + if isinstance(event.target, FlinkRegisterKeyNode): + logger.debug(f"SelectAllOperator [{event.target.cls.__name__}]: Processing: {event}") + + state.append(event.target.key) + self.state.update(state) - # yield all the hotel_ids we know about - event.key_stack.append(self.ids[event.target.cls]) - new_events = event.propogate(event.key_stack, None) - logger.debug(f"SelectAll [{event.target.cls}]: Propogated {len(new_events)} events") - yield from new_events + elif isinstance(event.target, SelectAllNode): + logger.debug(f"SelectAllOperator [{event.target.cls.__name__}]: Processing: {event}") + + # Yield all the keys we now about + event.key_stack.append(state) + new_events = event.propogate(event.key_stack, None) + logger.debug(f"SelectAllOperator [{event.target.cls.__name__}]: Propogated {len(new_events)} events") + yield from new_events + else: + raise Exception(f"Unexpected target for SelectAllOperator: {event.target}") class FlinkMergeOperator(KeyedProcessFunction): """Flink implementation of a merge operator.""" @@ -235,7 +275,7 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5): ) """Kafka sink that will be ingested again by the Flink runtime.""" - stream = ( + event_stream = ( self.env.from_source( kafka_source, WatermarkStrategy.no_watermarks(), @@ -245,11 +285,29 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5): # .filter(lambda e: isinstance(e, Event)) # Enforced by `add_operator` type safety ) - self.stateful_op_stream 
= stream.filter(lambda e: isinstance(e.target, OpNode)) + # Events with a `SelectAllNode` will first be processed by the select + # all operator, which will send out multiple other Events that can + # then be processed by operators in the same steam. + select_all_stream = ( + event_stream.filter(lambda e: + isinstance(e.target, SelectAllNode) or isinstance(e.target, FlinkRegisterKeyNode)) + .key_by(lambda e: e.target.cls) + .process(FlinkSelectAllOperator()) + ) + """Stream that ingests events with an `SelectAllNode` or `FlinkRegisterKeyNode`""" + not_select_all_stream = ( + event_stream.filter(lambda e: + not (isinstance(e.target, SelectAllNode) or isinstance(e.target, FlinkRegisterKeyNode))) + ) + + event_stream = select_all_stream.union(not_select_all_stream) + + + self.stateful_op_stream = event_stream.filter(lambda e: isinstance(e.target, OpNode)) """Stream that ingests events with an `cascade.dataflow.dataflow.OpNode` target""" self.merge_op_stream = ( - stream.filter(lambda e: isinstance(e.target, MergeNode)) + event_stream.filter(lambda e: isinstance(e.target, MergeNode)) .key_by(lambda e: e._id) # might not work in the future if we have multiple merges in one dataflow? .process(FlinkMergeOperator()) ) @@ -289,7 +347,21 @@ def run(self, run_async=False, collect=False) -> Union[CloseableIterator, None]: assert self.env is not None, "FlinkRuntime must first be initialised with `init()`." # Combine all the operator streams - ds = self.merge_op_stream.union(*self.stateful_op_streams) + operator_streams = self.merge_op_stream.union(*self.stateful_op_streams) + + # Add filtering for nodes with a `Filter` target + full_stream_filtered = ( + operator_streams + .filter(lambda e: isinstance(e, Event) and isinstance(e.target, Filter)) + .filter(lambda e: e.target.filter_fn()) + ) + full_stream_unfiltered = ( + operator_streams + .filter(lambda e: not (isinstance(e, Event) and isinstance(e.target, Filter))) + ) + ds = full_stream_filtered.union(full_stream_unfiltered) + + # Output the stream if collect: ds_external = ds.filter(lambda e: isinstance(e, EventResult)).execute_and_collect() else: diff --git a/src/cascade/runtime/test_global_state.py b/src/cascade/runtime/test_global_state.py index 0d87044..45d1e32 100644 --- a/src/cascade/runtime/test_global_state.py +++ b/src/cascade/runtime/test_global_state.py @@ -9,13 +9,14 @@ from pyflink.common import Configuration from pyflink.datastream import ProcessFunction, StreamExecutionEnvironment from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer, FlinkKafkaProducer +from pyflink.datastream.data_stream import CloseableIterator from cascade.dataflow.dataflow import DataFlow, Edge, Event, EventResult, Filter, InitClass, OpNode, SelectAllNode from cascade.dataflow.operator import StatefulOperator -from cascade.runtime.flink_runtime import ByteSerializer, FlinkOperator, SelectAllOperator +from cascade.runtime.flink_runtime import ByteSerializer, FlinkOperator, FlinkRegisterKeyNode, FlinkRuntime, FlinkSelectAllOperator from confluent_kafka import Producer import os -import pickle # problems with pickling functions (e.g. lambdas)? use cloudpickle +import cloudpickle # problems with pickling functions (e.g. lambdas)? 
use cloudcloudpickle import logging import time @@ -38,8 +39,8 @@ def add_kafka_source(env: StreamExecutionEnvironment, topics, broker="localhost: kafka_consumer = FlinkKafkaConsumer(topics, deserialization_schema, properties) return env.add_source(kafka_consumer) -def dbg(e): - # print(e) +def dbg(e, msg=""): + print(msg + str(e)) return e @dataclass @@ -58,6 +59,7 @@ def distance(self, loc: Geo) -> float: def __repr__(self) -> str: return f"Hotel({self.name}, {self.loc})" + def distance_compiled(variable_map: dict[str, Any], state: Hotel, key_stack: list[str]) -> Any: loc = variable_map["loc"] return math.sqrt((state.loc.x - loc.x) ** 2 + (state.loc.y - loc.y) ** 2) @@ -68,105 +70,36 @@ def distance_compiled(variable_map: dict[str, Any], state: Hotel, key_stack: lis def get_nearby_predicate_compiled(variable_map: dict[str, Any], state: Hotel) -> bool: return state.distance(variable_map["loc"]) < variable_map["dist"] +hotel_op = StatefulOperator(Hotel, {"distance": distance_compiled}, {}) +def test_nearby_hotels(): + runtime = FlinkRuntime("test_nearby_hotels") + runtime.init() + runtime.add_operator(FlinkOperator(hotel_op)) - -def test_yeeter(): - - hotel_op = StatefulOperator(Hotel, {"distance": distance_compiled}, {}) - hotel_op = FlinkOperator(hotel_op) - + # Create Hotels hotels = [] + init_hotel = OpNode(Hotel, InitClass()) random.seed(42) - for i in range(100): + for i in range(50): coord_x = random.randint(-10, 10) coord_y = random.randint(-10, 10) - hotels.append(Hotel(f"h_{i}", Geo(coord_x, coord_y))) - - def get_nearby(loc: Geo, dist: int) -> list[Hotel]: - return [hotel for hotel in hotels if hotel.distance(loc) < dist] - - # Configure the local Flink instance with the ui at http://localhost:8081 - config = Configuration() # type: ignore - config.set_string("rest.port", "8081") - env = StreamExecutionEnvironment.get_execution_environment(config) - - # Add the kafka producer and consumers - topic = "input-topic" - broker = "localhost:9092" - ds = add_kafka_source(env, topic) - producer = Producer({"bootstrap.servers": 'localhost:9092'}) - deserialization_schema = ByteSerializer() - properties: dict = { - "bootstrap.servers": broker, - "group.id": "test_group_1", - } - kafka_external_sink = FlinkKafkaProducer("out-topic", deserialization_schema, properties) - kafka_internal_sink = FlinkKafkaProducer(topic, deserialization_schema, properties) - - # Create the datastream that will handle - # - simple (single node) dataflows and, - # - init classes - stream = ( - ds.map(lambda x: pickle.loads(x)) - ) - - - select_all_op = SelectAllOperator({Hotel: [hotel.name for hotel in hotels]}) - - select_all_stream = ( - stream.filter(lambda e: isinstance(e.target, SelectAllNode)) - .process(select_all_op) # yield all the hotel_ids - ) - - op_stream = ( - stream.union(select_all_stream).filter(lambda e: isinstance(e.target, OpNode)) - ) - - - hotel_stream = ( - op_stream - .filter(lambda e: e.target.cls == Hotel) - .key_by(lambda e: e.key_stack[-1]) - .process(hotel_op) - ) - - full_stream = hotel_stream #.union... - - full_stream_filtered = ( - full_stream - .filter(lambda e: isinstance(e, Event)) - .filter(lambda e: isinstance(e.target, Filter)) - .filter(lambda e: e.target.filter_fn()) - ) - - full_stream_unfiltered = ( - full_stream - .filter(lambda e: not isinstance(e, Event) or not isinstance(e.target, Filter)) - ) - - # have to remove items from full_stream as well?? 
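With this patch the select-all operator is no longer seeded with a key list up front: every `InitClass` event makes `FlinkOperator` emit a `FlinkRegisterKeyNode` event, the keyed `FlinkSelectAllOperator` folds those keys into per-class state, and a later `SelectAllNode` event fans out one event per registered key. A rough, non-Flink model of that flow (a plain dict stands in for the keyed `ValueState`; assumes the `Event` API from `dataflow.py`):

```python
from cascade.dataflow.dataflow import Event

# Conceptual model of patch 2's FlinkSelectAllOperator.
registered: dict[type, list[str]] = {}

def register_key(cls: type, key: str) -> None:
    # Handling of a FlinkRegisterKeyNode event (emitted on every InitClass).
    registered.setdefault(cls, []).append(key)

def select_all(event: Event, cls: type):
    # Handling of a SelectAllNode event: the full key list is pushed as a
    # single key_stack entry, and Event.propogate fans the event out into one
    # copy per key toward the single downstream target.
    event.key_stack.append(registered.get(cls, []))
    return event.propogate(event.key_stack, None)
```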
- ds = full_stream_unfiltered.union(full_stream_filtered) - - # INIT HOTELS - init_hotel = OpNode(Hotel, InitClass()) - for hotel in hotels: + hotel = Hotel(f"h_{i}", Geo(coord_x, coord_y)) event = Event(init_hotel, [hotel.name], {"name": hotel.name, "loc": hotel.loc}, None) - producer.produce( - topic, - value=pickle.dumps(event), - ) + runtime.send(event) + hotels.append(hotel) - - - ds_external = ds.map(lambda e: dbg(e)).filter(lambda e: isinstance(e, EventResult)).filter(lambda e: e.event_id > 99).print() #.add_sink(kafka_external_sink) - ds_internal = ds.map(lambda e: dbg(e)).filter(lambda e: isinstance(e, Event)).map(lambda e: pickle.dumps(e)).add_sink(kafka_internal_sink) - producer.flush() + collected_iterator: CloseableIterator = runtime.run(run_async=True, collect=True) + records = [] + def wait_for_event_id(id: int) -> EventResult: + for record in collected_iterator: + records.append(record) + print(f"Collected record: {record}") + if record.event_id == id: + return record - env.execute_async() - - print("sleepin") - time.sleep(2) + # Wait for hotels to be created + wait_for_event_id(event._id) # GET NEARBY # dataflow for getting all hotels within region @@ -178,22 +111,15 @@ def get_nearby(loc: Geo, dist: int) -> list[Hotel]: dist = 5 loc = Geo(0, 0) event = Event(n0, [], {"loc": loc, "dist": dist}, df) - producer.produce( - topic, - value=pickle.dumps(event), - ) - + runtime.send(event, flush=True) + nearby = [] for hotel in hotels: if hotel.distance(loc) < dist: nearby.append(hotel.name) - print(nearby) - # ok thats pretty good. But now we need to solve the problem of merging - # an arbitray number of nodes. but like we naturally want to merge as late - # as possible, right? ideally we want to process results in a streaming - # fashion - - # I want another example that does something after filtering, - # for example buying all items less than 10 price - input() + sol = wait_for_event_id(event._id) + print(nearby) + print(sol) + print(records) + assert sol.result in nearby From 53fa5950c1dafef2e654390b8f285f6b2c42c8d5 Mon Sep 17 00:00:00 2001 From: lucasvanmol Date: Thu, 19 Dec 2024 16:01:01 +0100 Subject: [PATCH 3/9] Select* and collect working (but contains duplicates) --- src/cascade/dataflow/dataflow.py | 100 +++++++++++++-- src/cascade/dataflow/operator.py | 36 +++++- src/cascade/runtime/flink_runtime.py | 156 ++++++++++++++++++++--- src/cascade/runtime/test_global_state.py | 128 +++++++++++-------- 4 files changed, 332 insertions(+), 88 deletions(-) diff --git a/src/cascade/dataflow/dataflow.py b/src/cascade/dataflow/dataflow.py index 446a3e8..013c5ee 100644 --- a/src/cascade/dataflow/dataflow.py +++ b/src/cascade/dataflow/dataflow.py @@ -2,6 +2,8 @@ from dataclasses import dataclass, field from typing import Any, Callable, List, Optional, Type, Union +class Operator(ABC): + pass @dataclass class InitClass: @@ -24,7 +26,9 @@ class Node(ABC): id: int = field(init=False) """This node's unique id.""" + _id_counter: int = field(init=False, default=0, repr=False) + outgoing_edges: list['Edge'] = field(init=False, default_factory=list, repr=False) def __post_init__(self): # Assign a unique ID from the class-level counter @@ -37,9 +41,11 @@ class OpNode(Node): A `Dataflow` may reference the same `StatefulOperator` multiple times. 
The `StatefulOperator` that this node belongs to is referenced by `cls`.""" - cls: Type + operator: Operator method_type: Union[InitClass, InvokeMethod, Filter] assign_result_to: Optional[str] = None + is_conditional: bool = False + """Whether or not the boolean result of this node dictates the following path.""" @dataclass class SelectAllNode(Node): @@ -48,6 +54,7 @@ class SelectAllNode(Node): Think of this as executing `SELECT * FROM cls`""" cls: Type + collect_target: 'CollectNode' @dataclass class MergeNode(Node): @@ -57,12 +64,25 @@ class MergeNode(Node): Their actual implementation is runtime-dependent.""" pass +@dataclass +class CollectNode(Node): + """A node in a `Dataflow` corresponding to a merge operator. + + It will aggregate incoming edges and output them as a list to the outgoing edge. + Their actual implementation is runtime-dependent.""" + assign_result_to: str + """The variable name in the variable map that will contain the collected result.""" + read_results_from: str + """The variable name in the variable map that the individual items put their result in.""" + + @dataclass class Edge(): """An Edge in the Dataflow graph.""" from_node: Node to_node: Node variable_map: dict[str, Any] = field(default_factory=dict) + if_conditional: Optional[bool] = None class DataFlow: """A Dataflow is a graph consisting of `OpNode`s, `MergeNode`s, and `Edge`s. @@ -103,10 +123,10 @@ class DataFlow: df.add_edge(Edge(n3, n4)) ``` """ - def __init__(self, name): - self.name = name - self.adjacency_list = {} - self.nodes = {} + def __init__(self, name: str): + self.name: str = name + self.adjacency_list: dict[int, list[int]] = {} + self.nodes: dict[int, Node] = {} self.entry: Node = None def add_node(self, node: Node): @@ -120,11 +140,32 @@ def add_edge(self, edge: Edge): self.add_node(edge.from_node) self.add_node(edge.to_node) self.adjacency_list[edge.from_node.id].append(edge.to_node.id) + edge.from_node.outgoing_edges.append(edge) def get_neighbors(self, node: Node) -> List[Node]: """Get the outgoing neighbors of this `Node`""" return [self.nodes[id] for id in self.adjacency_list.get(node.id, [])] +class Result(ABC): + pass + +@dataclass +class Arrived(Result): + val: Any + +@dataclass +class NotArrived(Result): + pass + +@dataclass +class CollectTarget: + target_node: CollectNode + """Target node""" + total_items: int + """How many items the merge node needs to wait on (including this one).""" + result_idx: int + """The index this result should be in the collected array.""" + @dataclass class Event(): """An Event is an object that travels through the Dataflow graph.""" @@ -147,6 +188,10 @@ class Event(): _id: int = field(default=None) # type: ignore (will get updated in __post_init__ if unset) """Unique ID for this event. 
Except in `propogate`, this `id` should not be set.""" + + collect_target: Optional[CollectTarget] = field(default=None) + """Tells each mergenode (key) how many events to merge on""" + _id_counter: int = field(init=False, default=0, repr=False) def __post_init__(self): @@ -160,7 +205,7 @@ def propogate(self, key_stack, result) -> Union['EventResult', list['Event']]: # TODO: keys should be structs containing Key and Opnode (as we need to know the entity (cls) and method to invoke for that particular key) # the following method only works because we assume all the keys have the same entity and method - if self.dataflow is None or len(key_stack) == 0: + if self.dataflow is None:# or len(key_stack) == 0: return EventResult(self._id, result) targets = self.dataflow.get_neighbors(self.target) @@ -171,18 +216,50 @@ def propogate(self, key_stack, result) -> Union['EventResult', list['Event']]: keys = key_stack.pop() if not isinstance(keys, list): keys = [keys] + + # Events with SelectAllNodes need to be assigned a CollectTarget + if isinstance(self.target, SelectAllNode): + collect_targets = [ + CollectTarget(self.target.collect_target, len(keys), i) + for i in range(len(keys)) + ] + else: + collect_targets = [self.collect_target for i in range(len(keys))] + + if isinstance(self.target, OpNode) and self.target.is_conditional: + # In this case there will be two targets depending on the condition + + edges = self.dataflow.nodes[self.target.id].outgoing_edges + true_edges = [edge for edge in edges if edge.if_conditional] + false_edges = [edge for edge in edges if not edge.if_conditional] + assert len(true_edges) == len(false_edges) == 1 + target_true = true_edges[0].to_node + target_false = false_edges[0].to_node - if len(targets) == 1: + + return [Event( + target_true if result else target_false, + key_stack + [key], + self.variable_map, + self.dataflow, + _id=self._id, + collect_target=ct) + + for key, ct in zip(keys, collect_targets)] + + elif len(targets) == 1: # We assume that all keys need to go to the same target # this is only used for SelectAll propogation + return [Event( targets[0], key_stack + [key], self.variable_map, self.dataflow, - _id=self._id) + _id=self._id, + collect_target=ct) - for key in keys] + for key, ct in zip(keys, collect_targets)] else: # An event with multiple targets should have the same number of # keys in a list on top of its key stack @@ -192,9 +269,10 @@ def propogate(self, key_stack, result) -> Union['EventResult', list['Event']]: key_stack + [key], self.variable_map, self.dataflow, - _id=self._id) + _id=self._id, + collect_target=ct) - for target, key in zip(targets, keys)] + for target, key, ct in zip(targets, keys, collect_targets)] @dataclass class EventResult(): diff --git a/src/cascade/dataflow/operator.py b/src/cascade/dataflow/operator.py index c084dd2..6fca4d6 100644 --- a/src/cascade/dataflow/operator.py +++ b/src/cascade/dataflow/operator.py @@ -1,5 +1,6 @@ +from abc import ABC from typing import Any, Generic, Protocol, Type, TypeVar -from cascade.dataflow.dataflow import DataFlow, InvokeMethod +from cascade.dataflow.dataflow import DataFlow, InvokeMethod, Operator T = TypeVar('T') @@ -26,7 +27,7 @@ def __call__(self, variable_map: dict[str, Any], state: T, key_stack: list[str]) """@private""" -class StatefulOperator(Generic[T]): +class StatefulOperator(Generic[T], Operator): """An abstraction for a user-defined python class. 
A StatefulOperator handles incoming events, such as @@ -38,7 +39,7 @@ class StatefulOperator(Generic[T]): methods, instead reading and modifying the underlying class `T` through a state variable, see `handle_invoke_method`. """ - def __init__(self, cls: Type[T], methods: dict[str, MethodCall[T]], dataflows: dict[str, DataFlow]): + def __init__(self, entity: Type[T], methods: dict[str, MethodCall[T]], dataflows: dict[str, DataFlow]): """Create the StatefulOperator from a class and its compiled methods. Typically, a class could be comprised of split and non-split methods. Take the following example: @@ -90,14 +91,14 @@ def user_buy_item_1(variable_map: dict[str, Any], state: User, key_stack: list[s """ # methods maps function name to a function. Ideally this is done once in the object self._methods = methods - self._cls = cls + self.entity = entity self.dataflows = dataflows """A mapping from method names to DataFlows""" def handle_init_class(self, *args, **kwargs) -> T: """Create an instance of the underlying class. Equivalent to `T.__init__(*args, **kwargs)`.""" - return self._cls(*args, **kwargs) + return self.entity(*args, **kwargs) def handle_invoke_method(self, method: InvokeMethod, variable_map: dict[str, Any], state: T, key_stack: list[str]) -> dict[str, Any]: """Invoke the method of the underlying class. @@ -108,4 +109,27 @@ def handle_invoke_method(self, method: InvokeMethod, variable_map: dict[str, Any The state `T` and key_stack is passed along to the function, and may be modified. """ return self._methods[method.method_name](variable_map=variable_map, state=state, key_stack=key_stack) - \ No newline at end of file + + +class StatelessMethodCall(Protocol): + def __call__(self, variable_map: dict[str, Any], key_stack: list[str]) -> Any: ... + """@private""" + + +class StatelessOperator(Operator): + """A StatelessOperator refers to a stateless function and therefore only has + one dataflow.""" + def __init__(self, methods: dict[str, StatelessMethodCall], dataflow: DataFlow): + self._methods = methods + self.dataflow = dataflow + + def handle_invoke_method(self, method: InvokeMethod, variable_map: dict[str, Any], key_stack: list[str]) -> dict[str, Any]: + """Invoke the method of the underlying class. + + The `cascade.dataflow.dataflow.InvokeMethod` object must contain a method identifier + that exists on the underlying compiled class functions. + + The state `T` and key_stack is passed along to the function, and may be modified. 
+ """ + return self._methods[method.method_name](variable_map=variable_map, key_stack=key_stack) + diff --git a/src/cascade/runtime/flink_runtime.py b/src/cascade/runtime/flink_runtime.py index 34f0c36..927d785 100644 --- a/src/cascade/runtime/flink_runtime.py +++ b/src/cascade/runtime/flink_runtime.py @@ -9,8 +9,8 @@ from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaRecordSerializationSchema, KafkaSource, KafkaSink from pyflink.datastream import ProcessFunction, StreamExecutionEnvironment import pickle -from cascade.dataflow.dataflow import Event, EventResult, Filter, InitClass, InvokeMethod, MergeNode, Node, OpNode, SelectAllNode -from cascade.dataflow.operator import StatefulOperator +from cascade.dataflow.dataflow import Arrived, CollectNode, CollectTarget, Event, EventResult, Filter, InitClass, InvokeMethod, MergeNode, Node, NotArrived, OpNode, Operator, Result, SelectAllNode +from cascade.dataflow.operator import StatefulOperator, StatelessOperator from confluent_kafka import Producer import logging @@ -47,8 +47,13 @@ def open(self, runtime_context: RuntimeContext): def process_element(self, event: Event, ctx: KeyedProcessFunction.Context): key_stack = event.key_stack - assert(isinstance(event.target, OpNode)) # should be handled by filters on this FlinkOperator - logger.debug(f"FlinkOperator {event.target.cls.__name__}[{ctx.get_current_key()}]: Processing: {event}") + + # should be handled by filters on this FlinkOperator + assert(isinstance(event.target, OpNode)) + assert(isinstance(event.target.operator, StatefulOperator)) + assert(event.target.operator.entity == self.operator.entity) + + logger.debug(f"FlinkOperator {self.operator.entity.__name__}[{ctx.get_current_key()}]: Processing: {event.target.method_type}") if isinstance(event.target.method_type, InitClass): # TODO: compile __init__ with only kwargs, and pass the variable_map itself # otherwise, order of variable_map matters for variable assignment @@ -56,13 +61,13 @@ def process_element(self, event: Event, ctx: KeyedProcessFunction.Context): # Register the created key in FlinkSelectAllOperator register_key_event = Event( - FlinkRegisterKeyNode(key_stack[-1], self.operator._cls), # problem is that this id goes up when we don't rly watn it + FlinkRegisterKeyNode(key_stack[-1], self.operator.entity), [], {}, None, _id = event._id ) - logger.debug(f"FlinkOperator {event.target.cls.__name__}[{ctx.get_current_key()}]: Registering key: {register_key_event}") + logger.debug(f"FlinkOperator {self.operator.entity.__name__}[{ctx.get_current_key()}]: Registering key: {register_key_event}") yield register_key_event # Pop this key from the key stack so that we exit @@ -87,10 +92,42 @@ def process_element(self, event: Event, ctx: KeyedProcessFunction.Context): new_events = event.propogate(key_stack, result) if isinstance(new_events, EventResult): - logger.debug(f"FlinkOperator {event.target.cls.__name__}[{ctx.get_current_key()}]: Returned {new_events}") + logger.debug(f"FlinkOperator {self.operator.entity.__name__}[{ctx.get_current_key()}]: Returned {new_events}") + yield new_events + else: + logger.debug(f"FlinkOperator {self.operator.entity.__name__}[{ctx.get_current_key()}]: Propogated {len(new_events)} new Events") + yield from new_events + +class FlinkStatelessOperator(ProcessFunction): + """Wraps an `cascade.dataflow.datflow.StatefulOperator` in a KeyedProcessFunction so that it can run in Flink. 
+ """ + def __init__(self, operator: StatelessOperator) -> None: + self.state: ValueState = None # type: ignore (expect state to be initialised on .open()) + self.operator = operator + + + def process_element(self, event: Event, ctx: KeyedProcessFunction.Context): + key_stack = event.key_stack + + # should be handled by filters on this FlinkOperator + assert(isinstance(event.target, OpNode)) + assert(isinstance(event.target.operator, StatelessOperator)) + + logger.debug(f"FlinkStatelessOperator {self.operator.dataflow.name}[{event._id}]: Processing: {event.target.method_type}") + if isinstance(event.target.method_type, InvokeMethod): + result = self.operator.handle_invoke_method(event.target.method_type, variable_map=event.variable_map, key_stack=key_stack) + else: + raise Exception(f"A StatelessOperator cannot compute event type: {event.target.method_type}") + + if event.target.assign_result_to is not None: + event.variable_map[event.target.assign_result_to] = result + + new_events = event.propogate(key_stack, result) + if isinstance(new_events, EventResult): + logger.debug(f"FlinkStatelessOperator {self.operator.dataflow.name}[{event._id}]: Returned {new_events}") yield new_events else: - logger.debug(f"FlinkOperator {event.target.cls.__name__}[{ctx.get_current_key()}]: Propogated {len(new_events)} new Events") + logger.debug(f"FlinkStatelessOperator {self.operator.dataflow.name}[{event._id}]: Propogated {len(new_events)} new Events") yield from new_events class FlinkSelectAllOperator(KeyedProcessFunction): @@ -108,22 +145,77 @@ def process_element(self, event: Event, ctx: 'ProcessFunction.Context'): state = [] if isinstance(event.target, FlinkRegisterKeyNode): - logger.debug(f"SelectAllOperator [{event.target.cls.__name__}]: Processing: {event}") + logger.debug(f"SelectAllOperator [{event.target.cls.__name__}]: Registering key: {event.target.key}") state.append(event.target.key) self.state.update(state) elif isinstance(event.target, SelectAllNode): - logger.debug(f"SelectAllOperator [{event.target.cls.__name__}]: Processing: {event}") + logger.debug(f"SelectAllOperator [{event.target.cls.__name__}]: Selecting all") # Yield all the keys we now about event.key_stack.append(state) + num_events = len(state) + + # Propogate the event to the next node new_events = event.propogate(event.key_stack, None) - logger.debug(f"SelectAllOperator [{event.target.cls.__name__}]: Propogated {len(new_events)} events") + assert num_events == len(new_events) + + logger.debug(f"SelectAllOperator [{event.target.cls.__name__}]: Propogated {num_events} events with target: {event.target.collect_target}") yield from new_events else: raise Exception(f"Unexpected target for SelectAllOperator: {event.target}") +class FlinkCollectOperator(KeyedProcessFunction): + """Flink implementation of a merge operator.""" + def __init__(self): #, merge_node: MergeNode) -> None: + self.collection: ValueState = None # type: ignore (expect state to be initialised on .open()) + #self.node = merge_node + + def open(self, runtime_context: RuntimeContext): + descriptor = ValueStateDescriptor("merge_state", Types.PICKLED_BYTE_ARRAY()) + self.collection = runtime_context.get_state(descriptor) + + def process_element(self, event: Event, ctx: KeyedProcessFunction.Context): + collection: list[Result] = self.collection.value() + logger.debug(f"FlinkCollectOp [{ctx.get_current_key()}]: Processing: {event}") + + # for now we assume there is only 1 merge per df + assert event.collect_target is not None + entry: CollectTarget = 
event.collect_target + target_node: CollectNode = entry.target_node + + # Add to the map + if collection == None: + logger.debug(f"FlinkCollectOp [{ctx.get_current_key()}]: Creating map") + collection = [NotArrived()] * entry.total_items + logger.debug(f"FlinkCollectOp [{ctx.get_current_key()}]: Processed event {entry.result_idx} ({entry.total_items})") + + result = None + try: + result = event.variable_map[target_node.read_results_from] + except KeyError: + pass + + collection[entry.result_idx] = Arrived(result) + self.collection.update(collection) + + # Yield events if the merge is done + if all([isinstance(r, Arrived) for r in collection]): + logger.debug(f"FlinkCollectOp [{ctx.get_current_key()}]: Yielding collection") + + collection = [r.val for r in collection] # type: ignore (r is of type Arrived) + event.variable_map[target_node.assign_result_to] = collection + new_events = event.propogate(event.key_stack, collection) + + self.collection.clear() + if isinstance(new_events, EventResult): + logger.debug(f"FlinkCollectOp [{ctx.get_current_key()}]: Returned {new_events}") + yield new_events + else: + logger.debug(f"FlinkCollectOp [{ctx.get_current_key()}]: Propogated {len(new_events)} new Events") + yield from new_events + class FlinkMergeOperator(KeyedProcessFunction): """Flink implementation of a merge operator.""" def __init__(self) -> None: @@ -302,31 +394,55 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5): event_stream = select_all_stream.union(not_select_all_stream) + operator_stream = event_stream.filter(lambda e: isinstance(e.target, OpNode)) - self.stateful_op_stream = event_stream.filter(lambda e: isinstance(e.target, OpNode)) - """Stream that ingests events with an `cascade.dataflow.dataflow.OpNode` target""" - + self.stateful_op_stream = ( + operator_stream + .filter(lambda e: isinstance(e.target.operator, StatefulOperator)) + ) + + self.stateless_op_stream = ( + operator_stream + .filter(lambda e: isinstance(e.target.operator, StatelessOperator)) + ) + + # self.merge_op_stream = ( + # event_stream.filter(lambda e: isinstance(e.target, MergeNode)) + # .key_by(lambda e: e._id) # might not work in the future if we have multiple merges in one dataflow? + # .process(FlinkMergeOperator()) + # ) self.merge_op_stream = ( - event_stream.filter(lambda e: isinstance(e.target, MergeNode)) + event_stream.filter(lambda e: isinstance(e.target, CollectNode)) .key_by(lambda e: e._id) # might not work in the future if we have multiple merges in one dataflow? 
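The collect step above can be read independently of Flink: results arrive in any order, each event carries `(total_items, result_idx)` via its `CollectTarget`, and the assembled list is released only once every slot has arrived. A pure-Python model of that logic (a plain list stands in for the keyed `ValueState`):

```python
from typing import Any, Optional
from cascade.dataflow.dataflow import Arrived, NotArrived

def collect(slots: Optional[list], result_idx: int, total_items: int, value: Any):
    # Lazily create the slot array on the first arrival.
    if slots is None:
        slots = [NotArrived()] * total_items
    slots[result_idx] = Arrived(value)
    if all(isinstance(s, Arrived) for s in slots):
        return slots, [s.val for s in slots]   # complete: emit the collected list
    return slots, None                         # still waiting on other branches
```

In the real operator the keyed state plays the role of `slots`, and the completed list is written to `variable_map[target_node.assign_result_to]` before the event is propagated.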
- .process(FlinkMergeOperator()) + .process(FlinkCollectOperator()) ) """Stream that ingests events with an `cascade.dataflow.dataflow.MergeNode` target""" + self.stateless_op_streams = [] self.stateful_op_streams = [] """List of stateful operator streams, which gets appended at `add_operator`.""" self.producer = Producer({'bootstrap.servers': kafka_broker}) - def add_operator(self, op: FlinkOperator): + def add_operator(self, flink_op: FlinkOperator): """Add a `FlinkOperator` to the Flink datastream.""" + op_stream = ( - self.stateful_op_stream.filter(lambda e: e.target.cls == op.operator._cls) + self.stateful_op_stream.filter(lambda e: e.target.operator.entity == flink_op.operator.entity) .key_by(lambda e: e.key_stack[-1]) - .process(op) + .process(flink_op) ) self.stateful_op_streams.append(op_stream) + def add_stateless_operator(self, flink_op: FlinkStatelessOperator): + """Add a `FlinkStatlessOperator` to the Flink datastream.""" + + op_stream = ( + self.stateless_op_stream + .process(flink_op) + ) + self.stateless_op_streams.append(op_stream) + def send(self, event: Event, flush=False): """Send an event to the Kafka source. Once `run` has been called, the Flink runtime will start ingesting these @@ -347,7 +463,7 @@ def run(self, run_async=False, collect=False) -> Union[CloseableIterator, None]: assert self.env is not None, "FlinkRuntime must first be initialised with `init()`." # Combine all the operator streams - operator_streams = self.merge_op_stream.union(*self.stateful_op_streams) + operator_streams = self.merge_op_stream.union(*self.stateful_op_streams).union(*self.stateless_op_streams) # Add filtering for nodes with a `Filter` target full_stream_filtered = ( diff --git a/src/cascade/runtime/test_global_state.py b/src/cascade/runtime/test_global_state.py index 45d1e32..5ebd413 100644 --- a/src/cascade/runtime/test_global_state.py +++ b/src/cascade/runtime/test_global_state.py @@ -4,45 +4,16 @@ import math import random from dataclasses import dataclass -from typing import Any, Type +from typing import Any -from pyflink.common import Configuration -from pyflink.datastream import ProcessFunction, StreamExecutionEnvironment -from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer, FlinkKafkaProducer from pyflink.datastream.data_stream import CloseableIterator -from cascade.dataflow.dataflow import DataFlow, Edge, Event, EventResult, Filter, InitClass, OpNode, SelectAllNode -from cascade.dataflow.operator import StatefulOperator -from cascade.runtime.flink_runtime import ByteSerializer, FlinkOperator, FlinkRegisterKeyNode, FlinkRuntime, FlinkSelectAllOperator +from cascade.dataflow.dataflow import CollectNode, DataFlow, Edge, Event, EventResult, Filter, InitClass, InvokeMethod, MergeNode, OpNode, SelectAllNode +from cascade.dataflow.operator import StatefulOperator, StatelessOperator +from cascade.runtime.flink_runtime import FlinkOperator, FlinkRuntime, FlinkStatelessOperator from confluent_kafka import Producer -import os -import cloudpickle # problems with pickling functions (e.g. lambdas)? 
use cloudcloudpickle -import logging import time -def add_kafka_source(env: StreamExecutionEnvironment, topics, broker="localhost:9092"): - kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)), - 'bin/flink-sql-connector-kafka-3.3.0-1.20.jar') - serializer_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'bin/flink-kafka-bytes-serializer.jar') - - if os.name == 'nt': - env.add_jars(f"file:///{kafka_jar}",f"file://{serializer_jar}") - else: - env.add_jars(f"file://{kafka_jar}",f"file://{serializer_jar}") - - deserialization_schema = ByteSerializer() - properties: dict = { - "bootstrap.servers": broker, - "auto.offset.reset": "earliest", - "group.id": "test_group_1", - } - kafka_consumer = FlinkKafkaConsumer(topics, deserialization_schema, properties) - return env.add_source(kafka_consumer) - -def dbg(e, msg=""): - print(msg + str(e)) - return e - @dataclass class Geo: x: int @@ -53,6 +24,9 @@ def __init__(self, name: str, loc: Geo): self.name = name self.loc = loc + def get_name(self) -> str: + return self.name + def distance(self, loc: Geo) -> float: return math.sqrt((self.loc.x - loc.x) ** 2 + (self.loc.y - loc.y) ** 2) @@ -61,27 +35,84 @@ def __repr__(self) -> str: def distance_compiled(variable_map: dict[str, Any], state: Hotel, key_stack: list[str]) -> Any: + key_stack.pop() loc = variable_map["loc"] return math.sqrt((state.loc.x - loc.x) ** 2 + (state.loc.y - loc.y) ** 2) +def get_name_compiled(variable_map: dict[str, Any], state: Hotel, key_stack: list[str]) -> Any: + key_stack.pop() + return state.name +hotel_op = StatefulOperator(Hotel, + {"distance": distance_compiled, + "get_name": get_name_compiled}, {}) -# We compile just the predicate, the select is implemented using a selectall node -def get_nearby_predicate_compiled(variable_map: dict[str, Any], state: Hotel) -> bool: - return state.distance(variable_map["loc"]) < variable_map["dist"] -hotel_op = StatefulOperator(Hotel, {"distance": distance_compiled}, {}) + +def get_nearby(hotels: list[Hotel], loc: Geo, dist: float): + return [hotel.get_name() for hotel in hotels if hotel.distance(loc) < dist] + + +# We compile just the predicate, the select is implemented using a selectall node +def get_nearby_predicate_compiled_0(variable_map: dict[str, Any], key_stack: list[str]): + # the top of the key_stack is already the right key, so in this case we don't need to do anything + # loc = variable_map["loc"] + # we need the hotel_key for later. 
(body_compiled_0) + variable_map["hotel_key"] = key_stack[-1] + pass + +def get_nearby_predicate_compiled_1(variable_map: dict[str, Any], key_stack: list[str]) -> bool: + loc = variable_map["loc"] + dist = variable_map["dist"] + hotel_dist = variable_map["hotel_distance"] + # key_stack.pop() # shouldn't pop because this functino is stateless + return hotel_dist < dist + +def get_nearby_body_compiled_0(variable_map: dict[str, Any], key_stack: list[str]): + key_stack.append(variable_map["hotel_key"]) + +def get_nearby_body_compiled_1(variable_map: dict[str, Any], key_stack: list[str]) -> str: + return variable_map["hotel_name"] + +get_nearby_op = StatelessOperator({ + "get_nearby_predicate_compiled_0": get_nearby_predicate_compiled_0, + "get_nearby_predicate_compiled_1": get_nearby_predicate_compiled_1, + "get_nearby_body_compiled_0": get_nearby_body_compiled_0, + "get_nearby_body_compiled_1": get_nearby_body_compiled_1, +}, None) + +# dataflow for getting all hotels within region +df = DataFlow("get_nearby") +n7 = CollectNode("get_nearby_result", "get_nearby_body") +n0 = SelectAllNode(Hotel, n7) +n1 = OpNode(get_nearby_op, InvokeMethod("get_nearby_predicate_compiled_0")) +n2 = OpNode(hotel_op, InvokeMethod("distance"), assign_result_to="hotel_distance") +n3 = OpNode(get_nearby_op, InvokeMethod("get_nearby_predicate_compiled_1"), is_conditional=True) +n4 = OpNode(get_nearby_op, InvokeMethod("get_nearby_body_compiled_0")) +n5 = OpNode(hotel_op, InvokeMethod("get_name"), assign_result_to="hotel_name") +n6 = OpNode(get_nearby_op, InvokeMethod("get_nearby_body_compiled_1"), assign_result_to="get_nearby_body") + +df.add_edge(Edge(n0, n1)) +df.add_edge(Edge(n1, n2)) +df.add_edge(Edge(n2, n3)) +df.add_edge(Edge(n3, n4, if_conditional=True)) +df.add_edge(Edge(n3, n7, if_conditional=False)) +df.add_edge(Edge(n4, n5)) +df.add_edge(Edge(n5, n6)) +df.add_edge(Edge(n6, n7)) +get_nearby_op.dataflow = df def test_nearby_hotels(): runtime = FlinkRuntime("test_nearby_hotels") runtime.init() runtime.add_operator(FlinkOperator(hotel_op)) + runtime.add_stateless_operator(FlinkStatelessOperator(get_nearby_op)) # Create Hotels hotels = [] - init_hotel = OpNode(Hotel, InitClass()) + init_hotel = OpNode(hotel_op, InitClass()) random.seed(42) - for i in range(50): + for i in range(5): coord_x = random.randint(-10, 10) coord_y = random.randint(-10, 10) hotel = Hotel(f"h_{i}", Geo(coord_x, coord_y)) @@ -98,19 +129,15 @@ def wait_for_event_id(id: int) -> EventResult: if record.event_id == id: return record + print("creating hotels") # Wait for hotels to be created wait_for_event_id(event._id) - - # GET NEARBY - # dataflow for getting all hotels within region - df = DataFlow("get_nearby_hotels") - n0 = SelectAllNode(Hotel) - n1 = OpNode(Hotel, Filter(get_nearby_predicate_compiled)) - df.add_edge(Edge(n0, n1)) + time.sleep(1) # wait for all hotels to be registered dist = 5 loc = Geo(0, 0) - event = Event(n0, [], {"loc": loc, "dist": dist}, df) + # because of how the key stack works, we need to supply a key here + event = Event(n0, ["workaround_key"], {"loc": loc, "dist": dist}, df) runtime.send(event, flush=True) nearby = [] @@ -118,8 +145,7 @@ def wait_for_event_id(id: int) -> EventResult: if hotel.distance(loc) < dist: nearby.append(hotel.name) - sol = wait_for_event_id(event._id) + event_result = wait_for_event_id(event._id) + results = [r for r in event_result.result if r != None] print(nearby) - print(sol) - print(records) - assert sol.result in nearby + assert set(results) == set(nearby) From 
d52954edb8598fc60d0f14a717f24485529ed56c Mon Sep 17 00:00:00 2001 From: lucasvanmol Date: Fri, 20 Dec 2024 13:18:45 +0100 Subject: [PATCH 4/9] Add datastream names --- src/cascade/runtime/flink_runtime.py | 26 ++++++++++++++++-------- src/cascade/runtime/test_global_state.py | 4 ++-- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/src/cascade/runtime/flink_runtime.py b/src/cascade/runtime/flink_runtime.py index 927d785..bbe00d2 100644 --- a/src/cascade/runtime/flink_runtime.py +++ b/src/cascade/runtime/flink_runtime.py @@ -285,7 +285,7 @@ def __init__(self): class FlinkRuntime(): """A Runtime that runs Dataflows on Flink.""" - def __init__(self, topic="input-topic"): + def __init__(self, topic="input-topic", ui_port: Optional[int] = None): self.env: Optional[StreamExecutionEnvironment] = None """@private""" @@ -302,6 +302,11 @@ def __init__(self, topic="input-topic"): tests. """ + self.ui_port = ui_port + """The port to run the Flink web UI on. + + Warning that this does not work well with run(collect=True)!""" + def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5): """Initialise & configure the Flink runtime. @@ -323,7 +328,8 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5): """ config = Configuration() # Add the Flink Web UI at http://localhost:8081 - # config.set_string("rest.port", "8081") + if self.ui_port is not None: + config.set_string("rest.port", str(self.ui_port)) config.set_integer("python.fn-execution.bundle.time", bundle_time) config.set_integer("python.fn-execution.bundle.size", bundle_size) @@ -374,9 +380,10 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5): "Kafka Source" ) .map(lambda x: pickle.loads(x)) - # .filter(lambda e: isinstance(e, Event)) # Enforced by `add_operator` type safety + .name("DESERIALIZE") + # .filter(lambda e: isinstance(e, Event)) # Enforced by `send` type safety ) - + # Events with a `SelectAllNode` will first be processed by the select # all operator, which will send out multiple other Events that can # then be processed by operators in the same steam. @@ -384,7 +391,7 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5): event_stream.filter(lambda e: isinstance(e.target, SelectAllNode) or isinstance(e.target, FlinkRegisterKeyNode)) .key_by(lambda e: e.target.cls) - .process(FlinkSelectAllOperator()) + .process(FlinkSelectAllOperator()).name("SELECT ALL OP") ) """Stream that ingests events with an `SelectAllNode` or `FlinkRegisterKeyNode`""" not_select_all_stream = ( @@ -392,9 +399,9 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5): not (isinstance(e.target, SelectAllNode) or isinstance(e.target, FlinkRegisterKeyNode))) ) - event_stream = select_all_stream.union(not_select_all_stream) + event_stream_2 = select_all_stream.union(not_select_all_stream) - operator_stream = event_stream.filter(lambda e: isinstance(e.target, OpNode)) + operator_stream = event_stream_2.filter(lambda e: isinstance(e.target, OpNode)).name("OPERATOR STREAM") self.stateful_op_stream = ( operator_stream @@ -415,6 +422,7 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5): event_stream.filter(lambda e: isinstance(e.target, CollectNode)) .key_by(lambda e: e._id) # might not work in the future if we have multiple merges in one dataflow? 
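The new `ui_port` argument added in this patch is only wired into the Flink configuration when set. A short usage sketch (topic and broker values are the defaults used elsewhere in these tests):

```python
from cascade.runtime.flink_runtime import FlinkRuntime

runtime = FlinkRuntime("example-topic", ui_port=8081)  # Flink web UI on http://localhost:8081
runtime.init(kafka_broker="localhost:9092")
# Per the docstring above, the web UI option does not combine well with run(collect=True).
```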
.process(FlinkCollectOperator()) + .name("Collect") ) """Stream that ingests events with an `cascade.dataflow.dataflow.MergeNode` target""" @@ -431,6 +439,7 @@ def add_operator(self, flink_op: FlinkOperator): self.stateful_op_stream.filter(lambda e: e.target.operator.entity == flink_op.operator.entity) .key_by(lambda e: e.key_stack[-1]) .process(flink_op) + .name("STATEFUL OP: " + flink_op.operator.entity.__name__) ) self.stateful_op_streams.append(op_stream) @@ -440,6 +449,7 @@ def add_stateless_operator(self, flink_op: FlinkStatelessOperator): op_stream = ( self.stateless_op_stream .process(flink_op) + .name("STATELESS DATAFLOW: " + flink_op.operator.dataflow.name) ) self.stateless_op_streams.append(op_stream) @@ -482,7 +492,7 @@ def run(self, run_async=False, collect=False) -> Union[CloseableIterator, None]: ds_external = ds.filter(lambda e: isinstance(e, EventResult)).execute_and_collect() else: ds_external = ds.filter(lambda e: isinstance(e, EventResult)).print() #.add_sink(kafka_external_sink) - ds_internal = ds.filter(lambda e: isinstance(e, Event)).sink_to(self.kafka_internal_sink) + ds_internal = ds.filter(lambda e: isinstance(e, Event)).sink_to(self.kafka_internal_sink).name("INTERNAL KAFKA SINK") if run_async: self.env.execute_async("Cascade: Flink Runtime") diff --git a/src/cascade/runtime/test_global_state.py b/src/cascade/runtime/test_global_state.py index 5ebd413..c5ba7f9 100644 --- a/src/cascade/runtime/test_global_state.py +++ b/src/cascade/runtime/test_global_state.py @@ -112,7 +112,7 @@ def test_nearby_hotels(): hotels = [] init_hotel = OpNode(hotel_op, InitClass()) random.seed(42) - for i in range(5): + for i in range(20): coord_x = random.randint(-10, 10) coord_y = random.randint(-10, 10) hotel = Hotel(f"h_{i}", Geo(coord_x, coord_y)) @@ -148,4 +148,4 @@ def wait_for_event_id(id: int) -> EventResult: event_result = wait_for_event_id(event._id) results = [r for r in event_result.result if r != None] print(nearby) - assert set(results) == set(nearby) + assert set(results) == set(nearby) \ No newline at end of file From f9457d2aaa496ea564f3e362d8199ae060c444d2 Mon Sep 17 00:00:00 2001 From: lucasvanmol Date: Fri, 20 Dec 2024 15:04:09 +0100 Subject: [PATCH 5/9] Fix other tests --- README.md | 6 ++ src/cascade/dataflow/dataflow.py | 7 ++ src/cascade/runtime/flink_runtime.py | 5 ++ tests/integration/flink-runtime/common.py | 67 +++++++++++-------- .../flink-runtime/test_merge_operator.py | 6 +- .../flink-runtime/test_select_all.py | 19 +++++- .../flink-runtime/test_two_entities.py | 8 +-- 7 files changed, 80 insertions(+), 38 deletions(-) rename src/cascade/runtime/test_global_state.py => tests/integration/flink-runtime/test_select_all.py (88%) diff --git a/README.md b/README.md index 7129609..1b60e1b 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,12 @@ Integration tests require Kafka to be running (with `docker compose up`): pytest -m integration ``` +To view with debug logs: + +``` +pytest -m integration -s --log-level=DEBUG +``` + ### Documentation diff --git a/src/cascade/dataflow/dataflow.py b/src/cascade/dataflow/dataflow.py index 013c5ee..11d6a0f 100644 --- a/src/cascade/dataflow/dataflow.py +++ b/src/cascade/dataflow/dataflow.py @@ -46,6 +46,8 @@ class OpNode(Node): assign_result_to: Optional[str] = None is_conditional: bool = False """Whether or not the boolean result of this node dictates the following path.""" + collect_target: Optional['CollectTarget'] = None + """Whether the result of this node should go to a CollectNode.""" @dataclass class 
SelectAllNode(Node): @@ -217,12 +219,17 @@ def propogate(self, key_stack, result) -> Union['EventResult', list['Event']]: if not isinstance(keys, list): keys = [keys] + collect_targets: list[Optional[CollectTarget]] # Events with SelectAllNodes need to be assigned a CollectTarget if isinstance(self.target, SelectAllNode): collect_targets = [ CollectTarget(self.target.collect_target, len(keys), i) for i in range(len(keys)) ] + elif isinstance(self.target, OpNode) and self.target.collect_target is not None: + collect_targets = [ + self.target.collect_target for i in range(len(keys)) + ] else: collect_targets = [self.collect_target for i in range(len(keys))] diff --git a/src/cascade/runtime/flink_runtime.py b/src/cascade/runtime/flink_runtime.py index bbe00d2..074b2ce 100644 --- a/src/cascade/runtime/flink_runtime.py +++ b/src/cascade/runtime/flink_runtime.py @@ -326,6 +326,7 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5): processing the next bundle of elements. A larger value can improve throughput but at the cost of more memory usage and higher latency. """ + logger.debug("FlinkRuntime initializing...") config = Configuration() # Add the Flink Web UI at http://localhost:8081 if self.ui_port is not None: @@ -431,6 +432,7 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5): """List of stateful operator streams, which gets appended at `add_operator`.""" self.producer = Producer({'bootstrap.servers': kafka_broker}) + logger.debug("FlinkRuntime initialized") def add_operator(self, flink_op: FlinkOperator): """Add a `FlinkOperator` to the Flink datastream.""" @@ -471,6 +473,7 @@ def run(self, run_async=False, collect=False) -> Union[CloseableIterator, None]: `cascade.dataflow.dataflow.EventResult`s.""" assert self.env is not None, "FlinkRuntime must first be initialised with `init()`." + logger.debug("FlinkRuntime merging operator streams...") # Combine all the operator streams operator_streams = self.merge_op_stream.union(*self.stateful_op_streams).union(*self.stateless_op_streams) @@ -495,7 +498,9 @@ def run(self, run_async=False, collect=False) -> Union[CloseableIterator, None]: ds_internal = ds.filter(lambda e: isinstance(e, Event)).sink_to(self.kafka_internal_sink).name("INTERNAL KAFKA SINK") if run_async: + logger.debug("FlinkRuntime starting (async)") self.env.execute_async("Cascade: Flink Runtime") return ds_external # type: ignore (will be CloseableIterator provided the source is unbounded (i.e. 
Kafka)) else: + logger.debug("FlinkRuntime starting (sync)") self.env.execute("Cascade: Flink Runtime") \ No newline at end of file diff --git a/tests/integration/flink-runtime/common.py b/tests/integration/flink-runtime/common.py index ef1b888..105fdbd 100644 --- a/tests/integration/flink-runtime/common.py +++ b/tests/integration/flink-runtime/common.py @@ -1,5 +1,5 @@ from typing import Any -from cascade.dataflow.dataflow import DataFlow, Edge, InvokeMethod, MergeNode, OpNode +from cascade.dataflow.dataflow import CollectNode, CollectTarget, DataFlow, Edge, InvokeMethod, MergeNode, OpNode from cascade.runtime.flink_runtime import StatefulOperator class User: @@ -71,14 +71,32 @@ def buy_2_items_0_compiled(variable_map: dict[str, Any], state: User, key_stack: def buy_2_items_1_compiled(variable_map: dict[str, Any], state: User, key_stack: list[str]) -> Any: key_stack.pop() - state.balance -= variable_map["item1_price"] + variable_map["item2_price"] + state.balance -= variable_map["item_prices"][0] + variable_map["item_prices"][1] return state.balance >= 0 +# An operator is defined by the underlying class and the functions that can be called +user_op = StatefulOperator( + User, + { + "update_balance": update_balance_compiled, + "get_balance": get_balance_compiled, + "buy_item_0": buy_item_0_compiled, + "buy_item_1": buy_item_1_compiled, + "buy_2_items_0": buy_2_items_0_compiled, + "buy_2_items_1": buy_2_items_1_compiled + }, + None) + +item_op = StatefulOperator( + Item, {"get_price": get_price_compiled}, None +) + + def user_buy_item_df(): df = DataFlow("user.buy_item") - n0 = OpNode(User, InvokeMethod("buy_item_0")) - n1 = OpNode(Item, InvokeMethod("get_price"), assign_result_to="item_price") - n2 = OpNode(User, InvokeMethod("buy_item_1")) + n0 = OpNode(user_op, InvokeMethod("buy_item_0")) + n1 = OpNode(item_op, InvokeMethod("get_price"), assign_result_to="item_price") + n2 = OpNode(user_op, InvokeMethod("buy_item_1")) df.add_edge(Edge(n0, n1)) df.add_edge(Edge(n1, n2)) df.entry = n0 @@ -86,11 +104,21 @@ def user_buy_item_df(): def user_buy_2_items_df(): df = DataFlow("user.buy_2_items") - n0 = OpNode(User, InvokeMethod("buy_2_items_0")) - n1 = OpNode(Item, InvokeMethod("get_price"), assign_result_to="item1_price") - n2 = OpNode(Item, InvokeMethod("get_price"), assign_result_to="item2_price") - n3 = MergeNode() - n4 = OpNode(User, InvokeMethod("buy_2_items_1")) + n0 = OpNode(user_op, InvokeMethod("buy_2_items_0")) + n3 = CollectNode(assign_result_to="item_prices", read_results_from="item_price") + n1 = OpNode( + item_op, + InvokeMethod("get_price"), + assign_result_to="item_price", + collect_target=CollectTarget(n3, 2, 0) + ) + n2 = OpNode( + item_op, + InvokeMethod("get_price"), + assign_result_to="item_price", + collect_target=CollectTarget(n3, 2, 1) + ) + n4 = OpNode(user_op, InvokeMethod("buy_2_items_1")) df.add_edge(Edge(n0, n1)) df.add_edge(Edge(n0, n2)) df.add_edge(Edge(n1, n3)) @@ -99,22 +127,7 @@ def user_buy_2_items_df(): df.entry = n0 return df -# An operator is defined by the underlying class and the functions that can be called -user_op = StatefulOperator( - User, - { - "update_balance": update_balance_compiled, - "get_balance": get_balance_compiled, - "buy_item_0": buy_item_0_compiled, - "buy_item_1": buy_item_1_compiled, - "buy_2_items_0": buy_2_items_0_compiled, - "buy_2_items_1": buy_2_items_1_compiled - }, - { +user_op.dataflows = { "buy_2_items": user_buy_2_items_df(), "buy_item": user_buy_item_df() - }) - -item_op = StatefulOperator( - Item, {"get_price": 
get_price_compiled}, None -) + } \ No newline at end of file diff --git a/tests/integration/flink-runtime/test_merge_operator.py b/tests/integration/flink-runtime/test_merge_operator.py index ac3cdc0..d136d99 100644 --- a/tests/integration/flink-runtime/test_merge_operator.py +++ b/tests/integration/flink-runtime/test_merge_operator.py @@ -16,13 +16,13 @@ def test_merge_operator(): # Create a User object foo_user = User("foo", 100) - init_user_node = OpNode(User, InitClass()) + init_user_node = OpNode(user_op, InitClass()) event = Event(init_user_node, ["foo"], {"key": "foo", "balance": 100}, None) runtime.send(event) # Create an Item object fork_item = Item("fork", 5) - init_item_node = OpNode(Item, InitClass()) + init_item_node = OpNode(item_op, InitClass()) event = Event(init_item_node, ["fork"], {"key": "fork", "price": 5}, None) runtime.send(event) @@ -58,7 +58,7 @@ def wait_for_event_id(id: int) -> EventResult: assert buy_fork_result.result == True # Send an event to check if the balance was updated - user_get_balance_node = OpNode(User, InvokeMethod("get_balance")) + user_get_balance_node = OpNode(user_op, InvokeMethod("get_balance")) user_get_balance = Event(user_get_balance_node, ["foo"], {}, None) runtime.send(user_get_balance, flush=True) diff --git a/src/cascade/runtime/test_global_state.py b/tests/integration/flink-runtime/test_select_all.py similarity index 88% rename from src/cascade/runtime/test_global_state.py rename to tests/integration/flink-runtime/test_select_all.py index c5ba7f9..62c371e 100644 --- a/src/cascade/runtime/test_global_state.py +++ b/tests/integration/flink-runtime/test_select_all.py @@ -13,6 +13,7 @@ from cascade.runtime.flink_runtime import FlinkOperator, FlinkRuntime, FlinkStatelessOperator from confluent_kafka import Producer import time +import pytest @dataclass class Geo: @@ -65,7 +66,7 @@ def get_nearby_predicate_compiled_1(variable_map: dict[str, Any], key_stack: lis loc = variable_map["loc"] dist = variable_map["dist"] hotel_dist = variable_map["hotel_distance"] - # key_stack.pop() # shouldn't pop because this functino is stateless + # key_stack.pop() # shouldn't pop because this function is stateless return hotel_dist < dist def get_nearby_body_compiled_0(variable_map: dict[str, Any], key_stack: list[str]): @@ -102,6 +103,7 @@ def get_nearby_body_compiled_1(variable_map: dict[str, Any], key_stack: list[str df.add_edge(Edge(n6, n7)) get_nearby_op.dataflow = df +@pytest.mark.integration def test_nearby_hotels(): runtime = FlinkRuntime("test_nearby_hotels") runtime.init() @@ -128,11 +130,22 @@ def wait_for_event_id(id: int) -> EventResult: print(f"Collected record: {record}") if record.event_id == id: return record + + def wait_for_n_records(num: int) -> list[EventResult]: + i = 0 + n_records = [] + for record in collected_iterator: + i += 1 + records.append(record) + n_records.append(record) + print(f"Collected record: {record}") + if i == num: + return n_records print("creating hotels") # Wait for hotels to be created - wait_for_event_id(event._id) - time.sleep(1) # wait for all hotels to be registered + wait_for_n_records(20) + time.sleep(3) # wait for all hotels to be registered dist = 5 loc = Geo(0, 0) diff --git a/tests/integration/flink-runtime/test_two_entities.py b/tests/integration/flink-runtime/test_two_entities.py index 905257d..9d2e0cf 100644 --- a/tests/integration/flink-runtime/test_two_entities.py +++ b/tests/integration/flink-runtime/test_two_entities.py @@ -13,17 +13,15 @@ def test_two_entities(): 
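# A minimal sketch of the fan-in wiring introduced in common.py above, assuming
# the constructor arguments shown there: each parallel branch carries a
# CollectTarget naming the CollectNode that gathers the results, the total
# number of results expected, and the slot this branch fills.
from cascade.dataflow.dataflow import CollectNode, CollectTarget

prices = CollectNode(assign_result_to="item_prices", read_results_from="item_price")
first_price = CollectTarget(prices, 2, 0)   # 2 results expected, this branch fills slot 0
second_price = CollectTarget(prices, 2, 1)  # ... and this branch fills slot 1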
runtime.add_operator(FlinkOperator(item_op)) runtime.add_operator(FlinkOperator(user_op)) - # Create a User object foo_user = User("foo", 100) - init_user_node = OpNode(User, InitClass()) + init_user_node = OpNode(user_op, InitClass()) event = Event(init_user_node, ["foo"], {"key": "foo", "balance": 100}, None) - runtime.send(event) # Create an Item object fork_item = Item("fork", 5) - init_item_node = OpNode(Item, InitClass()) + init_item_node = OpNode(item_op, InitClass()) event = Event(init_item_node, ["fork"], {"key": "fork", "price": 5}, None) runtime.send(event) @@ -55,7 +53,7 @@ def wait_for_event_id(id: int) -> EventResult: assert buy_fork_result.result == True # Send an event to check if the balance was updated - user_get_balance_node = OpNode(User, InvokeMethod("get_balance")) + user_get_balance_node = OpNode(user_op, InvokeMethod("get_balance")) user_get_balance = Event(user_get_balance_node, ["foo"], {}, None) runtime.send(user_get_balance, flush=True) From ae1dd2be79cd1551e6070cfe527a97e44f26d069 Mon Sep 17 00:00:00 2001 From: lucasvanmol Date: Thu, 9 Jan 2025 16:05:21 +0100 Subject: [PATCH 6/9] Add deathstar bench --- deathstar/__init__.py | 0 deathstar/demo.py | 326 +++++++++++++++++++++++++++ deathstar/entities/__init__.py | 0 deathstar/entities/flight.py | 33 +++ deathstar/entities/hotel.py | 84 +++++++ deathstar/entities/recommendation.py | 137 +++++++++++ deathstar/entities/search.py | 85 +++++++ deathstar/entities/user.py | 79 +++++++ deathstar/test_demo.py | 36 +++ requirements.txt | 1 + src/cascade/dataflow/dataflow.py | 2 + src/cascade/runtime/flink_runtime.py | 5 +- 12 files changed, 786 insertions(+), 2 deletions(-) create mode 100644 deathstar/__init__.py create mode 100644 deathstar/demo.py create mode 100644 deathstar/entities/__init__.py create mode 100644 deathstar/entities/flight.py create mode 100644 deathstar/entities/hotel.py create mode 100644 deathstar/entities/recommendation.py create mode 100644 deathstar/entities/search.py create mode 100644 deathstar/entities/user.py create mode 100644 deathstar/test_demo.py diff --git a/deathstar/__init__.py b/deathstar/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/deathstar/demo.py b/deathstar/demo.py new file mode 100644 index 0000000..761ca96 --- /dev/null +++ b/deathstar/demo.py @@ -0,0 +1,326 @@ +import random +import sys +import os +import time +from confluent_kafka import Producer +import pickle +from timeit import default_timer as timer +from multiprocessing import Pool + +# import cascade +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) + +from cascade.dataflow.dataflow import Event, InitClass, InvokeMethod, OpNode +from cascade.runtime.flink_runtime import FlinkOperator, FlinkRuntime, FlinkStatelessOperator +from deathstar.entities.flight import Flight, flight_op +from deathstar.entities.hotel import Geo, Hotel, Rate, hotel_op +from deathstar.entities.recommendation import Recommendation, recommend_op +from deathstar.entities.search import Search, search_op +from deathstar.entities.user import User, user_op + + +class DeathStarDemo(): + def __init__(self, name): + self.init_user = OpNode(user_op, InitClass()) + self.init_hotel = OpNode(hotel_op, InitClass()) + self.init_flight = OpNode(flight_op, InitClass()) + self.runtime = FlinkRuntime(name) + + def init_runtime(self): + self.runtime.init(bundle_time=100, bundle_size=1000) + self.runtime.add_operator(FlinkOperator(hotel_op)) + self.runtime.add_operator(FlinkOperator(flight_op)) + 
self.runtime.add_operator(FlinkOperator(user_op)) + self.runtime.add_stateless_operator(FlinkStatelessOperator(search_op)) + self.runtime.add_stateless_operator(FlinkStatelessOperator(recommend_op)) + + + def populate(self): + # Create locations & rates for hotels + geos = [] + geos.append(Geo(37.7867, 0)) + geos.append(Geo(37.7854, -122.4005)) + geos.append(Geo(37.7867, -122.4071)) + geos.append(Geo(37.7936, -122.3930)) + geos.append(Geo(37.7831, -122.4181)) + geos.append(Geo(37.7863, -122.4015)) + + for i in range(6, 100): + lat: float = 37.7835 + i / 500.0 * 3 + lon: float = -122.41 + i / 500.0 * 4 + geos.append(Geo(lat, lon)) + + rates = {} + rates[1] = Rate(1, "RACK", + "2015-04-09", + "2015-04-10", + { "BookableRate": 190.0, + "Code": "KNG", + "RoomDescription": "King sized bed", + "TotalRate": 109.0, + "TotalRateInclusive": 123.17}) + + rates[2] = Rate(2, "RACK", + "2015-04-09", + "2015-04-10", + { "BookableRate": 139.0, + "Code": "QN", + "RoomDescription": "Queen sized bed", + "TotalRate": 139.0, + "TotalRateInclusive": 153.09}) + + rates[3] = Rate(3, "RACK", + "2015-04-09", + "2015-04-10", + { "BookableRate": 109.0, + "Code": "KNG", + "RoomDescription": "King sized bed", + "TotalRate": 109.0, + "TotalRateInclusive": 123.17}) + + for i in range(4, 80): + if i % 3 == 0: + hotel_id = i + end_date = "2015-04-" + rate = 109.0 + rate_inc = 123.17 + if i % 2 == 0: + end_date += '17' + else: + end_date += '24' + if i % 5 == 1: + rate = 120.0 + rate_inc = 140.0 + elif i % 5 == 2: + rate = 124.0 + rate_inc = 144.0 + elif i % 5 == 3: + rate = 132.0 + rate_inc = 158.0 + elif i % 5 == 4: + rate = 232.0 + rate_inc = 258.0 + + rates[hotel_id] = Rate(i, "RACK", + "2015-04-09", + end_date, + { "BookableRate": rate, + "Code": "KNG", + "RoomDescription": "King sized bed", + "TotalRate": rate, + "TotalRateInclusive": rate_inc}) + + # we don't create recommendations, because it doesn't really + # correspond to an entity + prices = [] + + prices.append(150.00) + prices.append(120.00) + prices.append(190.00) + prices.append(160.00) + prices.append(140.00) + prices.append(200.00) + + for i in range(6, 100): + price = 179.00 + if i % 3 == 0: + if i % 5 == 0: + price = 123.17 + elif i % 5 == 1: + price = 140.00 + elif i % 5 == 2: + price = 144.00 + elif i % 5 == 3: + price = 158.00 + elif i % 5 == 4: + price = 258.00 + + prices.append(price) + + # populate users + self.users = [User(f"Cornell_{i}", str(i) * 10) for i in range(501)] + for user in self.users: + event = Event(self.init_user, [user.id], {"user_id": user.id, "password": user.password}, None) + self.runtime.send(event) + + # populate hotels + self.hotels: list[Hotel] = [] + for i in range(100): + geo = geos[i] + rate = rates[i] if i in rates else [] + price = prices[i] + hotel = Hotel(str(i), 10, geo, rate, price) + self.hotels.append(hotel) + event = Event(self.init_hotel, [hotel.key], + { + "key": hotel.key, + "cap": hotel.cap, + "geo": hotel.geo, + "rates": hotel.rates, + "price": hotel.price + }, None) + self.runtime.send(event) + + # populate flights + self.flights = [Flight(str(i), 10) for i in range(100)] + for flight in self.flights[:-1]: + event = Event(self.init_flight, [flight.id], { + "id": flight.id, + "cap": flight.cap + }, None) + self.runtime.send(event) + flight = self.flights[-1] + event = Event(self.init_flight, [flight.id], { + "id": flight.id, + "cap": flight.cap + }, None) + self.runtime.send(event, flush=True) + +class Client: + def __init__(self, topic="input-topic", kafka_broker="localhost:9092"): + self.producer = 
Producer({'bootstrap.servers': kafka_broker}) + self.topic = topic + + def send(self, event: Event, flush=False): + """Send an event to the Kafka source. + Once `run` has been called, the Flink runtime will start ingesting these + messages. Messages can always be sent after `init` is called - Flink + will continue ingesting messages after `run` is called asynchronously. + """ + self.producer.produce(self.topic, value=pickle.dumps(event)) + if flush: + self.producer.flush() + + def search_hotel(self): + in_date = random.randint(9, 23) + out_date = random.randint(in_date + 1, 24) + + if in_date < 10: + in_date_str = f"2015-04-0{in_date}" + else: + in_date_str = f"2015-04-{in_date}" + if out_date < 10: + out_date_str = f"2015-04-0{out_date}" + else: + out_date_str = f"2015-04-{out_date}" + + lat = 38.0235 + (random.randint(0, 481) - 240.5) / 1000.0 + lon = -122.095 + (random.randint(0, 325) - 157.0) / 1000.0 + + # We don't really use the in_date, out_date information + event = Event(search_op.dataflow.entry, ["tempkey"], {"lat": lat, "lon": lon}, search_op.dataflow) + self.send(event) + + def recommend(self, req_param=None): + if req_param is None: + coin = random.random() + if coin < 0.5: + req_param = "distance" + else: + req_param = "price" + + lat = 38.0235 + (random.randint(0, 481) - 240.5) / 1000.0 + lon = -122.095 + (random.randint(0, 325) - 157.0) / 1000.0 + + event = Event(recommend_op.dataflow.entry, ["tempkey"], {"requirement": req_param, "lat": lat, "lon": lon}, recommend_op.dataflow) + self.send(event) + + def user_login(self): + user_id = random.randint(0, 500) + username = f"Cornell_{user_id}" + password = str(user_id) * 10 + event = Event(OpNode(user_op, InvokeMethod("login")), [username], {"password": password}, None) + self.send(event) + + def reserve(self): + hotel_id = random.randint(0, 99) + flight_id = random.randint(0, 99) + + # user = User("user1", "pass") + # user.order(flight, hotel) + user_id = "Cornell_" + str(random.randint(0, 500)) + + event = Event(user_op.dataflows["order"].entry, [user_id], {"flight": str(flight_id), "hotel": str(hotel_id)}, user_op.dataflows["order"]) + self.send(event) + + def deathstar_workload_generator(self): + search_ratio = 0.6 + recommend_ratio = 0.39 + user_ratio = 0.005 + reserve_ratio = 0.005 + c = 0 + while True: + coin = random.random() + if coin < search_ratio: + yield self.search_hotel() + elif coin < search_ratio + recommend_ratio: + yield self.recommend() + elif coin < search_ratio + recommend_ratio + user_ratio: + yield self.user_login() + else: + yield self.reserve() + c += 1 + +threads = 3 +messages_per_second = 100 +sleeps_per_second = 100 +sleep_time = 0.0085 +seconds = 50 + +def benchmark_runner(proc_num) -> dict[bytes, dict]: + print(f'Generator: {proc_num} starting') + client = Client("deathstar") + deathstar_generator = client.deathstar_workload_generator() + timestamp_futures: dict[bytes, dict] = {} + start = timer() + for _ in range(seconds): + sec_start = timer() + for i in range(messages_per_second): + if i % (messages_per_second // sleeps_per_second) == 0: + time.sleep(sleep_time) + # operator, key, func_name, params = next(deathstar_generator) + # future = client.send_event(operator=operator, + # key=key, + # function=func_name, + # params=params) + # timestamp_futures[future.request_id] = {"op": f'{func_name} {key}->{params}'} + next(deathstar_generator) + # styx.flush() + sec_end = timer() + lps = sec_end - sec_start + if lps < 1: + time.sleep(1 - lps) + sec_end2 = timer() + print(f'Latency per second: 
{sec_end2 - sec_start}') + end = timer() + print(f'Average latency per second: {(end - start) / seconds}') + # styx.close() + # for key, metadata in styx.delivery_timestamps.items(): + # timestamp_futures[key]["timestamp"] = metadata + return timestamp_futures + + +def main(): + + ds = DeathStarDemo("deathstar") + ds.init_runtime() + ds.runtime.run(run_async=True) + ds.populate() + + + time.sleep(1) + input() + + with Pool(threads) as p: + results = p.map(benchmark_runner, range(threads)) + + results = {k: v for d in results for k, v in d.items()} + + # pd.DataFrame({"request_id": list(results.keys()), + # "timestamp": [res["timestamp"] for res in results.values()], + # "op": [res["op"] for res in results.values()] + # }).sort_values("timestamp").to_csv(f'{SAVE_DIR}/client_requests.csv', index=False) + print(results) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/deathstar/entities/__init__.py b/deathstar/entities/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/deathstar/entities/flight.py b/deathstar/entities/flight.py new file mode 100644 index 0000000..60af68b --- /dev/null +++ b/deathstar/entities/flight.py @@ -0,0 +1,33 @@ +from typing import Any +from cascade.dataflow.dataflow import Operator +from cascade.dataflow.operator import StatefulOperator + + +class Flight(): + def __init__(self, id: str, cap: int): + self.id = id + self.cap = cap + # self.customers = [] + + # In order to be deterministic, we don't actually change the capacity + def reserve(self) -> bool: + if self.cap <= 0: + return False + return True + + +#### COMPILED FUNCTIONS (ORACLE) ##### + +def reserve_compiled(variable_map: dict[str, Any], state: Flight, key_stack: list[str]) -> Any: + key_stack.pop() + if state.cap <= 0: + return False + return True + +flight_op = StatefulOperator( + Flight, + { + "reserve": reserve_compiled + }, + {} # no dataflow? +) diff --git a/deathstar/entities/hotel.py b/deathstar/entities/hotel.py new file mode 100644 index 0000000..6689168 --- /dev/null +++ b/deathstar/entities/hotel.py @@ -0,0 +1,84 @@ +from dataclasses import dataclass +from typing import Any, Optional +from cascade.dataflow.operator import StatefulOperator +from geopy.distance import distance + + +@dataclass +class Geo(): + lat: float + lon: float + + def distance_km(self, lat: float, lon: float): + return distance((lat, lon), (self.lat, self.lon)).km + +@dataclass +class Rate(): + key: int + code: str + in_date: str + out_date: str + room_type: dict + + def __key__(self): + return self.key + +# todo: add a linked entity +# e.g. 
reviews: list[Review] where Review is an entity +class Hotel(): + def __init__(self, + key: str, + cap: int, + geo: Geo, + rates: list[Rate], + price: float): + self.key = key + self.cap = cap + self.customers = [] + self.rates = rates + self.geo = geo + self.price = price + + # In order to be deterministic, we don't actually change the capacity + def reserve(self) -> bool: + if self.cap < 0: + return False + return True + + def get_geo(self) -> Geo: + return self.geo + + @staticmethod + def __all__() -> list['Hotel']: + pass + + def __key__(self) -> int: + return self.key + + + +#### COMPILED FUNCTIONS (ORACLE) ##### + +def reserve_compiled(variable_map: dict[str, Any], state: Hotel, key_stack: list[str]) -> Any: + key_stack.pop() + if state.cap <= 0: + return False + return True + +def get_geo_compiled(variable_map: dict[str, Any], state: Hotel, key_stack: list[str]) -> Any: + key_stack.pop() + return state.geo + +def get_price_compiled(variable_map: dict[str, Any], state: Hotel, key_stack: list[str]) -> Any: + key_stack.pop() + return state.price + +hotel_op = StatefulOperator( + Hotel, + { + "reserve": reserve_compiled, + "get_geo": get_geo_compiled, + "get_price": get_price_compiled + }, + {} # no dataflow? +) diff --git a/deathstar/entities/recommendation.py b/deathstar/entities/recommendation.py new file mode 100644 index 0000000..7667210 --- /dev/null +++ b/deathstar/entities/recommendation.py @@ -0,0 +1,137 @@ +from typing import Any, Literal +from cascade.dataflow.dataflow import CollectNode, DataFlow, Edge, InvokeMethod, OpNode, SelectAllNode +from cascade.dataflow.operator import StatelessOperator +from deathstar.entities.hotel import Geo, Hotel, hotel_op + +# Stateless +class Recommendation(): + @staticmethod + def get_recommendations(requirement: Literal["distance", "price"], lat: float, lon: float) -> list[Hotel]: + if requirement == "distance": + distances = [(hotel.geo.distance_km(lat, lon), hotel) + for hotel in Hotel.__all__()] + min_dist = min(distances, key=lambda x: x[0]) + res = [hotel for dist, hotel in distances if dist == min_dist] + elif requirement == "price": + prices = [(hotel.price, hotel) + for hotel in Hotel.__all__()] + min_price = min(prices, key=lambda x: x[0]) + res = [hotel for rate, hotel in prices if rate == min_price] + + # todo: raise error on else ...? + return res + +#### COMPILED FUNCTIONS (ORACLE) #### + +def get_recs_if_cond(variable_map: dict[str, Any], key_stack: list[str]): + return variable_map["requirement"] == "distance" + +# list comprehension entry +def get_recs_if_body_0(variable_map: dict[str, Any], key_stack: list[str]): + hotel_key = key_stack[-1] + # The body will need the hotel key (actually, couldn't we just take the top of the key stack again?) 
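    # (Key-stack convention used across these compiled functions, as far as
    # these patches show: the SelectAllNode pushes one hotel key per fanned-out
    # event, stateless steps read it from the top without popping, and the
    # stateful hotel_op node that follows pops it when it runs against that
    # hotel's state.)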
+ variable_map["hotel_key"] = hotel_key + # The next node (Hotel.get_geo) will need the hotel key + key_stack.append(hotel_key) + + +# list comprehension body +def get_recs_if_body_1(variable_map: dict[str, Any], key_stack: list[str]): + hotel_geo: Geo = variable_map["hotel_geo"] + lat, lon = variable_map["lat"], variable_map["lon"] + dist = hotel_geo.distance_km(lat, lon) + return (dist, variable_map["hotel_key"]) + +# after list comprehension +def get_recs_if_body_2(variable_map: dict[str, Any], key_stack: list[str]): + distances = variable_map["distances"] + min_dist = min(distances, key=lambda x: x[0])[0] + variable_map["res"] = [hotel for dist, hotel in distances if dist == min_dist] + + +def get_recs_elif_cond(variable_map: dict[str, Any], key_stack: list[str]): + return variable_map["requirement"] == "price" + + +# list comprehension entry +def get_recs_elif_body_0(variable_map: dict[str, Any], key_stack: list[str]): + hotel_key = key_stack[-1] + # The body will need the hotel key (actually, couldn't we just take the top of the key stack again?) + variable_map["hotel_key"] = hotel_key + # The next node (Hotel.get_geo) will need the hotel key + key_stack.append(hotel_key) + + +# list comprehension body +def get_recs_elif_body_1(variable_map: dict[str, Any], key_stack: list[str]): + return (variable_map["hotel_price"], variable_map["hotel_key"]) + +# after list comprehension +def get_recs_elif_body_2(variable_map: dict[str, Any], key_stack: list[str]): + prices = variable_map["prices"] + min_price = min(prices, key=lambda x: x[0])[0] + variable_map["res"] = [hotel for price, hotel in prices if price == min_price] + + + +# a future optimization might instead duplicate this piece of code over the two +# branches, in order to reduce the number of splits by one +def get_recs_final(variable_map: dict[str, Any], key_stack: list[str]): + return variable_map["res"] + + +recommend_op = StatelessOperator({ + "get_recs_if_cond": get_recs_if_cond, + "get_recs_if_body_0": get_recs_if_body_0, + "get_recs_if_body_1": get_recs_if_body_1, + "get_recs_if_body_2": get_recs_if_body_2, + "get_recs_elif_cond": get_recs_elif_cond, + "get_recs_elif_body_0": get_recs_elif_body_0, + "get_recs_elif_body_1": get_recs_elif_body_1, + "get_recs_elif_body_2": get_recs_elif_body_2, + "get_recs_final": get_recs_final, +}, None) + +df = DataFlow("get_recommendations") +n1 = OpNode(recommend_op, InvokeMethod("get_recs_if_cond"), is_conditional=True) +n2 = OpNode(recommend_op, InvokeMethod("get_recs_if_body_0")) +n3 = OpNode(hotel_op, InvokeMethod("get_geo"), assign_result_to="hotel_geo") +n4 = OpNode(recommend_op, InvokeMethod("get_recs_if_body_1"), assign_result_to="distance") +n5 = CollectNode("distances", "distance") +n6 = OpNode(recommend_op, InvokeMethod("get_recs_if_body_2")) +ns1 = SelectAllNode(Hotel, n5) + +n7 = OpNode(recommend_op, InvokeMethod("get_recs_elif_cond"), is_conditional=True) +n8 = OpNode(recommend_op, InvokeMethod("get_recs_elif_body_0")) +n9 = OpNode(hotel_op, InvokeMethod("get_price"), assign_result_to="hotel_price") +n10 = OpNode(recommend_op, InvokeMethod("get_recs_elif_body_1"), assign_result_to="price") +n11 = CollectNode("prices", "price") +n12 = OpNode(recommend_op, InvokeMethod("get_recs_elif_body_2")) +ns2 = SelectAllNode(Hotel, n11) + + +n13 = OpNode(recommend_op, InvokeMethod("get_recs_final")) + +df.add_edge(Edge(n1, ns1, if_conditional=True)) +df.add_edge(Edge(n1, n7, if_conditional=False)) +df.add_edge(Edge(n7, ns2, if_conditional=True)) +df.add_edge(Edge(n7, n13, 
if_conditional=False)) + +# if branch +df.add_edge(Edge(ns1, n2)) +df.add_edge(Edge(n2, n3)) +df.add_edge(Edge(n3, n4)) +df.add_edge(Edge(n4, n5)) +df.add_edge(Edge(n5, n6)) +df.add_edge(Edge(n6, n13)) + +# elif branch +df.add_edge(Edge(ns2, n8)) +df.add_edge(Edge(n8, n9)) +df.add_edge(Edge(n9, n10)) +df.add_edge(Edge(n10, n11)) +df.add_edge(Edge(n11, n12)) +df.add_edge(Edge(n12, n13)) + +df.entry = n1 +recommend_op.dataflow = df \ No newline at end of file diff --git a/deathstar/entities/search.py b/deathstar/entities/search.py new file mode 100644 index 0000000..a2782d2 --- /dev/null +++ b/deathstar/entities/search.py @@ -0,0 +1,85 @@ +from typing import Any +from cascade.dataflow.dataflow import CollectNode, DataFlow, Edge, InvokeMethod, OpNode, SelectAllNode +from cascade.dataflow.operator import StatelessOperator +from deathstar.entities.hotel import Geo, Hotel, hotel_op + +# Stateless +class Search(): + # Get the 5 nearest hotels + @staticmethod + def nearby(lat: float, lon: float, in_date: int, out_date: int): + distances = [ + (dist, hotel) + for hotel in Hotel.__all__() + if (dist := hotel.geo.distance_km(lat, lon)) < 10] + hotels = [hotel for dist, hotel in sorted(distances)[:5]] + return hotels + + +#### COMPILED FUNCTIONS (ORACLE) ##### + + + +# predicate 1 +def search_nearby_compiled_0(variable_map: dict[str, Any], key_stack: list[str]): + # We assume that the top of the key stack is the hotel key. + # This assumption holds if the node before this one is a correctly + # configure SelectAllNode. + + hotel_key = key_stack[-1] + # The body will need the hotel key (actually, couldn't we just take the top of the key stack again?) + variable_map["hotel_key"] = hotel_key + # The next node (Hotel.get_geo) will need the hotel key + key_stack.append(hotel_key) + +# predicate 2 +def search_nearby_compiled_1(variable_map: dict[str, Any], key_stack: list[str]): + hotel_geo: Geo = variable_map["hotel_geo"] + lat, lon = variable_map["lat"], variable_map["lon"] + dist = hotel_geo.distance_km(lat, lon) + variable_map["dist"] = dist + return dist < 10 + + +# body +def search_nearby_compiled_2(variable_map: dict[str, Any], key_stack: list[str]): + return (variable_map["dist"], variable_map["hotel_key"]) + +# next line +def search_nearby_compiled_3(variable_map: dict[str, Any], key_stack: list[str]): + distances = variable_map["distances"] + hotels = [hotel for dist, hotel in sorted(distances)[:5]] + return hotels + + +search_op = StatelessOperator({ + "search_nearby_compiled_0": search_nearby_compiled_0, + "search_nearby_compiled_1": search_nearby_compiled_1, + "search_nearby_compiled_2": search_nearby_compiled_2, + "search_nearby_compiled_3": search_nearby_compiled_3, +}, None) + +df = DataFlow("search_nearby") +n1 = OpNode(search_op, InvokeMethod("search_nearby_compiled_0")) +n2 = OpNode(hotel_op, InvokeMethod("get_geo"), assign_result_to="hotel_geo") +n3 = OpNode(search_op, InvokeMethod("search_nearby_compiled_1"), is_conditional=True) +n4 = OpNode(search_op, InvokeMethod("search_nearby_compiled_2"), assign_result_to="search_body") +n5 = CollectNode("distances", "search_body") +n0 = SelectAllNode(Hotel, n5) + +n6 = OpNode(search_op, InvokeMethod("search_nearby_compiled_3")) + +df.add_edge(Edge(n0, n1)) +df.add_edge(Edge(n1, n2)) +df.add_edge(Edge(n2, n3)) + +# if true make the body +df.add_edge(Edge(n3, n4, if_conditional=True)) +df.add_edge(Edge(n4, n5)) +# if false skip past +df.add_edge(Edge(n3, n5, if_conditional=False)) + +df.add_edge(Edge(n5, n6)) + +df.entry = n0 +search_op.dataflow = 
df \ No newline at end of file diff --git a/deathstar/entities/user.py b/deathstar/entities/user.py new file mode 100644 index 0000000..0234e91 --- /dev/null +++ b/deathstar/entities/user.py @@ -0,0 +1,79 @@ +from typing import Any +from cascade.dataflow.dataflow import DataFlow, Edge, InvokeMethod, OpNode +from cascade.dataflow.operator import StatefulOperator +from deathstar.entities.flight import Flight, flight_op +from deathstar.entities.hotel import Hotel, hotel_op + + +class User(): + def __init__(self, user_id: str, password: str): + self.id = user_id + self.password = password + + def check(self, password): + return self.password == password + + def order(self, flight: Flight, hotel: Hotel): + if hotel.reserve() and flight.reserve(): + return True + else: + return False + +#### COMPILED FUNCTIONS (ORACLE) ##### + +def check_compiled(variable_map: dict[str, Any], state: User, key_stack: list[str]) -> Any: + key_stack.pop() + return state.password == variable_map["password"] + +def order_compiled_entry_0(variable_map: dict[str, Any], state: User, key_stack: list[str]) -> Any: + key_stack.append(variable_map["hotel"]) + +def order_compiled_entry_1(variable_map: dict[str, Any], state: User, key_stack: list[str]) -> Any: + key_stack.append(variable_map["flight"]) + +def order_compiled_if_cond(variable_map: dict[str, Any], state: User, key_stack: list[str]) -> Any: + return variable_map["hotel_reserve"] and variable_map["flight_reserve"] + +def order_compiled_if_body(variable_map: dict[str, Any], state: User, key_stack: list[str]) -> Any: + key_stack.pop() + return True + +def order_compiled_else_body(variable_map: dict[str, Any], state: User, key_stack: list[str]) -> Any: + key_stack.pop() + return False + +user_op = StatefulOperator( + User, + { + "login": check_compiled, + "order_compiled_entry_0": order_compiled_entry_0, + "order_compiled_entry_1": order_compiled_entry_1, + "order_compiled_if_cond": order_compiled_if_cond, + "order_compiled_if_body": order_compiled_if_body, + "order_compiled_else_body": order_compiled_else_body + }, + {} +) + +# For now, the dataflow will be serial instead of parallel. Future optimizations +# will try to automatically parallelize this. 
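# A usage sketch (mirroring the demo client's reserve() in deathstar/demo.py):
# the whole order runs as a single event that starts at the user's entry node
# and hops to the hotel and then the flight via the key stack, e.g.
#
#   order_df = user_op.dataflows["order"]
#   event = Event(order_df.entry, ["Cornell_0"],
#                 {"flight": "12", "hotel": "34"}, order_df)
#   client.send(event)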
+# There is also no user entry (this could also be an optimization) +df = DataFlow("user_order") +n0 = OpNode(user_op, InvokeMethod("order_compiled_entry_0")) +n1 = OpNode(hotel_op, InvokeMethod("reserve"), assign_result_to="hotel_reserve") +n2 = OpNode(user_op, InvokeMethod("order_compiled_entry_1")) +n3 = OpNode(flight_op, InvokeMethod("reserve"), assign_result_to="flight_reserve") +n4 = OpNode(user_op, InvokeMethod("order_compiled_if_cond"), is_conditional=True) +n5 = OpNode(user_op, InvokeMethod("order_compiled_if_body")) +n6 = OpNode(user_op, InvokeMethod("order_compiled_else_body")) + +df.add_edge(Edge(n0, n1)) +df.add_edge(Edge(n1, n2)) +df.add_edge(Edge(n2, n3)) +df.add_edge(Edge(n3, n4)) +df.add_edge(Edge(n4, n5, if_conditional=True)) +df.add_edge(Edge(n4, n6, if_conditional=False)) + +df.entry = n0 + +user_op.dataflows["order"] = df diff --git a/deathstar/test_demo.py b/deathstar/test_demo.py new file mode 100644 index 0000000..bc6a1c7 --- /dev/null +++ b/deathstar/test_demo.py @@ -0,0 +1,36 @@ + +from deathstar.demo import DeathStarDemo + + +def test_deathstar_demo(): + ds = DeathStarDemo() + ds.init_runtime() + ds.runtime.run(run_async=True) + ds.populate() + print("done!") + + input() + print("user login") + ds.user_login() + + input() + print("reserve") + ds.reserve() + + input() + print("search") + ds.search_hotel() + + input() + print("recommend") + ds.recommend(req_param="distance") + + input() + print("recommend") + ds.recommend(req_param="price") + + input() + + +if __name__ == "__main__": + test_deathstar_demo() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 6122afd..fac17e9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,6 +14,7 @@ exceptiongroup==1.2.2 fastavro==1.9.7 fasteners==0.19 find_libpython==0.4.0 +geopy==2.4.1 grpcio==1.68.1 hdfs==2.7.3 httplib2==0.22.0 diff --git a/src/cascade/dataflow/dataflow.py b/src/cascade/dataflow/dataflow.py index 11d6a0f..8c4ec78 100644 --- a/src/cascade/dataflow/dataflow.py +++ b/src/cascade/dataflow/dataflow.py @@ -239,6 +239,8 @@ def propogate(self, key_stack, result) -> Union['EventResult', list['Event']]: edges = self.dataflow.nodes[self.target.id].outgoing_edges true_edges = [edge for edge in edges if edge.if_conditional] false_edges = [edge for edge in edges if not edge.if_conditional] + if not (len(true_edges) == len(false_edges) == 1): + print(edges) assert len(true_edges) == len(false_edges) == 1 target_true = true_edges[0].to_node target_false = false_edges[0].to_node diff --git a/src/cascade/runtime/flink_runtime.py b/src/cascade/runtime/flink_runtime.py index 074b2ce..b9ae82b 100644 --- a/src/cascade/runtime/flink_runtime.py +++ b/src/cascade/runtime/flink_runtime.py @@ -204,7 +204,7 @@ def process_element(self, event: Event, ctx: KeyedProcessFunction.Context): if all([isinstance(r, Arrived) for r in collection]): logger.debug(f"FlinkCollectOp [{ctx.get_current_key()}]: Yielding collection") - collection = [r.val for r in collection] # type: ignore (r is of type Arrived) + collection = [r.val for r in collection if r.val is not None] # type: ignore (r is of type Arrived) event.variable_map[target_node.assign_result_to] = collection new_events = event.propogate(event.key_stack, collection) @@ -446,10 +446,11 @@ def add_operator(self, flink_op: FlinkOperator): self.stateful_op_streams.append(op_stream) def add_stateless_operator(self, flink_op: FlinkStatelessOperator): - """Add a `FlinkStatlessOperator` to the Flink datastream.""" + """Add a `FlinkStatelessOperator` to the 
Flink datastream.""" op_stream = ( self.stateless_op_stream + .filter(lambda e: e.target.operator.dataflow.name == flink_op.operator.dataflow.name) .process(flink_op) .name("STATELESS DATAFLOW: " + flink_op.operator.dataflow.name) ) From 80766ecc182c73578544ca94d04a140615c49dfb Mon Sep 17 00:00:00 2001 From: lucasvanmol Date: Thu, 9 Jan 2025 16:14:18 +0100 Subject: [PATCH 7/9] Mark demo test as integration --- deathstar/test_demo.py | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/deathstar/test_demo.py b/deathstar/test_demo.py index bc6a1c7..cba7f27 100644 --- a/deathstar/test_demo.py +++ b/deathstar/test_demo.py @@ -1,35 +1,41 @@ -from deathstar.demo import DeathStarDemo - +from deathstar.demo import Client, DeathStarDemo +import time +import pytest +@pytest.mark.integration def test_deathstar_demo(): - ds = DeathStarDemo() + ds = DeathStarDemo("deathstardemo-test") ds.init_runtime() ds.runtime.run(run_async=True) + print("Populating, press enter to go to the next step when done") ds.populate() - print("done!") + client = Client("deathstardemo-test") input() - print("user login") - ds.user_login() + print("testing user login") + client.user_login() input() - print("reserve") - ds.reserve() + print("testing reserve") + client.reserve() input() - print("search") - ds.search_hotel() + print("testing search") + client.search_hotel() input() - print("recommend") - ds.recommend(req_param="distance") + print("testing recommend (distance)") + time.sleep(0.5) + client.recommend(req_param="distance") input() - print("recommend") - ds.recommend(req_param="price") + print("testing recommend (price)") + time.sleep(0.5) + client.recommend(req_param="price") input() + print("done!") if __name__ == "__main__": From 8b85f8923779b4d8cb5a42d391ce51cc25724521 Mon Sep 17 00:00:00 2001 From: lucasvanmol Date: Fri, 10 Jan 2025 14:44:01 +0100 Subject: [PATCH 8/9] Add SyncFlinkClient --- deathstar/demo.py | 81 +++++++++--------- deathstar/test_demo.py | 23 +++-- src/cascade/runtime/flink_runtime.py | 123 ++++++++++++++++++++++----- 3 files changed, 159 insertions(+), 68 deletions(-) diff --git a/deathstar/demo.py b/deathstar/demo.py index 761ca96..6d0092f 100644 --- a/deathstar/demo.py +++ b/deathstar/demo.py @@ -11,7 +11,7 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) from cascade.dataflow.dataflow import Event, InitClass, InvokeMethod, OpNode -from cascade.runtime.flink_runtime import FlinkOperator, FlinkRuntime, FlinkStatelessOperator +from cascade.runtime.flink_runtime import FlinkClientSync, FlinkOperator, FlinkRuntime, FlinkStatelessOperator from deathstar.entities.flight import Flight, flight_op from deathstar.entities.hotel import Geo, Hotel, Rate, hotel_op from deathstar.entities.recommendation import Recommendation, recommend_op @@ -19,12 +19,12 @@ from deathstar.entities.user import User, user_op -class DeathStarDemo(): - def __init__(self, name): +class DeathstarDemo(): + def __init__(self, input_topic, output_topic): self.init_user = OpNode(user_op, InitClass()) self.init_hotel = OpNode(hotel_op, InitClass()) self.init_flight = OpNode(flight_op, InitClass()) - self.runtime = FlinkRuntime(name) + self.runtime = FlinkRuntime(input_topic, output_topic) def init_runtime(self): self.runtime.init(bundle_time=100, bundle_size=1000) @@ -176,20 +176,13 @@ def populate(self): }, None) self.runtime.send(event, flush=True) -class Client: - def __init__(self, topic="input-topic", kafka_broker="localhost:9092"): +class 
DeathstarClient: + def __init__(self, input_topic="input-topic", output_topic="output-topic", kafka_broker="localhost:9092"): + self.client = FlinkClientSync(input_topic, output_topic, kafka_broker, True) self.producer = Producer({'bootstrap.servers': kafka_broker}) - self.topic = topic - - def send(self, event: Event, flush=False): - """Send an event to the Kafka source. - Once `run` has been called, the Flink runtime will start ingesting these - messages. Messages can always be sent after `init` is called - Flink - will continue ingesting messages after `run` is called asynchronously. - """ - self.producer.produce(self.topic, value=pickle.dumps(event)) - if flush: - self.producer.flush() + + def send(self, event: Event, flush: bool = False): + return self.client.send(event, flush) def search_hotel(self): in_date = random.randint(9, 23) @@ -208,8 +201,7 @@ def search_hotel(self): lon = -122.095 + (random.randint(0, 325) - 157.0) / 1000.0 # We don't really use the in_date, out_date information - event = Event(search_op.dataflow.entry, ["tempkey"], {"lat": lat, "lon": lon}, search_op.dataflow) - self.send(event) + return Event(search_op.dataflow.entry, ["tempkey"], {"lat": lat, "lon": lon}, search_op.dataflow) def recommend(self, req_param=None): if req_param is None: @@ -222,15 +214,13 @@ def recommend(self, req_param=None): lat = 38.0235 + (random.randint(0, 481) - 240.5) / 1000.0 lon = -122.095 + (random.randint(0, 325) - 157.0) / 1000.0 - event = Event(recommend_op.dataflow.entry, ["tempkey"], {"requirement": req_param, "lat": lat, "lon": lon}, recommend_op.dataflow) - self.send(event) + return Event(recommend_op.dataflow.entry, ["tempkey"], {"requirement": req_param, "lat": lat, "lon": lon}, recommend_op.dataflow) def user_login(self): user_id = random.randint(0, 500) username = f"Cornell_{user_id}" password = str(user_id) * 10 - event = Event(OpNode(user_op, InvokeMethod("login")), [username], {"password": password}, None) - self.send(event) + return Event(OpNode(user_op, InvokeMethod("login")), [username], {"password": password}, None) def reserve(self): hotel_id = random.randint(0, 99) @@ -240,8 +230,7 @@ def reserve(self): # user.order(flight, hotel) user_id = "Cornell_" + str(random.randint(0, 500)) - event = Event(user_op.dataflows["order"].entry, [user_id], {"flight": str(flight_id), "hotel": str(hotel_id)}, user_op.dataflows["order"]) - self.send(event) + return Event(user_op.dataflows["order"].entry, [user_id], {"flight": str(flight_id), "hotel": str(hotel_id)}, user_op.dataflows["order"]) def deathstar_workload_generator(self): search_ratio = 0.6 @@ -261,30 +250,31 @@ def deathstar_workload_generator(self): yield self.reserve() c += 1 -threads = 3 +threads = 1 messages_per_second = 100 sleeps_per_second = 100 sleep_time = 0.0085 -seconds = 50 +seconds = 20 + -def benchmark_runner(proc_num) -> dict[bytes, dict]: +def benchmark_runner(proc_num) -> dict[int, dict]: print(f'Generator: {proc_num} starting') - client = Client("deathstar") + client = DeathstarClient("deathstar", "ds-out") deathstar_generator = client.deathstar_workload_generator() - timestamp_futures: dict[bytes, dict] = {} + futures: dict[int, dict] = {} start = timer() for _ in range(seconds): sec_start = timer() for i in range(messages_per_second): if i % (messages_per_second // sleeps_per_second) == 0: time.sleep(sleep_time) - # operator, key, func_name, params = next(deathstar_generator) - # future = client.send_event(operator=operator, - # key=key, - # function=func_name, - # params=params) - # 
timestamp_futures[future.request_id] = {"op": f'{func_name} {key}->{params}'} - next(deathstar_generator) + event = next(deathstar_generator) + func_name = event.dataflow.name if event.dataflow is not None else "login" # only login has no dataflow + key = event.key_stack[0] + params = event.variable_map + client.send(event) + futures[event._id] = {"event": f'{func_name} {key}->{params}'} + # styx.flush() sec_end = timer() lps = sec_end - sec_start @@ -297,12 +287,14 @@ def benchmark_runner(proc_num) -> dict[bytes, dict]: # styx.close() # for key, metadata in styx.delivery_timestamps.items(): # timestamp_futures[key]["timestamp"] = metadata - return timestamp_futures + for event_id, result in client.client._futures.items(): + assert event_id in futures + futures[event_id]["result"] = result + return futures def main(): - - ds = DeathStarDemo("deathstar") + ds = DeathstarDemo("deathstar", "ds-out") ds.init_runtime() ds.runtime.run(run_async=True) ds.populate() @@ -321,6 +313,13 @@ def main(): # "op": [res["op"] for res in results.values()] # }).sort_values("timestamp").to_csv(f'{SAVE_DIR}/client_requests.csv', index=False) print(results) - + t = len(results) + r = 0 + for result in results.values(): + if result["result"] is not None: + print(result) + r += 1 + print(f"{r}/{t} results recieved.") + if __name__ == "__main__": main() \ No newline at end of file diff --git a/deathstar/test_demo.py b/deathstar/test_demo.py index cba7f27..a7e674e 100644 --- a/deathstar/test_demo.py +++ b/deathstar/test_demo.py @@ -1,41 +1,48 @@ -from deathstar.demo import Client, DeathStarDemo +from deathstar.demo import DeathstarDemo, DeathstarClient import time import pytest @pytest.mark.integration def test_deathstar_demo(): - ds = DeathStarDemo("deathstardemo-test") + ds = DeathstarDemo("deathstardemo-test", "dsd-out") ds.init_runtime() ds.runtime.run(run_async=True) print("Populating, press enter to go to the next step when done") ds.populate() - client = Client("deathstardemo-test") + client = DeathstarClient("deathstardemo-test", "dsd-out") input() print("testing user login") - client.user_login() + event = client.user_login() + client.send(event) input() print("testing reserve") - client.reserve() + event = client.reserve() + client.send(event) input() print("testing search") - client.search_hotel() + event = client.search_hotel() + client.send(event) input() print("testing recommend (distance)") time.sleep(0.5) - client.recommend(req_param="distance") + event = client.recommend(req_param="distance") + client.send(event) input() print("testing recommend (price)") time.sleep(0.5) - client.recommend(req_param="price") + event = client.recommend(req_param="price") + client.send(event) + print(client.client._futures) input() print("done!") + print(client.client._futures) if __name__ == "__main__": diff --git a/src/cascade/runtime/flink_runtime.py b/src/cascade/runtime/flink_runtime.py index b9ae82b..f8b13c0 100644 --- a/src/cascade/runtime/flink_runtime.py +++ b/src/cascade/runtime/flink_runtime.py @@ -1,6 +1,8 @@ from dataclasses import dataclass import os -from typing import Optional, Type, Union +import uuid +import threading +from typing import Literal, Optional, Type, Union from pyflink.common.typeinfo import Types, get_gateway from pyflink.common import Configuration, DeserializationSchema, SerializationSchema, WatermarkStrategy from pyflink.datastream.connectors import DeliveryGuarantee @@ -11,7 +13,7 @@ import pickle from cascade.dataflow.dataflow import Arrived, CollectNode, CollectTarget, Event, 
EventResult, Filter, InitClass, InvokeMethod, MergeNode, Node, NotArrived, OpNode, Operator, Result, SelectAllNode from cascade.dataflow.operator import StatefulOperator, StatelessOperator -from confluent_kafka import Producer +from confluent_kafka import Producer, Consumer import logging logger = logging.getLogger(__name__) @@ -285,7 +287,7 @@ def __init__(self): class FlinkRuntime(): """A Runtime that runs Dataflows on Flink.""" - def __init__(self, topic="input-topic", ui_port: Optional[int] = None): + def __init__(self, input_topic="input-topic", output_topic="output-topic", ui_port: Optional[int] = None): self.env: Optional[StreamExecutionEnvironment] = None """@private""" @@ -295,11 +297,12 @@ def __init__(self, topic="input-topic", ui_port: Optional[int] = None): self.sent_events = 0 """The number of events that were sent using `send()`.""" - self.topic = topic - """The topic to use for internal communications. + self.input_topic = input_topic + """The topic to use for internal communications.""" - Useful for running multiple instances concurrently, for example during - tests. + self.output_topic = output_topic + """The topic to use for external communications, i.e. when a dataflow is + finished. As such only `EventResult`s should be contained in this topic. """ self.ui_port = ui_port @@ -354,7 +357,7 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5): kafka_source = ( KafkaSource.builder() .set_bootstrap_servers(kafka_broker) - .set_topics(self.topic) + .set_topics(self.input_topic) .set_group_id("test_group_1") .set_starting_offsets(KafkaOffsetsInitializer.earliest()) .set_value_only_deserializer(deserialization_schema) @@ -365,7 +368,7 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5): .set_bootstrap_servers(kafka_broker) .set_record_serializer( KafkaRecordSerializationSchema.builder() - .set_topic(self.topic) + .set_topic(self.input_topic) .set_value_serialization_schema(deserialization_schema) .build() ) @@ -374,6 +377,21 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5): ) """Kafka sink that will be ingested again by the Flink runtime.""" + self.kafka_external_sink = ( + KafkaSink.builder() + .set_bootstrap_servers(kafka_broker) + .set_record_serializer( + KafkaRecordSerializationSchema.builder() + .set_topic(self.output_topic) + .set_value_serialization_schema(deserialization_schema) + .build() + ) + .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) + .build() + ) + """Kafka sink corresponding to outputs of calls (`EventResult`s).""" + + event_stream = ( self.env.from_source( kafka_source, @@ -414,11 +432,6 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5): .filter(lambda e: isinstance(e.target.operator, StatelessOperator)) ) - # self.merge_op_stream = ( - # event_stream.filter(lambda e: isinstance(e.target, MergeNode)) - # .key_by(lambda e: e._id) # might not work in the future if we have multiple merges in one dataflow? - # .process(FlinkMergeOperator()) - # ) self.merge_op_stream = ( event_stream.filter(lambda e: isinstance(e.target, CollectNode)) .key_by(lambda e: e._id) # might not work in the future if we have multiple merges in one dataflow? @@ -462,12 +475,12 @@ def send(self, event: Event, flush=False): messages. Messages can always be sent after `init` is called - Flink will continue ingesting messages after `run` is called asynchronously. 
""" - self.producer.produce(self.topic, value=pickle.dumps(event)) + self.producer.produce(self.input_topic, value=pickle.dumps(event)) if flush: self.producer.flush() self.sent_events += 1 - def run(self, run_async=False, collect=False) -> Union[CloseableIterator, None]: + def run(self, run_async=False, output: Literal["collect", "kafka", "stdout"]="kafka") -> Union[CloseableIterator, None]: """Start ingesting and processing messages from the Kafka source. If `collect` is True then this will return a CloseableIterator over @@ -492,10 +505,15 @@ def run(self, run_async=False, collect=False) -> Union[CloseableIterator, None]: ds = full_stream_filtered.union(full_stream_unfiltered) # Output the stream - if collect: + if output == "collect": ds_external = ds.filter(lambda e: isinstance(e, EventResult)).execute_and_collect() + elif output == "stdout": + ds_external = ds.filter(lambda e: isinstance(e, EventResult)).print() + elif output == "kafka": + ds_external = ds.filter(lambda e: isinstance(e, EventResult)).sink_to(self.kafka_external_sink).name("EXTERNAL KAFKA SINK") else: - ds_external = ds.filter(lambda e: isinstance(e, EventResult)).print() #.add_sink(kafka_external_sink) + raise ValueError(f"Invalid output: {output}") + ds_internal = ds.filter(lambda e: isinstance(e, Event)).sink_to(self.kafka_internal_sink).name("INTERNAL KAFKA SINK") if run_async: @@ -504,4 +522,71 @@ def run(self, run_async=False, collect=False) -> Union[CloseableIterator, None]: return ds_external # type: ignore (will be CloseableIterator provided the source is unbounded (i.e. Kafka)) else: logger.debug("FlinkRuntime starting (sync)") - self.env.execute("Cascade: Flink Runtime") \ No newline at end of file + self.env.execute("Cascade: Flink Runtime") + +class FlinkClientSync: + def __init__(self, input_topic="input-topic", output_topic="output-topic", kafka_url="localhost:9092", start_consumer_thread: bool = True): + self.producer = Producer({'bootstrap.servers': kafka_url}) + self.input_topic = input_topic + self.output_topic = output_topic + self.kafka_url = kafka_url + self.is_consuming = False + self._futures: dict[int, Optional[EventResult]] = {} # TODO: handle timeouts? + """Mapping of event id's to their EventResult. None if not arrived.""" + if start_consumer_thread: + self.start_consumer_thread() + + def start_consumer_thread(self): + self.result_consumer_process = threading.Thread(target=self.consume_results) + self.is_consuming = True + self.result_consumer_process.start() + + def consume_results(self): + self.consumer = Consumer( + { + "bootstrap.servers": self.kafka_url, + "group.id": str(uuid.uuid4()), + "auto.offset.reset": "earliest", + # "enable.auto.commit": False, + # "fetch.min.bytes": 1 + }) + + self.consumer.subscribe([self.output_topic]) + + while self.is_consuming: + msg = self.consumer.poll(1.0) + if msg is None: + continue + if msg.error(): + logger.error(f"Consumer error: {msg.error()}") + continue + try: + event_result: EventResult = pickle.loads(msg.value()) + if event_result.event_id in self._futures: + if (r := self._futures[event_result.event_id]) != None: + logger.warning(f"Recieved EventResult with id {event_result.event_id} more than once: {event_result} replaced previous: {r}") + self._futures[event_result.event_id] = event_result + except Exception as e: + logger.error(f"Consumer deserializing error: {e}") + + + self.consumer.close() + + def send(self, event: Event, flush=False) -> int: + """Send an event to the Kafka source and block until an EventResult is recieved. 
+ + :param event: The event to send. + :param flush: Whether to flush the producer after sending. + :return: The id of the sent event. + :raises Exception: If the event could not be serialized or sent. + """ + self.producer.produce(self.input_topic, value=pickle.dumps(event)) + if flush: + self.producer.flush() + + self._futures[event._id] = None + return event._id + + def close(self): + self.producer.flush() + self.is_consuming = False From 16acbc06c9b5c721325188612876331695be49c9 Mon Sep 17 00:00:00 2001 From: lucasvanmol Date: Mon, 13 Jan 2025 11:09:35 +0100 Subject: [PATCH 9/9] Make metrics changes --- deathstar/demo.py | 75 +++++++++++++++++++++------- src/cascade/runtime/flink_runtime.py | 21 ++++++-- 2 files changed, 74 insertions(+), 22 deletions(-) diff --git a/deathstar/demo.py b/deathstar/demo.py index 6d0092f..c42bdb7 100644 --- a/deathstar/demo.py +++ b/deathstar/demo.py @@ -2,8 +2,8 @@ import sys import os import time +import csv from confluent_kafka import Producer -import pickle from timeit import default_timer as timer from multiprocessing import Pool @@ -251,17 +251,17 @@ def deathstar_workload_generator(self): c += 1 threads = 1 -messages_per_second = 100 -sleeps_per_second = 100 +messages_per_second = 10 +sleeps_per_second = 10 sleep_time = 0.0085 -seconds = 20 +seconds = 10 def benchmark_runner(proc_num) -> dict[int, dict]: print(f'Generator: {proc_num} starting') client = DeathstarClient("deathstar", "ds-out") deathstar_generator = client.deathstar_workload_generator() - futures: dict[int, dict] = {} + # futures: dict[int, dict] = {} start = timer() for _ in range(seconds): sec_start = timer() @@ -269,11 +269,11 @@ def benchmark_runner(proc_num) -> dict[int, dict]: if i % (messages_per_second // sleeps_per_second) == 0: time.sleep(sleep_time) event = next(deathstar_generator) - func_name = event.dataflow.name if event.dataflow is not None else "login" # only login has no dataflow - key = event.key_stack[0] - params = event.variable_map + # func_name = event.dataflow.name if event.dataflow is not None else "login" # only login has no dataflow + # key = event.key_stack[0] + # params = event.variable_map client.send(event) - futures[event._id] = {"event": f'{func_name} {key}->{params}'} + # futures[event._id] = {"event": f'{func_name} {key}->{params}'} # styx.flush() sec_end = timer() @@ -287,12 +287,51 @@ def benchmark_runner(proc_num) -> dict[int, dict]: # styx.close() # for key, metadata in styx.delivery_timestamps.items(): # timestamp_futures[key]["timestamp"] = metadata - for event_id, result in client.client._futures.items(): - assert event_id in futures - futures[event_id]["result"] = result + + done = False + while not done: + done = True + for event_id, fut in client.client._futures.items(): + result = fut["ret"] + if result is None: + done = False + time.sleep(0.5) + break + futures = client.client._futures + client.client.close() return futures +def write_dict_to_csv(futures_dict, filename): + """ + Writes a dictionary of event data to a CSV file. + + Args: + futures_dict (dict): A dictionary where each key is an event ID and the value is another dict. + filename (str): The name of the CSV file to write to.
+ """ + # Define the column headers + headers = ["event_id", "sent", "sent_t", "ret", "ret_t"] + + # Open the file for writing + with open(filename, mode='w', newline='', encoding='utf-8') as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=headers) + + # Write the headers + writer.writeheader() + + # Write the data rows + for event_id, event_data in futures_dict.items(): + # Prepare a row where the 'event_id' is the first column + row = { + "event_id": event_id, + "sent": event_data.get("sent"), + "sent_t": event_data.get("sent_t"), + "ret": event_data.get("ret"), + "ret_t": event_data.get("ret_t") + } + writer.writerow(row) + def main(): ds = DeathstarDemo("deathstar", "ds-out") ds.init_runtime() @@ -303,10 +342,11 @@ def main(): time.sleep(1) input() - with Pool(threads) as p: - results = p.map(benchmark_runner, range(threads)) + # with Pool(threads) as p: + # results = p.map(benchmark_runner, range(threads)) - results = {k: v for d in results for k, v in d.items()} + # results = {k: v for d in results for k, v in d.items()} + results = benchmark_runner(0) # pd.DataFrame({"request_id": list(results.keys()), # "timestamp": [res["timestamp"] for res in results.values()], @@ -316,10 +356,11 @@ def main(): t = len(results) r = 0 for result in results.values(): - if result["result"] is not None: + if result["ret"] is not None: print(result) r += 1 print(f"{r}/{t} results received.") - + write_dict_to_csv(results, "test2.csv") + if __name__ == "__main__": main() \ No newline at end of file diff --git a/src/cascade/runtime/flink_runtime.py b/src/cascade/runtime/flink_runtime.py index f8b13c0..763582e 100644 --- a/src/cascade/runtime/flink_runtime.py +++ b/src/cascade/runtime/flink_runtime.py @@ -1,5 +1,6 @@ from dataclasses import dataclass import os +import time import uuid import threading from typing import Literal, Optional, Type, Union @@ -310,7 +311,7 @@ def __init__(self, input_topic="input-topic", output_topic="output-topic", ui_po Warning that this does not work well with run(collect=True)!""" - def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5): + def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, parallelism=None): """Initialise & configure the Flink runtime. This function is required before any other calls, and requires a Kafka @@ -338,6 +339,9 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5): config.set_integer("python.fn-execution.bundle.size", bundle_size) self.env = StreamExecutionEnvironment.get_execution_environment(config) + if parallelism: + self.env.set_parallelism(parallelism) + logger.debug(f"FlinkRuntime: parallelism {self.env.get_parallelism()}") kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'bin/flink-sql-connector-kafka-3.3.0-1.20.jar') @@ -531,7 +535,7 @@ def __init__(self, input_topic="input-topic", output_topic="output-topic", kafka self.output_topic = output_topic self.kafka_url = kafka_url self.is_consuming = False - self._futures: dict[int, Optional[EventResult]] = {} # TODO: handle timeouts? + self._futures: dict[int, dict] = {} # TODO: handle timeouts? """Mapping of event ids to their EventResult.
None if not arrived.""" if start_consumer_thread: self.start_consumer_thread() @@ -562,10 +566,12 @@ def consume_results(self): continue try: event_result: EventResult = pickle.loads(msg.value()) + ts = time.time() if event_result.event_id in self._futures: if (r := self._futures[event_result.event_id]) != None: logger.warning(f"Received EventResult with id {event_result.event_id} more than once: {event_result} replaced previous: {r}") - self._futures[event_result.event_id] = event_result + self._futures[event_result.event_id]["ret"] = event_result + self._futures[event_result.event_id]["ret_t"] = ts except Exception as e: logger.error(f"Consumer deserializing error: {e}") @@ -584,8 +590,13 @@ def consume_results(self): if flush: self.producer.flush() - self._futures[event._id] = None - return event._id + self._futures[event._id] = { + "sent": event, + "sent_t": time.time(), + "ret": None, + "ret_t": None + } + return event._id def close(self): self.producer.flush()
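
For reference, a minimal post-processing sketch for the metrics CSV written by write_dict_to_csv above (the demo's main() saves it as "test2.csv"). It assumes only the header layout defined there (event_id, sent, sent_t, ret, ret_t), where sent_t and ret_t are time.time() floats and the ret column is empty when no EventResult arrived; the script itself and the percentile choice are illustrative, not part of the patches.

import csv
import statistics

def summarize(filename: str = "test2.csv") -> None:
    """Print basic latency statistics for the benchmark run recorded in `filename`."""
    latencies = []
    missing = 0
    with open(filename, newline="", encoding="utf-8") as csvfile:
        for row in csv.DictReader(csvfile):
            if row["ret"]:
                # Round-trip latency: EventResult consumed minus Event produced.
                latencies.append(float(row["ret_t"]) - float(row["sent_t"]))
            else:
                missing += 1
    print(f"{len(latencies)} results received, {missing} missing")
    if latencies:
        print(f"median latency: {statistics.median(latencies) * 1000:.1f} ms")
    if len(latencies) >= 2:
        p99 = statistics.quantiles(latencies, n=100)[98]
        print(f"p99 latency:    {p99 * 1000:.1f} ms")

if __name__ == "__main__":
    summarize()

Note that latency here is measured on the client side, from the producer call in send() to the consumer thread's receipt of the EventResult, so it includes the Kafka round trips on both the input and output topics.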