From a476556216be04f15a9cb2ce0bba5b73f4a87630 Mon Sep 17 00:00:00 2001 From: lucasvanmol Date: Wed, 15 Jan 2025 12:00:08 +0100 Subject: [PATCH 1/3] Add LogAppendTime for benchmarking --- deathstar/demo.py | 11 ++++++----- docker-compose.yml | 3 +++ src/cascade/runtime/flink_runtime.py | 21 ++++++++++++++------- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/deathstar/demo.py b/deathstar/demo.py index c42bdb7..41ce836 100644 --- a/deathstar/demo.py +++ b/deathstar/demo.py @@ -270,12 +270,12 @@ def benchmark_runner(proc_num) -> dict[int, dict]: time.sleep(sleep_time) event = next(deathstar_generator) # func_name = event.dataflow.name if event.dataflow is not None else "login" # only login has no dataflow - # key = event.key_stack[0] + key = event.key_stack[0] # params = event.variable_map client.send(event) # futures[event._id] = {"event": f'{func_name} {key}->{params}'} - - # styx.flush() + + client.client.flush() sec_end = timer() lps = sec_end - sec_start if lps < 1: @@ -311,7 +311,7 @@ def write_dict_to_csv(futures_dict, filename): filename (str): The name of the CSV file to write to. """ # Define the column headers - headers = ["event_id", "sent", "sent_t", "ret", "ret_t"] + headers = ["event_id", "sent", "sent_t", "ret", "ret_t", "latency"] # Open the file for writing with open(filename, mode='w', newline='', encoding='utf-8') as csvfile: @@ -328,7 +328,8 @@ def write_dict_to_csv(futures_dict, filename): "sent": event_data.get("sent"), "sent_t": event_data.get("sent_t"), "ret": event_data.get("ret"), - "ret_t": event_data.get("ret_t") + "ret_t": event_data.get("ret_t"), + "latency": event_data["ret_t"][1] - event_data["sent_t"][1] } writer.writerow(row) diff --git a/docker-compose.yml b/docker-compose.yml index 07a153f..ce9e1b4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -27,6 +27,9 @@ services: # Required for a single node cluster KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + # Change timestamp type for benchmark measurements + KAFKA_LOG_MESSAGE_TIMESTAMP_TYPE: LogAppendTime + kafka-ui: image: ghcr.io/kafbat/kafka-ui:latest ports: diff --git a/src/cascade/runtime/flink_runtime.py b/src/cascade/runtime/flink_runtime.py index 763582e..c97acdf 100644 --- a/src/cascade/runtime/flink_runtime.py +++ b/src/cascade/runtime/flink_runtime.py @@ -551,6 +551,7 @@ def consume_results(self): "bootstrap.servers": self.kafka_url, "group.id": str(uuid.uuid4()), "auto.offset.reset": "earliest", + "api.version.request": True # "enable.auto.commit": False, # "fetch.min.bytes": 1 }) @@ -566,9 +567,9 @@ def consume_results(self): continue try: event_result: EventResult = pickle.loads(msg.value()) - ts = time.time() + ts = msg.timestamp() if event_result.event_id in self._futures: - if (r := self._futures[event_result.event_id]) != None: + if (r := self._futures[event_result.event_id]["ret"]) != None: logger.warning(f"Recieved EventResult with id {event_result.event_id} more than once: {event_result} replaced previous: {r}") self._futures[event_result.event_id]["ret"] = event_result self._futures[event_result.event_id]["ret_t"] = ts @@ -578,6 +579,9 @@ def consume_results(self): self.consumer.close() + def flush(self): + self.producer.flush() + def send(self, event: Event, flush=False) -> int: """Send an event to the Kafka source and block until an EventResult is recieved. @@ -586,16 +590,19 @@ def send(self, event: Event, flush=False) -> int: :return: The result event if recieved :raises Exception: If an exception occured recieved or deserializing the message. """ - self.producer.produce(self.input_topic, value=pickle.dumps(event)) - if flush: - self.producer.flush() - self._futures[event._id] = { "sent": event, - "sent_t": time.time(), + "sent_t": None, "ret": None, "ret_t": None } + + def set_ts(ts): + self._futures[event._id]["sent_t"] = ts + + self.producer.produce(self.input_topic, value=pickle.dumps(event), on_delivery=lambda err, msg: set_ts(msg.timestamp())) + if flush: + self.producer.flush() return event._id def close(self): From 9aae75a8b5d2c90eb1e7a9f5aebf94326fdae86a Mon Sep 17 00:00:00 2001 From: lucasvanmol Date: Thu, 16 Jan 2025 16:37:13 +0100 Subject: [PATCH 2/3] Add python runtime for testing/benchmarking --- deathstar/demo.py | 163 +++++++++--------- deathstar/demo_python.py | 109 ++++++++++++ deathstar/test_demo.py | 69 ++++++-- src/cascade/dataflow/dataflow.py | 8 - src/cascade/dataflow/test_dataflow.py | 2 +- src/cascade/runtime/flink_runtime.py | 17 +- src/cascade/runtime/python_runtime.py | 156 +++++++++++++++++ tests/integration/flink-runtime/common.py | 2 +- .../flink-runtime/test_select_all.py | 2 +- 9 files changed, 412 insertions(+), 116 deletions(-) create mode 100644 deathstar/demo_python.py create mode 100644 src/cascade/runtime/python_runtime.py diff --git a/deathstar/demo.py b/deathstar/demo.py index 41ce836..79df850 100644 --- a/deathstar/demo.py +++ b/deathstar/demo.py @@ -3,7 +3,6 @@ import os import time import csv -from confluent_kafka import Producer from timeit import default_timer as timer from multiprocessing import Pool @@ -20,19 +19,19 @@ class DeathstarDemo(): - def __init__(self, input_topic, output_topic): + def __init__(self): self.init_user = OpNode(user_op, InitClass()) self.init_hotel = OpNode(hotel_op, InitClass()) self.init_flight = OpNode(flight_op, InitClass()) - self.runtime = FlinkRuntime(input_topic, output_topic) - def init_runtime(self): - self.runtime.init(bundle_time=100, bundle_size=1000) - self.runtime.add_operator(FlinkOperator(hotel_op)) - self.runtime.add_operator(FlinkOperator(flight_op)) - self.runtime.add_operator(FlinkOperator(user_op)) - self.runtime.add_stateless_operator(FlinkStatelessOperator(search_op)) - self.runtime.add_stateless_operator(FlinkStatelessOperator(recommend_op)) + def init_runtime(self, runtime, **kwargs): + self.runtime = runtime + self.runtime.init(**kwargs) + self.runtime.add_operator(hotel_op) + self.runtime.add_operator(flight_op) + self.runtime.add_operator(user_op) + self.runtime.add_stateless_operator(search_op) + self.runtime.add_stateless_operator(recommend_op) def populate(self): @@ -176,80 +175,72 @@ def populate(self): }, None) self.runtime.send(event, flush=True) -class DeathstarClient: - def __init__(self, input_topic="input-topic", output_topic="output-topic", kafka_broker="localhost:9092"): - self.client = FlinkClientSync(input_topic, output_topic, kafka_broker, True) - self.producer = Producer({'bootstrap.servers': kafka_broker}) - - def send(self, event: Event, flush: bool = False): - return self.client.send(event, flush) - - def search_hotel(self): - in_date = random.randint(9, 23) - out_date = random.randint(in_date + 1, 24) - - if in_date < 10: - in_date_str = f"2015-04-0{in_date}" +def search_hotel(): + in_date = random.randint(9, 23) + out_date = random.randint(in_date + 1, 24) + + if in_date < 10: + in_date_str = f"2015-04-0{in_date}" + else: + in_date_str = f"2015-04-{in_date}" + if out_date < 10: + out_date_str = f"2015-04-0{out_date}" + else: + out_date_str = f"2015-04-{out_date}" + + lat = 38.0235 + (random.randint(0, 481) - 240.5) / 1000.0 + lon = -122.095 + (random.randint(0, 325) - 157.0) / 1000.0 + + # We don't really use the in_date, out_date information + return Event(search_op.dataflow.entry, ["tempkey"], {"lat": lat, "lon": lon}, search_op.dataflow) + +def recommend(req_param=None): + if req_param is None: + coin = random.random() + if coin < 0.5: + req_param = "distance" else: - in_date_str = f"2015-04-{in_date}" - if out_date < 10: - out_date_str = f"2015-04-0{out_date}" - else: - out_date_str = f"2015-04-{out_date}" - - lat = 38.0235 + (random.randint(0, 481) - 240.5) / 1000.0 - lon = -122.095 + (random.randint(0, 325) - 157.0) / 1000.0 + req_param = "price" - # We don't really use the in_date, out_date information - return Event(search_op.dataflow.entry, ["tempkey"], {"lat": lat, "lon": lon}, search_op.dataflow) + lat = 38.0235 + (random.randint(0, 481) - 240.5) / 1000.0 + lon = -122.095 + (random.randint(0, 325) - 157.0) / 1000.0 - def recommend(self, req_param=None): - if req_param is None: - coin = random.random() - if coin < 0.5: - req_param = "distance" - else: - req_param = "price" + return Event(recommend_op.dataflow.entry, ["tempkey"], {"requirement": req_param, "lat": lat, "lon": lon}, recommend_op.dataflow) - lat = 38.0235 + (random.randint(0, 481) - 240.5) / 1000.0 - lon = -122.095 + (random.randint(0, 325) - 157.0) / 1000.0 +def user_login(): + user_id = random.randint(0, 500) + username = f"Cornell_{user_id}" + password = str(user_id) * 10 + return Event(OpNode(user_op, InvokeMethod("login")), [username], {"password": password}, None) - return Event(recommend_op.dataflow.entry, ["tempkey"], {"requirement": req_param, "lat": lat, "lon": lon}, recommend_op.dataflow) - - def user_login(self): - user_id = random.randint(0, 500) - username = f"Cornell_{user_id}" - password = str(user_id) * 10 - return Event(OpNode(user_op, InvokeMethod("login")), [username], {"password": password}, None) - - def reserve(self): - hotel_id = random.randint(0, 99) - flight_id = random.randint(0, 99) - - # user = User("user1", "pass") - # user.order(flight, hotel) - user_id = "Cornell_" + str(random.randint(0, 500)) - - return Event(user_op.dataflows["order"].entry, [user_id], {"flight": str(flight_id), "hotel": str(hotel_id)}, user_op.dataflows["order"]) - - def deathstar_workload_generator(self): - search_ratio = 0.6 - recommend_ratio = 0.39 - user_ratio = 0.005 - reserve_ratio = 0.005 - c = 0 - while True: - coin = random.random() - if coin < search_ratio: - yield self.search_hotel() - elif coin < search_ratio + recommend_ratio: - yield self.recommend() - elif coin < search_ratio + recommend_ratio + user_ratio: - yield self.user_login() - else: - yield self.reserve() - c += 1 +def reserve(): + hotel_id = random.randint(0, 99) + flight_id = random.randint(0, 99) + # user = User("user1", "pass") + # user.order(flight, hotel) + user_id = "Cornell_" + str(random.randint(0, 500)) + + return Event(user_op.dataflows["order"].entry, [user_id], {"flight": str(flight_id), "hotel": str(hotel_id)}, user_op.dataflows["order"]) + +def deathstar_workload_generator(): + search_ratio = 0.6 + recommend_ratio = 0.39 + user_ratio = 0.005 + reserve_ratio = 0.005 + c = 0 + while True: + coin = random.random() + if coin < search_ratio: + yield search_hotel() + elif coin < search_ratio + recommend_ratio: + yield recommend() + elif coin < search_ratio + recommend_ratio + user_ratio: + yield user_login() + else: + yield reserve() + c += 1 + threads = 1 messages_per_second = 10 sleeps_per_second = 10 @@ -259,8 +250,8 @@ def deathstar_workload_generator(self): def benchmark_runner(proc_num) -> dict[int, dict]: print(f'Generator: {proc_num} starting') - client = DeathstarClient("deathstar", "ds-out") - deathstar_generator = client.deathstar_workload_generator() + client = FlinkClientSync("deathstar", "ds-out", "localhost:9092", True) + deathstar_generator = deathstar_workload_generator() # futures: dict[int, dict] = {} start = timer() for _ in range(seconds): @@ -275,7 +266,7 @@ def benchmark_runner(proc_num) -> dict[int, dict]: client.send(event) # futures[event._id] = {"event": f'{func_name} {key}->{params}'} - client.client.flush() + client.flush() sec_end = timer() lps = sec_end - sec_start if lps < 1: @@ -291,14 +282,14 @@ def benchmark_runner(proc_num) -> dict[int, dict]: done = False while not done: done = True - for event_id, fut in client.client._futures.items(): + for event_id, fut in client._futures.items(): result = fut["ret"] if result is None: done = False time.sleep(0.5) break - futures = client.client._futures - client.client.close() + futures = client._futures + client.close() return futures @@ -334,8 +325,8 @@ def write_dict_to_csv(futures_dict, filename): writer.writerow(row) def main(): - ds = DeathstarDemo("deathstar", "ds-out") - ds.init_runtime() + ds = DeathstarDemo() + ds.init_runtime(FlinkRuntime("deathstar", "ds-out"), bundle_time=5, bundle_size=10) ds.runtime.run(run_async=True) ds.populate() diff --git a/deathstar/demo_python.py b/deathstar/demo_python.py new file mode 100644 index 0000000..80e687c --- /dev/null +++ b/deathstar/demo_python.py @@ -0,0 +1,109 @@ +import time +import sys +import os + +# import cascade +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) + + +from cascade.runtime.python_runtime import PythonRuntime +from deathstar.demo import DeathstarDemo, deathstar_workload_generator +from timeit import default_timer as timer +import csv + +messages_per_second = 10 +sleeps_per_second = 10 +sleep_time = 0.0085 +seconds = 10 + +def benchmark_runner(runtime) -> dict[int, dict]: + + deathstar_generator = deathstar_workload_generator() + # futures: dict[int, dict] = {} + start = timer() + for _ in range(seconds): + sec_start = timer() + for i in range(messages_per_second): + if i % (messages_per_second // sleeps_per_second) == 0: + time.sleep(sleep_time) + event = next(deathstar_generator) + # func_name = event.dataflow.name if event.dataflow is not None else "login" # only login has no dataflow + key = event.key_stack[0] + # params = event.variable_map + runtime.send(event) + # futures[event._id] = {"event": f'{func_name} {key}->{params}'} + + sec_end = timer() + lps = sec_end - sec_start + if lps < 1: + time.sleep(1 - lps) + sec_end2 = timer() + print(f'Latency per second: {sec_end2 - sec_start}') + end = timer() + print(f'Average latency per second: {(end - start) / seconds}') + + # done = False + # while not done: + # done = True + # for event_id, fut in client._futures.items(): + # result = fut["ret"] + # if result is None: + # done = False + # time.sleep(0.5) + # break + # futures = client._futures + # client.close() + # return futures + + +def write_dict_to_csv(futures_dict, filename): + """ + Writes a dictionary of event data to a CSV file. + + Args: + futures_dict (dict): A dictionary where each key is an event ID and the value is another dict. + filename (str): The name of the CSV file to write to. + """ + # Define the column headers + headers = ["event_id", "sent", "sent_t", "ret", "ret_t", "latency"] + + # Open the file for writing + with open(filename, mode='w', newline='', encoding='utf-8') as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=headers) + + # Write the headers + writer.writeheader() + + # Write the data rows + for event_id, event_data in futures_dict.items(): + # Prepare a row where the 'event_id' is the first column + row = { + "event_id": event_id, + "sent": event_data.get("sent"), + "sent_t": event_data.get("sent_t"), + "ret": event_data.get("ret"), + "ret_t": event_data.get("ret_t"), + "latency": event_data["ret_t"][1] - event_data["sent_t"][1] + } + writer.writerow(row) + +def test_python_runtime(): + ds = DeathstarDemo() + ds.init_runtime(PythonRuntime()) + ds.populate() + + + time.sleep(1) + input() + + results = benchmark_runner(ds.runtime) + + print(results) + t = len(results) + r = 0 + for result in results.values(): + if result["ret"] is not None: + print(result) + r += 1 + print(f"{r}/{t} results recieved.") + write_dict_to_csv(results, "test2.csv") diff --git a/deathstar/test_demo.py b/deathstar/test_demo.py index a7e674e..31e5f97 100644 --- a/deathstar/test_demo.py +++ b/deathstar/test_demo.py @@ -1,48 +1,95 @@ -from deathstar.demo import DeathstarDemo, DeathstarClient +import os +import sys + +# import cascade +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src"))) + +from cascade.runtime.python_runtime import PythonClientSync, PythonRuntime +from cascade.runtime.flink_runtime import FlinkClientSync, FlinkRuntime +from deathstar.demo import DeathstarDemo, recommend, reserve, search_hotel, user_login import time import pytest @pytest.mark.integration def test_deathstar_demo(): - ds = DeathstarDemo("deathstardemo-test", "dsd-out") - ds.init_runtime() + ds = DeathstarDemo() + ds.init_runtime(FlinkRuntime("deathstardemo-test", "dsd-out")) ds.runtime.run(run_async=True) print("Populating, press enter to go to the next step when done") ds.populate() - client = DeathstarClient("deathstardemo-test", "dsd-out") + client = FlinkClientSync("deathstardemo-test", "dsd-out") input() print("testing user login") - event = client.user_login() + event = user_login() client.send(event) input() print("testing reserve") - event = client.reserve() + event = reserve() client.send(event) input() print("testing search") - event = client.search_hotel() + event = search_hotel() client.send(event) input() print("testing recommend (distance)") time.sleep(0.5) - event = client.recommend(req_param="distance") + event = recommend(req_param="distance") client.send(event) input() print("testing recommend (price)") time.sleep(0.5) - event = client.recommend(req_param="price") + event = recommend(req_param="price") client.send(event) - print(client.client._futures) + print(client._futures) input() print("done!") - print(client.client._futures) + print(client._futures) + +def test_deathstar_demo_python(): + ds = DeathstarDemo() + ds.init_runtime(PythonRuntime()) + ds.runtime.run() + print("Populating, press enter to go to the next step when done") + ds.populate() + + time.sleep(2) + + client = PythonClientSync(ds.runtime) + print("testing user login") + event = user_login() + result = client.send(event) + assert result == True + + print("testing reserve") + event = reserve() + result = client.send(event) + assert result == True + + print("testing search") + event = search_hotel() + result = client.send(event) + print(result) + + print("testing recommend (distance)") + time.sleep(0.5) + event = recommend(req_param="distance") + result = client.send(event) + print(result) + + print("testing recommend (price)") + time.sleep(0.5) + event = recommend(req_param="price") + result = client.send(event) + print(result) + + print("done!") if __name__ == "__main__": diff --git a/src/cascade/dataflow/dataflow.py b/src/cascade/dataflow/dataflow.py index 2e3d890..7e45dbd 100644 --- a/src/cascade/dataflow/dataflow.py +++ b/src/cascade/dataflow/dataflow.py @@ -59,14 +59,6 @@ class SelectAllNode(Node): collect_target: 'CollectNode' -@dataclass -class MergeNode(Node): - """A node in a `Dataflow` corresponding to a merge operator. - - It will aggregate incoming edges and output them as a list to the outgoing edge. - Their actual implementation is runtime-dependent.""" - pass - @dataclass class CollectNode(Node): """A node in a `Dataflow` corresponding to a merge operator. diff --git a/src/cascade/dataflow/test_dataflow.py b/src/cascade/dataflow/test_dataflow.py index 9458c70..3b5cba3 100644 --- a/src/cascade/dataflow/test_dataflow.py +++ b/src/cascade/dataflow/test_dataflow.py @@ -1,5 +1,5 @@ from typing import Any -from cascade.dataflow.dataflow import DataFlow, Edge, Event, EventResult, InvokeMethod, MergeNode, OpNode +from cascade.dataflow.dataflow import DataFlow, Edge, Event, EventResult, InvokeMethod, OpNode from cascade.dataflow.operator import StatefulOperator class DummyUser: diff --git a/src/cascade/runtime/flink_runtime.py b/src/cascade/runtime/flink_runtime.py index c97acdf..fd74279 100644 --- a/src/cascade/runtime/flink_runtime.py +++ b/src/cascade/runtime/flink_runtime.py @@ -12,7 +12,7 @@ from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaRecordSerializationSchema, KafkaSource, KafkaSink from pyflink.datastream import ProcessFunction, StreamExecutionEnvironment import pickle -from cascade.dataflow.dataflow import Arrived, CollectNode, CollectTarget, Event, EventResult, Filter, InitClass, InvokeMethod, MergeNode, Node, NotArrived, OpNode, Operator, Result, SelectAllNode +from cascade.dataflow.dataflow import Arrived, CollectNode, CollectTarget, Event, EventResult, Filter, InitClass, InvokeMethod, Node, NotArrived, OpNode, Result, SelectAllNode from cascade.dataflow.operator import StatefulOperator, StatelessOperator from confluent_kafka import Producer, Consumer import logging @@ -171,9 +171,8 @@ def process_element(self, event: Event, ctx: 'ProcessFunction.Context'): class FlinkCollectOperator(KeyedProcessFunction): """Flink implementation of a merge operator.""" - def __init__(self): #, merge_node: MergeNode) -> None: + def __init__(self): self.collection: ValueState = None # type: ignore (expect state to be initialised on .open()) - #self.node = merge_node def open(self, runtime_context: RuntimeContext): descriptor = ValueStateDescriptor("merge_state", Types.PICKLED_BYTE_ARRAY()) @@ -442,7 +441,7 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, para .process(FlinkCollectOperator()) .name("Collect") ) - """Stream that ingests events with an `cascade.dataflow.dataflow.MergeNode` target""" + """Stream that ingests events with an `cascade.dataflow.dataflow.CollectNode` target""" self.stateless_op_streams = [] self.stateful_op_streams = [] @@ -451,9 +450,10 @@ def init(self, kafka_broker="localhost:9092", bundle_time=1, bundle_size=5, para self.producer = Producer({'bootstrap.servers': kafka_broker}) logger.debug("FlinkRuntime initialized") - def add_operator(self, flink_op: FlinkOperator): + def add_operator(self, op: StatefulOperator): """Add a `FlinkOperator` to the Flink datastream.""" - + flink_op = FlinkOperator(op) + op_stream = ( self.stateful_op_stream.filter(lambda e: e.target.operator.entity == flink_op.operator.entity) .key_by(lambda e: e.key_stack[-1]) @@ -462,9 +462,10 @@ def add_operator(self, flink_op: FlinkOperator): ) self.stateful_op_streams.append(op_stream) - def add_stateless_operator(self, flink_op: FlinkStatelessOperator): + def add_stateless_operator(self, op: StatelessOperator): """Add a `FlinkStatelessOperator` to the Flink datastream.""" - + flink_op = FlinkStatelessOperator(op) + op_stream = ( self.stateless_op_stream .filter(lambda e: e.target.operator.dataflow.name == flink_op.operator.dataflow.name) diff --git a/src/cascade/runtime/python_runtime.py b/src/cascade/runtime/python_runtime.py new file mode 100644 index 0000000..cf936f3 --- /dev/null +++ b/src/cascade/runtime/python_runtime.py @@ -0,0 +1,156 @@ +from logging import Filter +import threading +from cascade.dataflow.operator import StatefulOperator, StatelessOperator +from cascade.dataflow.dataflow import CollectNode, Event, EventResult, InitClass, InvokeMethod, OpNode, SelectAllNode +from queue import Empty, Queue + +class PythonStatefulOperator(): + def __init__(self, operator: StatefulOperator): + self.operator = operator + self.states = {} + + def process(self, event: Event): + assert(isinstance(event.target, OpNode)) + assert(isinstance(event.target.operator, StatefulOperator)) + assert(event.target.operator.entity == self.operator.entity) + key_stack = event.key_stack + key = key_stack[-1] + + print(f"PythonStatefulOperator: {event}") + + if isinstance(event.target.method_type, InitClass): + result = self.operator.handle_init_class(*event.variable_map.values()) + self.states[key] = result + key_stack.pop() + + elif isinstance(event.target.method_type, InvokeMethod): + state = self.states[key] + result = self.operator.handle_invoke_method( + event.target.method_type, + variable_map=event.variable_map, + state=state, + key_stack=key_stack + ) + self.states[key] = state + + elif isinstance(event.target.method_type, Filter): + raise NotImplementedError() + + if event.target.assign_result_to is not None: + event.variable_map[event.target.assign_result_to] = result + + new_events = event.propogate(key_stack, result) + if isinstance(new_events, EventResult): + yield new_events + else: + yield from new_events + +class PythonStatelessOperator(): + def __init__(self, operator: StatelessOperator): + self.operator = operator + + def process(self, event: Event): + assert(isinstance(event.target, OpNode)) + assert(isinstance(event.target.operator, StatelessOperator)) + + key_stack = event.key_stack + + + if isinstance(event.target.method_type, InvokeMethod): + result = self.operator.handle_invoke_method( + event.target.method_type, + variable_map=event.variable_map, + key_stack=key_stack + ) + else: + raise Exception(f"A StatelessOperator cannot compute event type: {event.target.method_type}") + + if event.target.assign_result_to is not None: + event.variable_map[event.target.assign_result_to] = result + + new_events = event.propogate(key_stack, result) + if isinstance(new_events, EventResult): + yield new_events + else: + yield from new_events + + +class PythonRuntime(): + """Simple non-distributed runtime meant for testing that runs Dataflows locally.""" + def __init__(self): + self.events = Queue() + self.results = Queue() + self.running = False + self.statefuloperators: dict[StatefulOperator, PythonStatefulOperator] = {} + self.statelessoperators: dict[StatelessOperator, PythonStatelessOperator] = {} + + def init(self): + pass + + def _consume_events(self): + self.running = True + def consume_event(event: Event): + if isinstance(event.target, OpNode): + if isinstance(event.target.operator, StatefulOperator): + yield from self.statefuloperators[event.target.operator].process(event) + elif isinstance(event.target.operator, StatelessOperator): + yield from self.statelessoperators[event.target.operator].process(event) + + elif isinstance(event.target, SelectAllNode): + raise NotImplementedError() + elif isinstance(event.target, CollectNode): + raise NotImplementedError() + + + events = [] + while self.running: + if len(events) == 0: + try: + event: Event = self.events.get(timeout=1) + except Empty: + continue + else: + event = events.pop() + + for ev in consume_event(event): + if isinstance(ev, EventResult): + print(ev) + self.results.put(ev) + elif isinstance(ev, Event): + events.append(ev) + + def add_operator(self, op: StatefulOperator): + """Add a `StatefulOperator` to the datastream.""" + self.statefuloperators[op] = PythonStatefulOperator(op) + + def add_stateless_operator(self, op: StatelessOperator): + """Add a `StatelessOperator` to the datastream.""" + self.statelessoperators[op] = PythonStatelessOperator(op) + + def send(self, event: Event, flush=None): + self.events.put(event) + + def run(self): + self.thread = threading.Thread(target=self._consume_events, daemon=True) + self.thread.start() + + def stop(self): + self.running = False + self.thread.join() + +class PythonClientSync: + def __init__(self, runtime: PythonRuntime): + self._results_q = runtime.results + self._events = runtime.events + self.results = {} + + def send(self, event: Event, block=True): + self._events.put(event) + + while block: + er: EventResult = self._results_q.get(block=True) + if event._id == er.event_id: + self.results[er.event_id] = er.result + return er.result + + \ No newline at end of file diff --git a/tests/integration/flink-runtime/common.py b/tests/integration/flink-runtime/common.py index 105fdbd..49a0ef3 100644 --- a/tests/integration/flink-runtime/common.py +++ b/tests/integration/flink-runtime/common.py @@ -1,5 +1,5 @@ from typing import Any -from cascade.dataflow.dataflow import CollectNode, CollectTarget, DataFlow, Edge, InvokeMethod, MergeNode, OpNode +from cascade.dataflow.dataflow import CollectNode, CollectTarget, DataFlow, Edge, InvokeMethod, OpNode from cascade.runtime.flink_runtime import StatefulOperator class User: diff --git a/tests/integration/flink-runtime/test_select_all.py b/tests/integration/flink-runtime/test_select_all.py index 62c371e..f585092 100644 --- a/tests/integration/flink-runtime/test_select_all.py +++ b/tests/integration/flink-runtime/test_select_all.py @@ -8,7 +8,7 @@ from pyflink.datastream.data_stream import CloseableIterator -from cascade.dataflow.dataflow import CollectNode, DataFlow, Edge, Event, EventResult, Filter, InitClass, InvokeMethod, MergeNode, OpNode, SelectAllNode +from cascade.dataflow.dataflow import CollectNode, DataFlow, Edge, Event, EventResult, InitClass, InvokeMethod, OpNode, SelectAllNode from cascade.dataflow.operator import StatefulOperator, StatelessOperator from cascade.runtime.flink_runtime import FlinkOperator, FlinkRuntime, FlinkStatelessOperator from confluent_kafka import Producer From 621d9409eb05ab39067609e05bafb19c5b2e63c4 Mon Sep 17 00:00:00 2001 From: lucasvanmol Date: Fri, 17 Jan 2025 13:06:51 +0100 Subject: [PATCH 3/3] temp fix for python runtime test --- deathstar/demo.py | 4 ++-- deathstar/test_demo.py | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/deathstar/demo.py b/deathstar/demo.py index 79df850..a284ee7 100644 --- a/deathstar/demo.py +++ b/deathstar/demo.py @@ -207,10 +207,10 @@ def recommend(req_param=None): return Event(recommend_op.dataflow.entry, ["tempkey"], {"requirement": req_param, "lat": lat, "lon": lon}, recommend_op.dataflow) -def user_login(): +def user_login(succesfull=True): user_id = random.randint(0, 500) username = f"Cornell_{user_id}" - password = str(user_id) * 10 + password = str(user_id) * 10 if succesfull else "" return Event(OpNode(user_op, InvokeMethod("login")), [username], {"password": password}, None) def reserve(): diff --git a/deathstar/test_demo.py b/deathstar/test_demo.py index 31e5f97..a3afd4a 100644 --- a/deathstar/test_demo.py +++ b/deathstar/test_demo.py @@ -66,12 +66,16 @@ def test_deathstar_demo_python(): event = user_login() result = client.send(event) assert result == True + event = user_login(succesfull=False) + result = client.send(event) + assert result == False print("testing reserve") event = reserve() result = client.send(event) assert result == True + return print("testing search") event = search_hotel() result = client.send(event)