Merge pull request #7 from delftdata/benchmarking
Add python runtime & benchmarking
lucasvanmol authored Jan 20, 2025
2 parents 88447d4 + 621d940 commit 1746feb
Showing 10 changed files with 438 additions and 127 deletions.
172 changes: 82 additions & 90 deletions deathstar/demo.py
@@ -3,7 +3,6 @@
 import os
 import time
 import csv
-from confluent_kafka import Producer
 from timeit import default_timer as timer
 from multiprocessing import Pool
 
@@ -20,19 +19,19 @@
 
 
 class DeathstarDemo():
-    def __init__(self, input_topic, output_topic):
+    def __init__(self):
         self.init_user = OpNode(user_op, InitClass())
         self.init_hotel = OpNode(hotel_op, InitClass())
         self.init_flight = OpNode(flight_op, InitClass())
-        self.runtime = FlinkRuntime(input_topic, output_topic)
 
-    def init_runtime(self):
-        self.runtime.init(bundle_time=100, bundle_size=1000)
-        self.runtime.add_operator(FlinkOperator(hotel_op))
-        self.runtime.add_operator(FlinkOperator(flight_op))
-        self.runtime.add_operator(FlinkOperator(user_op))
-        self.runtime.add_stateless_operator(FlinkStatelessOperator(search_op))
-        self.runtime.add_stateless_operator(FlinkStatelessOperator(recommend_op))
+    def init_runtime(self, runtime, **kwargs):
+        self.runtime = runtime
+        self.runtime.init(**kwargs)
+        self.runtime.add_operator(hotel_op)
+        self.runtime.add_operator(flight_op)
+        self.runtime.add_operator(user_op)
+        self.runtime.add_stateless_operator(search_op)
+        self.runtime.add_stateless_operator(recommend_op)
 
 
     def populate(self):
@@ -176,80 +175,72 @@ def populate(self):
         }, None)
         self.runtime.send(event, flush=True)
 
-class DeathstarClient:
-    def __init__(self, input_topic="input-topic", output_topic="output-topic", kafka_broker="localhost:9092"):
-        self.client = FlinkClientSync(input_topic, output_topic, kafka_broker, True)
-        self.producer = Producer({'bootstrap.servers': kafka_broker})
-
-    def send(self, event: Event, flush: bool = False):
-        return self.client.send(event, flush)
-
-    def search_hotel(self):
-        in_date = random.randint(9, 23)
-        out_date = random.randint(in_date + 1, 24)
-
-        if in_date < 10:
-            in_date_str = f"2015-04-0{in_date}"
-        else:
-            in_date_str = f"2015-04-{in_date}"
-        if out_date < 10:
-            out_date_str = f"2015-04-0{out_date}"
-        else:
-            out_date_str = f"2015-04-{out_date}"
-
-        lat = 38.0235 + (random.randint(0, 481) - 240.5) / 1000.0
-        lon = -122.095 + (random.randint(0, 325) - 157.0) / 1000.0
-
-        # We don't really use the in_date, out_date information
-        return Event(search_op.dataflow.entry, ["tempkey"], {"lat": lat, "lon": lon}, search_op.dataflow)
-
-    def recommend(self, req_param=None):
-        if req_param is None:
-            coin = random.random()
-            if coin < 0.5:
-                req_param = "distance"
-            else:
-                req_param = "price"
-
-        lat = 38.0235 + (random.randint(0, 481) - 240.5) / 1000.0
-        lon = -122.095 + (random.randint(0, 325) - 157.0) / 1000.0
-
-        return Event(recommend_op.dataflow.entry, ["tempkey"], {"requirement": req_param, "lat": lat, "lon": lon}, recommend_op.dataflow)
-
-    def user_login(self):
-        user_id = random.randint(0, 500)
-        username = f"Cornell_{user_id}"
-        password = str(user_id) * 10
-        return Event(OpNode(user_op, InvokeMethod("login")), [username], {"password": password}, None)
-
-    def reserve(self):
-        hotel_id = random.randint(0, 99)
-        flight_id = random.randint(0, 99)
-
-        # user = User("user1", "pass")
-        # user.order(flight, hotel)
-        user_id = "Cornell_" + str(random.randint(0, 500))
-
-        return Event(user_op.dataflows["order"].entry, [user_id], {"flight": str(flight_id), "hotel": str(hotel_id)}, user_op.dataflows["order"])
-
-    def deathstar_workload_generator(self):
-        search_ratio = 0.6
-        recommend_ratio = 0.39
-        user_ratio = 0.005
-        reserve_ratio = 0.005
-        c = 0
-        while True:
-            coin = random.random()
-            if coin < search_ratio:
-                yield self.search_hotel()
-            elif coin < search_ratio + recommend_ratio:
-                yield self.recommend()
-            elif coin < search_ratio + recommend_ratio + user_ratio:
-                yield self.user_login()
-            else:
-                yield self.reserve()
-            c += 1
+def search_hotel():
+    in_date = random.randint(9, 23)
+    out_date = random.randint(in_date + 1, 24)
+
+    if in_date < 10:
+        in_date_str = f"2015-04-0{in_date}"
+    else:
+        in_date_str = f"2015-04-{in_date}"
+    if out_date < 10:
+        out_date_str = f"2015-04-0{out_date}"
+    else:
+        out_date_str = f"2015-04-{out_date}"
+
+    lat = 38.0235 + (random.randint(0, 481) - 240.5) / 1000.0
+    lon = -122.095 + (random.randint(0, 325) - 157.0) / 1000.0
+
+    # We don't really use the in_date, out_date information
+    return Event(search_op.dataflow.entry, ["tempkey"], {"lat": lat, "lon": lon}, search_op.dataflow)
+
+def recommend(req_param=None):
+    if req_param is None:
+        coin = random.random()
+        if coin < 0.5:
+            req_param = "distance"
+        else:
+            req_param = "price"
+
+    lat = 38.0235 + (random.randint(0, 481) - 240.5) / 1000.0
+    lon = -122.095 + (random.randint(0, 325) - 157.0) / 1000.0
+
+    return Event(recommend_op.dataflow.entry, ["tempkey"], {"requirement": req_param, "lat": lat, "lon": lon}, recommend_op.dataflow)
+
+def user_login(succesfull=True):
+    user_id = random.randint(0, 500)
+    username = f"Cornell_{user_id}"
+    password = str(user_id) * 10 if succesfull else ""
+    return Event(OpNode(user_op, InvokeMethod("login")), [username], {"password": password}, None)
+
+def reserve():
+    hotel_id = random.randint(0, 99)
+    flight_id = random.randint(0, 99)
+
+    # user = User("user1", "pass")
+    # user.order(flight, hotel)
+    user_id = "Cornell_" + str(random.randint(0, 500))
+
+    return Event(user_op.dataflows["order"].entry, [user_id], {"flight": str(flight_id), "hotel": str(hotel_id)}, user_op.dataflows["order"])
+
+def deathstar_workload_generator():
+    search_ratio = 0.6
+    recommend_ratio = 0.39
+    user_ratio = 0.005
+    reserve_ratio = 0.005
+    c = 0
+    while True:
+        coin = random.random()
+        if coin < search_ratio:
+            yield search_hotel()
+        elif coin < search_ratio + recommend_ratio:
+            yield recommend()
+        elif coin < search_ratio + recommend_ratio + user_ratio:
+            yield user_login()
+        else:
+            yield reserve()
+        c += 1
 
 threads = 1
 messages_per_second = 10
 sleeps_per_second = 10
@@ -259,8 +250,8 @@
 
 def benchmark_runner(proc_num) -> dict[int, dict]:
     print(f'Generator: {proc_num} starting')
-    client = DeathstarClient("deathstar", "ds-out")
-    deathstar_generator = client.deathstar_workload_generator()
+    client = FlinkClientSync("deathstar", "ds-out", "localhost:9092", True)
+    deathstar_generator = deathstar_workload_generator()
    # futures: dict[int, dict] = {}
    start = timer()
    for _ in range(seconds):
@@ -270,12 +261,12 @@ def benchmark_runner(proc_num) -> dict[int, dict]:
             time.sleep(sleep_time)
             event = next(deathstar_generator)
             # func_name = event.dataflow.name if event.dataflow is not None else "login" # only login has no dataflow
-            # key = event.key_stack[0]
+            key = event.key_stack[0]
             # params = event.variable_map
             client.send(event)
             # futures[event._id] = {"event": f'{func_name} {key}->{params}'}
-            # styx.flush()
 
+        client.flush()
         sec_end = timer()
         lps = sec_end - sec_start
         if lps < 1:
@@ -291,14 +282,14 @@ def benchmark_runner(proc_num) -> dict[int, dict]:
     done = False
     while not done:
         done = True
-        for event_id, fut in client.client._futures.items():
+        for event_id, fut in client._futures.items():
             result = fut["ret"]
             if result is None:
                 done = False
                 time.sleep(0.5)
                 break
-    futures = client.client._futures
-    client.client.close()
+    futures = client._futures
+    client.close()
     return futures
 
 
@@ -311,7 +302,7 @@ def write_dict_to_csv(futures_dict, filename):
         filename (str): The name of the CSV file to write to.
     """
     # Define the column headers
-    headers = ["event_id", "sent", "sent_t", "ret", "ret_t"]
+    headers = ["event_id", "sent", "sent_t", "ret", "ret_t", "latency"]
 
     # Open the file for writing
     with open(filename, mode='w', newline='', encoding='utf-8') as csvfile:
@@ -328,13 +319,14 @@ def write_dict_to_csv(futures_dict, filename):
                 "sent": event_data.get("sent"),
                 "sent_t": event_data.get("sent_t"),
                 "ret": event_data.get("ret"),
-                "ret_t": event_data.get("ret_t")
+                "ret_t": event_data.get("ret_t"),
+                "latency": event_data["ret_t"][1] - event_data["sent_t"][1]
             }
             writer.writerow(row)
 
 def main():
-    ds = DeathstarDemo("deathstar", "ds-out")
-    ds.init_runtime()
+    ds = DeathstarDemo()
+    ds.init_runtime(FlinkRuntime("deathstar", "ds-out"), bundle_time=5, bundle_size=10)
     ds.runtime.run(run_async=True)
     ds.populate()
 
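With this refactor, DeathstarDemo no longer hard-codes Flink: init_runtime now takes the runtime object itself plus runtime-specific keyword arguments, so the same demo drives both backends in this PR. A minimal sketch, based only on calls visible in this diff (the FlinkRuntime import path is an assumption; only PythonRuntime's path is shown in demo_python.py below):

# Sketch: one demo, two interchangeable runtimes, per the calls in this diff.
from cascade.runtime.python_runtime import PythonRuntime
from deathstar.demo import DeathstarDemo

use_flink = False  # toggle between the two backends exercised in this PR

demo = DeathstarDemo()
if use_flink:
    from cascade.runtime.flink_runtime import FlinkRuntime  # assumed import path
    demo.init_runtime(FlinkRuntime("deathstar", "ds-out"), bundle_time=5, bundle_size=10)
    demo.runtime.run(run_async=True)  # as in main() above
else:
    demo.init_runtime(PythonRuntime())  # as in test_python_runtime() below
demo.populate()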
109 changes: 109 additions & 0 deletions deathstar/demo_python.py
@@ -0,0 +1,109 @@
+import time
+import sys
+import os
+
+# import cascade
+sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../src")))
+
+
+from cascade.runtime.python_runtime import PythonRuntime
+from deathstar.demo import DeathstarDemo, deathstar_workload_generator
+from timeit import default_timer as timer
+import csv
+
+messages_per_second = 10
+sleeps_per_second = 10
+sleep_time = 0.0085
+seconds = 10
+
+def benchmark_runner(runtime) -> dict[int, dict]:
+
+    deathstar_generator = deathstar_workload_generator()
+    # futures: dict[int, dict] = {}
+    start = timer()
+    for _ in range(seconds):
+        sec_start = timer()
+        for i in range(messages_per_second):
+            if i % (messages_per_second // sleeps_per_second) == 0:
+                time.sleep(sleep_time)
+            event = next(deathstar_generator)
+            # func_name = event.dataflow.name if event.dataflow is not None else "login" # only login has no dataflow
+            key = event.key_stack[0]
+            # params = event.variable_map
+            runtime.send(event)
+            # futures[event._id] = {"event": f'{func_name} {key}->{params}'}
+
+        sec_end = timer()
+        lps = sec_end - sec_start
+        if lps < 1:
+            time.sleep(1 - lps)
+        sec_end2 = timer()
+        print(f'Latency per second: {sec_end2 - sec_start}')
+    end = timer()
+    print(f'Average latency per second: {(end - start) / seconds}')
+
+    # done = False
+    # while not done:
+    #     done = True
+    #     for event_id, fut in client._futures.items():
+    #         result = fut["ret"]
+    #         if result is None:
+    #             done = False
+    #             time.sleep(0.5)
+    #             break
+    # futures = client._futures
+    # client.close()
+    # return futures
+
+
+def write_dict_to_csv(futures_dict, filename):
+    """
+    Writes a dictionary of event data to a CSV file.
+    Args:
+        futures_dict (dict): A dictionary where each key is an event ID and the value is another dict.
+        filename (str): The name of the CSV file to write to.
+    """
+    # Define the column headers
+    headers = ["event_id", "sent", "sent_t", "ret", "ret_t", "latency"]
+
+    # Open the file for writing
+    with open(filename, mode='w', newline='', encoding='utf-8') as csvfile:
+        writer = csv.DictWriter(csvfile, fieldnames=headers)
+
+        # Write the headers
+        writer.writeheader()
+
+        # Write the data rows
+        for event_id, event_data in futures_dict.items():
+            # Prepare a row where the 'event_id' is the first column
+            row = {
+                "event_id": event_id,
+                "sent": event_data.get("sent"),
+                "sent_t": event_data.get("sent_t"),
+                "ret": event_data.get("ret"),
+                "ret_t": event_data.get("ret_t"),
+                "latency": event_data["ret_t"][1] - event_data["sent_t"][1]
+            }
+            writer.writerow(row)
+
+def test_python_runtime():
+    ds = DeathstarDemo()
+    ds.init_runtime(PythonRuntime())
+    ds.populate()
+
+
+    time.sleep(1)
+    input()
+
+    results = benchmark_runner(ds.runtime)
+
+    print(results)
+    t = len(results)
+    r = 0
+    for result in results.values():
+        if result["ret"] is not None:
+            print(result)
+            r += 1
print(f"{r}/{t} results recieved.")
write_dict_to_csv(results, "test2.csv")
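Because write_dict_to_csv emits one row per event with a precomputed latency column, the benchmark output can be summarized with the standard library alone. A small sketch (the test2.csv filename and column names come from the code above; the choice of statistics is illustrative):

# Sketch: summarize the "latency" column of the CSV written by write_dict_to_csv.
import csv
import statistics

with open("test2.csv", newline="", encoding="utf-8") as f:
    latencies = [float(row["latency"]) for row in csv.DictReader(f) if row["latency"]]

if latencies:
    print(f"events: {len(latencies)}")
    print(f"mean latency:   {statistics.mean(latencies):.4f}")
    print(f"median latency: {statistics.median(latencies):.4f}")
    print(f"max latency:    {max(latencies):.4f}")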