[RPC] Fetch TPCH profiled info for application at runtime #92

Merged
merged 9 commits on Mar 2, 2024
Binary profile files added (contents not shown) under profiles/workload/tpch_decima/, e.g. 2g/task_duration_6.npy.
27 changes: 27 additions & 0 deletions profiles/workload/tpch_decima/query_dag.json
@@ -0,0 +1,27 @@
{
"workload_type": "tpch",
"query_number": {
"1": "[(0, 1), (1, 2)]",
"2": "[(0, 2), (0, 9), (1, 2), (2, 4), (3, 4), (4, 7), (5, 7), (5, 13), (6, 15), (7, 15), (8, 9), (9, 11), (10, 11), (11, 13), (12, 14), (13, 14), (14, 16), (15, 16)]",
"3": "[(0, 2), (1, 2), (2, 4), (3, 4)]",
"4": "[(0, 2), (1, 2), (2, 3), (3, 4)]",
"5": "[(0, 2), (1, 2), (2, 4), (3, 4), (4, 6), (5, 6), (6, 8), (7, 8), (8, 10), (9, 10), (10, 11), (11, 12)]",
"6": "[(0, 1)]",
"7": "[(0, 2), (0, 5), (1, 2), (2, 8), (3, 8), (4, 5), (5, 7), (6, 7), (7, 9), (8, 9), (9, 10), (10, 11)]",
"8": "[(0, 5), (1, 5), (2, 4), (3, 4), (4, 13), (5, 13), (6, 8), (7, 8), (8, 10), (9, 10), (10, 12), (11, 12), (12, 14), (13, 14), (14, 15), (15, 16)]",
"9": "[(0, 5), (1, 5), (2, 4), (3, 4), (4, 6), (5, 6), (6, 8), (7, 8), (8, 10), (9, 10), (10, 11), (11, 12)]",
"10": "[(0, 2), (1, 2), (2, 4), (3, 4), (4, 6), (5, 6), (6, 7)]",
"11": "",
"12": "[(0, 2), (1, 2), (2, 3), (3, 4)]",
"13": "[(0, 2), (1, 2), (2, 3), (3, 4), (4, 5)]",
"14": "[(0, 2), (1, 2)]",
"15": "",
"16": "[(0, 2), (1, 2), (2, 4), (3, 4), (4, 5), (5, 6), (6, 7)]",
"17": "",
"18": "",
"19": "[(0, 2), (1, 2)]",
"20": "",
"21": "",
"22": ""
}
}
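Each entry in query_dag.json stores a TPC-H query's stage DAG as a stringified edge list keyed by query number (an empty string means no profiled DAG for that query). As a minimal sketch of how such an entry is turned into a graph, mirroring what get_base_tpch_graph_structure in rpc/tpch_utils.py does below (the file path here is assumed to be resolved from the repository root):

import ast
import json

import networkx as nx

# Assumed path from the repository root to the file added in this PR.
with open("profiles/workload/tpch_decima/query_dag.json") as f:
    tpch_query_json = json.load(f)

# The entry for TPC-H query 5 is a string such as "[(0, 2), (1, 2), ...]".
edges = ast.literal_eval(tpch_query_json["query_number"]["5"])

# Nodes are stage ids, edges are stage dependencies.
dag = nx.DiGraph(edges)
print(dag.number_of_nodes(), dag.number_of_edges())  # 13 nodes, 12 edges for Q5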
76 changes: 68 additions & 8 deletions rpc/service.py
@@ -16,6 +16,7 @@
import erdos_scheduler_pb2_grpc
import grpc
from absl import app, flags
from tpch_utils import get_all_stage_info_for_query, verify_and_relable_tpch_app_graph

from schedulers import EDFScheduler, FIFOScheduler
from utils import EventTime, setup_logging
@@ -367,10 +368,62 @@ async def RegisterTaskGraph(self, request, context):
num_executors=0,
)

self._logger.info(
"Attempting to register application ID %s with name %s",
request.id,
request.name,
)
# Check if query is from TPC-H workload.
# If yes, retrieve profiled slots and runtime info. If no, use default values
is_tpch_query = False
tpch_query_all_stage_info = None
if request.name.startswith("TPCH_"):
is_tpch_query = True
# retrieve tasks-per-stage and runtime info based on query number
tpch_query_num = request.name.split("TPCH_Q", 1)[1]
tpch_query_all_stage_info = get_all_stage_info_for_query(tpch_query_num)
same_structure, stage_id_mapping = verify_and_relable_tpch_app_graph(
query_num=tpch_query_num, dependencies=request.dependencies
)

            # return a failure message if the TPCH app's DAG structure doesn't match
if not same_structure:
self._logger.warning(
"TPCH application with ID %s and name %s couldn't be registered."
"DAG structure mismatch!",
request.id,
request.name,
)
return erdos_scheduler_pb2.RegisterTaskGraphResponse(
success=False,
message=f"TPCH application ID {request.id} with name {request.name}"
f" couldn't be registered. DAG structure mismatch!",
num_executors=0,
)

# Construct all the Tasks for the TaskGraph.
task_ids_to_task: Mapping[int, Task] = {}
default_resource = Resources(
resource_vector={Resource(name="Slot_CPU", _id="any"): 30}
)
default_runtime = EventTime(20, EventTime.Unit.US)
Contributor (review comment):

Suggested change:
-    default_runtime = EventTime(20, EventTime.Unit.US)
+    default_runtime = EventTime(20, EventTime.Unit.S)

Do you have any runtimes that are lower than a second?


for task_dependency in request.dependencies:
framework_task = task_dependency.key
if is_tpch_query:
mapped_stage_id = stage_id_mapping[framework_task.id]
task_slots = tpch_query_all_stage_info[mapped_stage_id]["num_tasks"]
task_runtime = tpch_query_all_stage_info[mapped_stage_id][
"avg_task_duration"
]
self._logger.info(
"Creating Task for given app TPCH stage: %s, mapped to "
"original stage id %s, with tasks: %s and avg runtime: %s",
framework_task.id,
mapped_stage_id,
task_slots,
task_runtime,
)
task_ids_to_task[framework_task.id] = Task(
name=framework_task.name,
task_graph=request.id,
@@ -380,17 +433,24 @@ async def RegisterTaskGraph(self, request, context):
name=f"WorkProfile_{framework_task.name}",
execution_strategies=ExecutionStrategies(
[
# TODO (Sukrit): Find the actual resource requirements
# for the particular TaskGraph, along with the expected
# runtime and set it here.
ExecutionStrategy(
resources=Resources(
resource_vector={
Resource(name="Slot_CPU", _id="any"): 30
}
resources=(
default_resource
if not is_tpch_query
else Resources(
resource_vector={
Resource(
name="Slot_CPU", _id="any"
): task_slots
}
)
),
batch_size=1,
runtime=EventTime(20, EventTime.Unit.US),
runtime=(
default_runtime
if not is_tpch_query
else EventTime(task_runtime, EventTime.Unit.US)
),
)
]
),
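In short, the handler now swaps the hard-coded 30-slot, 20-time-unit strategy for the profiled per-stage values whenever the application name matches a TPC-H query. A minimal sketch of that mapping, using only the Resource, Resources, ExecutionStrategy, and EventTime classes already imported by rpc/service.py (the EventTime unit is kept as in the diff; the reviewer's US-vs-S question above is still open):

# Sketch: build the execution strategy for one TPC-H stage from its profile.
stage = tpch_query_all_stage_info[mapped_stage_id]
strategy = ExecutionStrategy(
    resources=Resources(
        # One Slot_CPU per profiled task in the stage.
        resource_vector={Resource(name="Slot_CPU", _id="any"): stage["num_tasks"]}
    ),
    batch_size=1,
    # avg_task_duration from the profile; the unit choice is pending review feedback.
    runtime=EventTime(stage["avg_task_duration"], EventTime.Unit.US),
)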
170 changes: 170 additions & 0 deletions rpc/tpch_utils.py
@@ -0,0 +1,170 @@
# Code adopted from decima-sim to use their profiles of TPC-H queries

import ast
import json
import os
from typing import Mapping, Sequence

import networkx as nx
import numpy as np

HOME_TPCH_DIR = "../profiles/workload/tpch_decima/"
TPCH_SUBDIR = "2g/"


class SetWithCount(object):
"""
allow duplication in set
"""

def __init__(self):
self.set = {}

def __contains__(self, item):
return item in self.set

def add(self, item):
if item in self.set:
self.set[item] += 1
else:
self.set[item] = 1

def clear(self):
self.set.clear()

def remove(self, item):
self.set[item] -= 1
if self.set[item] == 0:
del self.set[item]


def pre_process_task_duration(task_duration):
    # remove fresh durations from first wave
    clean_first_wave = {}
    for e in task_duration["first_wave"]:
        clean_first_wave[e] = []
        fresh_durations = SetWithCount()
        # O(1) access
        for d in task_duration["fresh_durations"][e]:
            fresh_durations.add(d)
        for d in task_duration["first_wave"][e]:
            if d not in fresh_durations:
                clean_first_wave[e].append(d)
            else:
                # prevent duplicated fresh duration blocking first wave
                fresh_durations.remove(d)

    # fill empty first-wave slots with the nearest earlier first wave
    # (as in the decima-sim preprocessing this file is adopted from)
    last_first_wave = []
    for e in sorted(clean_first_wave.keys()):
        if len(clean_first_wave[e]) == 0:
            clean_first_wave[e] = last_first_wave
        last_first_wave = clean_first_wave[e]

    # swap in the first wave with fresh durations removed
    task_duration["first_wave"] = clean_first_wave


def get_all_stage_info_for_query(query_num):
task_durations = np.load(
os.path.join(
HOME_TPCH_DIR, TPCH_SUBDIR, "task_duration_" + str(query_num) + ".npy"
Contributor (review comment):

Should we get the TPCH_SUBDIR label from the application name?

We might want to run a mix of TPC-H queries from different scaling sizes.

),
allow_pickle=True,
).item()

num_nodes = len(task_durations)

stage_info = {}

for n in range(num_nodes):
task_duration = task_durations[n]
e = next(iter(task_duration["first_wave"]))
# NOTE: somehow only picks the first element {2: [n_tasks_in_ms]}

num_tasks = len(task_duration["first_wave"][e]) + len(
task_duration["rest_wave"][e]
)

# remove fresh duration from first wave duration
# drag nearest neighbor first wave duration to empty spots
pre_process_task_duration(task_duration)
rough_duration = np.mean(
[i for t in task_duration["first_wave"].values() for i in t]
+ [i for t in task_duration["rest_wave"].values() for i in t]
+ [i for t in task_duration["fresh_durations"].values() for i in t]
)

curr_stage = {
"stage_id": n,
"num_tasks": num_tasks,
"avg_task_duration": round(rough_duration),
}
stage_info[n] = curr_stage

return stage_info


def get_base_tpch_graph_structure(query_num):
# use query_num to read string from file
with open(os.path.join(HOME_TPCH_DIR, "query_dag.json")) as f:
tpch_query_json = json.load(f)

# get query dependency from file
query_dependency = ast.literal_eval(tpch_query_json["query_number"][str(query_num)])

# convert job structure into a nx graph
base_tpch_graph = nx.DiGraph(query_dependency)

# return the job nx graph for query
return base_tpch_graph


def get_graph_from_deps(dependencies):
# parse dependencies to get it in list of tuples
# construct the TaskGraph from the dependencies.
task_graph_structure: Mapping[int, Sequence[int]] = {}
for task_dependency in dependencies:
task_graph_structure[task_dependency.key.id] = [
task_id for task_id in task_dependency.children_ids
]

# convert our TaskGraph into a nx friendly notation
all_edges_in_app = []
for parent in task_graph_structure.keys():
for child in task_graph_structure[parent]:
all_edges_in_app.append((parent, child))

# construct nx graph
given_app_tpch_graph = nx.DiGraph(all_edges_in_app)

# return the graph
return given_app_tpch_graph


def are_structurally_same(graph1, graph2):
# Step 1: Check if both graphs have the same number of vertices
if len(graph1.nodes) != len(graph2.nodes):
print(
f"DAG structure mismatch! Graph1 has "
f"{graph1.nodes} while Graph2 has {graph2.nodes}"
)
return False, None

# Step 2: Check if there exists a bijection between the vertices
# of the two graphs such that their adjacency relationships match
for mapping in nx.isomorphism.GraphMatcher(graph1, graph2).isomorphisms_iter():
# Check if the adjacency relationships match
if all(v in mapping for u, v in graph1.edges):
# graph structures match
# mapping is a dict {key=original-stage-id, val=app-stage-id}
            # reverse it so it maps app-stage-id back to original-stage-id
reversed_mapping = {v: k for k, v in mapping.items()}

return True, reversed_mapping

print("DAG structure mismatch! No mapping could be found")
return False, None


def verify_and_relable_tpch_app_graph(query_num, dependencies):
Contributor (review comment):

Suggested change:
-def verify_and_relable_tpch_app_graph(query_num, dependencies):
+def verify_and_relabel_tpch_app_graph(query_num, dependencies):

Typo.

# get nx graphs from base tpch file and application dependencies
base_tpch_graph = get_base_tpch_graph_structure(query_num)
app_graph = get_graph_from_deps(dependencies)

# verify that the graphs are isomorphic
# returns true if same_structure, along with stage_id_mapping wrt base tpch file
same_structure, stage_id_mapping = are_structurally_same(base_tpch_graph, app_graph)

# return with stage_id_mapping back to assign correct runtime and num_executors
return same_structure, stage_id_mapping
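A hedged end-to-end sketch of how the two helpers are meant to be used, assuming it runs from within rpc/ so the relative HOME_TPCH_DIR resolves, and using namedtuples as hypothetical stand-ins for the protobuf dependency messages (only .key.id and .children_ids are read by the helpers):

from collections import namedtuple

from tpch_utils import get_all_stage_info_for_query, verify_and_relable_tpch_app_graph

# Profiled per-stage info: {stage_id: {"stage_id", "num_tasks", "avg_task_duration"}}.
stage_info = get_all_stage_info_for_query(3)
for stage_id, info in stage_info.items():
    print(stage_id, info["num_tasks"], info["avg_task_duration"])

# Hypothetical stand-ins for the TaskDependency messages the RPC handler receives.
Key = namedtuple("Key", ["id"])
Dep = namedtuple("Dep", ["key", "children_ids"])

# Application DAG matching Q3 in query_dag.json: [(0, 2), (1, 2), (2, 4), (3, 4)].
deps = [
    Dep(Key(0), [2]),
    Dep(Key(1), [2]),
    Dep(Key(2), [4]),
    Dep(Key(3), [4]),
    Dep(Key(4), []),
]
same_structure, stage_id_mapping = verify_and_relable_tpch_app_graph(3, deps)
print(same_structure)    # True when the app DAG is isomorphic to the base Q3 DAG
print(stage_id_mapping)  # maps app stage ids to original TPC-H stage ids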