-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathprep_plan_set.py
203 lines (163 loc) · 9.48 KB
/
prep_plan_set.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import psycopg2
import time
from psql_explain_decoder import decode
from parse import gen_final_hint
from prep_cardinality import *
from prep_selectivity import prep_sel
from prep_error_list import cal_rel_error
from utility import find_bin_id_from_err_hist_list
import json
from postgres import *
def _sel_ratios(est_card, raw_card):
    """Return ``est_card[t] / raw_card[t]`` for every index ``t`` of ``est_card``."""
    return [est_card[t] / raw_card[t] for t in range(len(est_card))]


def _copy_to_data_dir(src, dst="~/imdb/"):
    """Copy ``src`` to ``dst`` with the shell ``cp`` and report a failed copy.

    ``os.system`` signals failure only through its return value, which used to
    be ignored; a failed copy would silently leave a stale file behind. The
    copy stays best-effort (no exception), but a non-zero status is printed.
    """
    status = os.system(f"cp {src} {dst}")
    if status != 0:
        print(f"WARNING: 'cp {src} {dst}' exited with status {status}")
    return status


def _configure_ml_session(cursor):
    """Reset the session and enable the ML cardinality/join estimation hooks."""
    cursor.execute('DISCARD ALL;')
    cursor.execute('SET enable_material = off')
    cursor.execute("SET ml_cardest_enabled=true;")
    cursor.execute("SET ml_joinest_enabled=true;")
    cursor.execute("SET query_no=0;")
    cursor.execute("SET join_est_no=0;")
    cursor.execute("SET ml_cardest_fname='new_single.txt';")
    cursor.execute("SET ml_joinest_fname='join.txt';")
    cursor.execute("LOAD 'pg_hint_plan';")


def enum(sensitive_dims, table_name_id_dict, join_maps, join_info, est_base_card, raw_base_card, f_base_sel, est_join_card, raw_join_card, f_join_sel, to_execute_, db_name, err_samples, recentered_err, debug=False):
    """Re-plan ``to_execute_`` once per error sample and collect the distinct plans.

    For every sample in ``err_samples``:

    1. The unperturbed selectivities (``est_*_card / raw_*_card``) are handed
       to ``prep_sel`` together with the sample.
    2. The files under ``~/robust-vcm/cardinality`` are copied into
       ``~/imdb``.
    3. The session is configured for the ML estimation hooks, and
       ``to_execute_`` is run.

    The first result cell is read as an EXPLAIN JSON document (presumably
    ``EXPLAIN (FORMAT JSON)`` -- TODO confirm). It is decoded into a join
    order and a list of scan methods. Every previously unseen physical plan
    (join order + scan methods) contributes one hint.

    Args:
        sensitive_dims: relations the error is applied to (``relation_list``
            of ``prep_sel``); also echoed in progress output.
        table_name_id_dict, join_maps, join_info: forwarded to ``prep_sel``.
        est_base_card, raw_base_card: index-aligned estimated / raw base-table
            cardinalities.
        f_base_sel: base selectivity file given to ``prep_sel``. It is
            written back with the unperturbed ratios on exit, even on error.
        est_join_card, raw_join_card, f_join_sel: the same for joins.
        to_execute_: SQL statement to plan.
        db_name: database to connect to (via the socket directory ``/tmp``).
        err_samples: error samples; one planner call per sample.
        recentered_err: forwarded to ``prep_sel`` as ``recentered_error``.
        debug: print extra diagnostics.

    Returns:
        A tuple ``(cost_list, plan_change_at_sel, plan_list)``:

        - ``cost_list``: the total cost of every successfully planned sample.
        - ``plan_change_at_sel``: ``(sample_index / len(err_samples), cost)``
          for each sample whose join order differs from the previous one.
        - ``plan_list``: the hint strings of the unique plans, in discovery
          order.
    """
    _copy_to_data_dir("~/robust-vcm/cardinality/new_single.txt")
    # Unperturbed selectivities; identical for every sample, so compute once.
    base_sel = _sel_ratios(est_base_card, raw_base_card)
    join_sel = _sel_ratios(est_join_card, raw_join_card)
    conn = psycopg2.connect(host="/tmp", dbname=db_name, user="hx68")
    start_t = time.time()
    cost_list = []
    plan_change_at_sel = []
    plan_list = []          # hint strings, one per unique plan
    plan_set = set()        # Plan = Join Order + Scan Mtd
    join_order_set = set()
    scan_mtd_set = set()
    old_join_order = None
    old_scan_mtd = set()
    n_samples = len(err_samples)
    try:
        conn.set_session(autocommit=True)
        with conn.cursor() as cursor:
            for i, cur_err in enumerate(err_samples):
                if i % 100 == 0:
                    print(i)
                if debug:
                    print(cur_err, sensitive_dims)
                # Pass fresh copies each time in case prep_sel modifies its
                # inputs (its body is not visible here).
                prep_sel(table_name_id_dict, join_maps, join_info,
                         est_base_sel=list(base_sel), f_base_sel=f_base_sel,
                         est_join_sel=list(join_sel), f_join_sel=f_join_sel,
                         error=cur_err, recentered_error=recentered_err,
                         relation_list=sensitive_dims, rela_error=True)
                # TODO: join relation
                _copy_to_data_dir("~/robust-vcm/cardinality/join.txt")
                _copy_to_data_dir("~/robust-vcm/cardinality/new_single.txt")
                _copy_to_data_dir("~/robust-vcm/cardinality/pointers.txt", "~/imdb")
                try:
                    _configure_ml_session(cursor)
                    cursor.execute(to_execute_)
                    query_plan = cursor.fetchall()
                except psycopg2.errors.SyntaxError as e:
                    # Skip this sample but show why it failed.
                    print(to_execute_)
                    print(f"SyntaxError: {e}")
                    continue

                root = query_plan[0][0][0]['Plan']
                # join_order is a string, e.g. Nested Loop(Nested Loop(v,p),u);
                # scan_mtd is a list, e.g. ['Seq Scan(v)', 'Index Scan(p)', ...]
                join_order, _, scan_mtd = decode(root['Plans'], root['Node Type'])
                # Hint regenerated from the EXPLAIN output.
                hint_ = gen_final_hint(scan_mtd=scan_mtd, str=join_order)
                cost = root['Total Cost']

                # Build the plan as a NEW list. Appending the join order to
                # scan_mtd itself (aliasing) polluted every scan-method
                # comparison and the scan-method statistics below.
                plan = [*scan_mtd, str(join_order)]
                scan_key = frozenset(scan_mtd)
                plan_key = frozenset(plan)
                # A plan is new when its (join order, scan methods)
                # combination is new. This must be checked on the pair
                # itself, not on each part separately.
                if plan_key not in plan_set:
                    print(f" Cost at new sel: {cost} ##### Find New Plan" , plan)
                    print(cur_err, sensitive_dims)
                    plan_list.append(hint_)
                    plan_set.add(plan_key)
                join_order_set.add(join_order)
                scan_mtd_set.add(scan_key)

                # Track transitions relative to the previous sample.
                new_scans = set(scan_mtd) - old_scan_mtd
                if old_join_order != join_order:
                    if debug:
                        if new_scans:
                            print(" #-# join order & scan method change")
                        else:
                            print(" #-# Only join order change")
                    old_join_order = join_order
                    plan_change_at_sel.append((i / n_samples, cost))
                elif new_scans:
                    print(" #-# Only scan method change")
                if new_scans:
                    old_scan_mtd = set(scan_mtd)
                cost_list.append(cost)
        end_t = time.time()
        print(f"++++Time for planning {len(err_samples)} query: {end_t - start_t}(s)++++")
    finally:
        # Write the unperturbed selectivities back and release the connection
        # even if planning failed part-way.
        try:
            write_to_file(base_sel, f_base_sel)
            write_to_file(join_sel, f_join_sel)
        finally:
            conn.close()
    print(f"Unique Join Plan: {len(join_order_set)}")
    print(f"Unique Scan Methods: {len(scan_mtd_set)}")
    print(f"Unique Physical Plan: {len(plan_set)}")
    return cost_list, plan_change_at_sel, plan_list
def _collect_top_k_hints(db_name, to_execute_):
    """Plan ``to_execute_`` once and convert the plans in record.txt into hints.

    NOTE(review): this presumably relies on the server writing its top-n plans
    to ``/winhomes/hx68/imdb/record.txt`` while planning ``to_execute_``.
    Confirm this against the server build.

    Returns:
        The unique hint strings, in the order they were first produced.
    """
    conn = psycopg2.connect(host="/tmp", dbname=db_name, user="hx68")
    try:
        conn.set_session(autocommit=True)
        with conn.cursor() as cursor:
            cursor.execute('DISCARD ALL;')
            cursor.execute('SET enable_material = off')
            cursor.execute(to_execute_)
        with open("/winhomes/hx68/imdb/record.txt") as f:
            input_string = f.read()
        # Build the plan trees, then split the dump into individual plans.
        preprocessed_string = pre_build_plan_tree(pre_deal_gather(input_string))
        plan_lists = seperate_top_n_plans(preprocessed_string)
        scan_method_set = set()
        serialized_plan_set = set()
        # dict as an insertion-ordered set: deduplicates like the old set,
        # but the resulting order is reproducible across runs.
        hints = {}
        for plan_text in plan_lists:
            scan_methods = build_scan_methods(plan_text)  # a list of scan methods
            scan_method_set.add(" ".join(sorted(scan_methods)))
            serialized_plan = build_plan_tree(plan_text, 1)
            serialized_plan_set.add(serialized_plan)
            hints[gen_final_hint(scan_mtd=scan_methods, str=serialized_plan)] = None
        print(f"--- Top-k plans: unique join order: {len(serialized_plan_set)}")
        print(f"--- Top-k plans: unique scan method: {len(scan_method_set)}")
    finally:
        conn.close()
    return list(hints)


def get_plan_set_by_enum(table_name_id_dict, join_maps, join_info,
                         db_name, to_execute_,
                         est_base_card, raw_base_card, f_base_sel,
                         est_join_card, raw_join_card, f_join_sel,
                         dims, num_of_samples, err_info_dict, recentered_err, top_k=10):
    """Collect candidate plans for a query from sampled cardinality errors.

    For each relation id in ``dims``, ``num_of_samples`` errors are drawn from
    the error distribution ``err_info_dict[table_id][2][r]``. Here ``r`` is
    the bin returned by ``find_bin_id_from_err_hist_list``. The samples are
    combined into joint samples and passed to ``enum``.

    Args:
        top_k: when > 0, additionally run the query once and parse the plans
            dumped to ``record.txt``. NOTE: the value itself is not sent to
            the server (the ``SET top_n`` call is disabled); it only gates
            this step.
        All other arguments are forwarded to ``enum`` or used to pick the
        error bin.

    Returns:
        A dict with:

        - key ``0``: the hints of the unique plans found by ``enum``.
        - key ``-1`` (only when ``top_k > 0``): the hints parsed from
          ``record.txt``.
    """
    plan_dict_total = {}
    # The bin lookup always sees the concatenated base + join cardinalities,
    # so build those lists once instead of once per dimension.
    est_card = est_base_card + est_join_card
    raw_card = raw_base_card + raw_join_card
    # Draw samples from the error distribution of each dimension.
    per_dim_samples = []
    for table_id in dims:
        r = find_bin_id_from_err_hist_list(est_card, raw_card, cur_dim=table_id, err_info_dict=err_info_dict)
        pdf_of_err = err_info_dict[table_id][2][r]
        per_dim_samples.append(pdf_of_err.sample(num_of_samples))
    # Transpose into joint samples, one entry per draw.
    # NOTE(review): the trailing [0] assumes each per-dimension sample has
    # shape (num_of_samples, 1), e.g. a KDE .sample() result -- TODO confirm.
    joint_error_sample = np.array(per_dim_samples).T.tolist()[0]
    print(f"Generate {len(joint_error_sample)} plans based on the joint distribution.")
    _, _, plan_list = enum(dims, table_name_id_dict, join_maps, join_info, est_base_card, raw_base_card, f_base_sel,
                           est_join_card, raw_join_card, f_join_sel, to_execute_, db_name, joint_error_sample, recentered_err)
    plan_dict_total[0] = plan_list
    if top_k > 0:
        plan_dict_total[-1] = _collect_top_k_hints(db_name, to_execute_)
    return plan_dict_total