-
Notifications
You must be signed in to change notification settings - Fork 2
/
demo.py
167 lines (127 loc) · 5.11 KB
/
demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import os
import threading
import time
from time import sleep

from spicepy import Client
# SECURITY: an API key committed to source control is readable by everyone
# with access to the repository. Prefer supplying the key through the
# SPICEAI_API_KEY environment variable. The literal below is kept only as a
# backward-compatible fallback and should be rotated and removed.
SPICEAI_API_KEY = os.environ.get(
    'SPICEAI_API_KEY', '3232|2ea664a9be804002af9b212021749439'
)

###########################
#    Spice.ai Platform    #
###########################

# Use the API key from the Spice.ai app to instantiate a client.
client = Client(SPICEAI_API_KEY)

# Time the query together with one read_chunk() call on the result.
# perf_counter() is monotonic, so the measured duration is not affected by
# system clock adjustments while the query is running.
started = time.perf_counter()
data = client.query('SELECT * FROM eth.recent_traces trace JOIN eth.recent_transactions trans ON trace.transaction_hash = trans.hash ORDER BY trans.block_number DESC;')
data.read_chunk()  # result is discarded; the call is kept so it is part of the timing
elapsed = time.perf_counter() - started
print(f"Completed in {elapsed:.3f} seconds\n")

# The script terminates here; everything below runs only if this call is removed.
exit()
# Rebinds the module-level `client` to the endpoint grpc://127.0.0.1:50051.
# The module-level query sections that follow call `client.query(...)` on
# this client, which is why the statement must stay active.
########################################
# DO NOT COMMENT OUT THE LINE BELOW #
########################################
client = Client(SPICEAI_API_KEY, 'grpc://127.0.0.1:50051')
###########################
#   Spice AI Datasource   #
###########################

# Repeatedly run the traces/transactions join on the module-level `client`,
# print the first five rows and the elapsed time, then pause for one second.
# The loop has no exit condition; it runs until the process is interrupted.
while True:
    # perf_counter() is monotonic, so the duration cannot be skewed by
    # system clock adjustments. The timing covers query() and read_pandas().
    started = time.perf_counter()
    data = client.query('SELECT trace.block_number, trace.block_timestamp, trace.transaction_hash FROM eth_recent_traces trace JOIN eth_recent_transactions trans ON trace.transaction_hash = trans.hash ORDER BY trans.block_number DESC;')
    frame = data.read_pandas()
    elapsed = time.perf_counter() - started

    print(frame.head(5))
    print(f"Completed in {elapsed:.3f} seconds\n")
    sleep(1)
###########################
#    Dremio Datasource    #
###########################

# Repeatedly run two queries against `taxi_trips` on the module-level
# `client`: the 100 most recent trips, then the total row count. Each result
# is printed in full, followed by the elapsed time of its query() call.
# The loop has no exit condition and no pause between iterations.
while True:
    # perf_counter() is monotonic, so the duration cannot be skewed by
    # system clock adjustments.
    # NOTE(review): the timer stops before read_pandas(), so converting the
    # result is not part of the reported duration - confirm this is intended.
    started = time.perf_counter()
    data = client.query('SELECT * FROM taxi_trips ORDER BY pickup_datetime DESC LIMIT 100')
    elapsed = time.perf_counter() - started
    frame = data.read_pandas()
    print(frame.to_string() + "\n")
    print(f"Completed in {elapsed:.3f} seconds\n")

    started = time.perf_counter()
    data = client.query('SELECT count(*) FROM taxi_trips')
    elapsed = time.perf_counter() - started
    frame = data.read_pandas()
    print(frame.to_string() + "\n")
    print(f"Completed in {elapsed:.3f} seconds\n")
###########################
# Spice/Dremio Datasource #
###########################

# The statement never changes, so it is defined once outside the loop.
_BLOCKS_TAXI_JOIN_SQL = """
SELECT DISTINCT
eth_recent_blocks.number as block_number,
taxi_trips.trip_distance_mi
FROM eth_recent_blocks
LEFT JOIN taxi_trips
ON eth_recent_blocks.number%100 = taxi_trips.trip_distance_mi*10
ORDER BY eth_recent_blocks.number DESC
LIMIT 10
"""

# Repeatedly join `eth_recent_blocks` with `taxi_trips` on the module-level
# `client`, print the full result and the elapsed time of the query() call,
# then pause for five seconds. The loop has no exit condition.
while True:
    # perf_counter() is monotonic, so the duration cannot be skewed by
    # system clock adjustments.
    # NOTE(review): the timer stops before read_pandas(), so converting the
    # result is not part of the reported duration - confirm this is intended.
    started = time.perf_counter()
    data = client.query(_BLOCKS_TAXI_JOIN_SQL)
    elapsed = time.perf_counter() - started

    frame = data.read_pandas()
    print(frame.to_string() + "\n")
    print(f"Completed in {elapsed:.3f} seconds\n")
    sleep(5)
#####################################
# High-RPS Queries Simulation # (OPTIONAL)
#####################################
# yaml file needs to be updated before running this code
def simulate_runtime_duckdb(user_id):
    """Time one simulated user's query against `eth_recent_blocks_duckdb`.

    Creates a dedicated client for the endpoint grpc://127.0.0.1:50051, runs
    the query, reads the result with read_all(), and prints the elapsed
    seconds labelled with the user id.

    Args:
        user_id: Identifier used only to label the printed timing line.
    """
    # Make a new client for each user.
    client = Client(SPICEAI_API_KEY, 'grpc://127.0.0.1:50051')

    # perf_counter() is monotonic, so the duration cannot be skewed by
    # system clock adjustments while the query is running.
    start = time.perf_counter()
    # NOTE(review): the trailing `DESC` is not part of an ORDER BY clause -
    # confirm the statement is written as intended.
    data = client.query('SELECT * FROM eth_recent_blocks_duckdb DESC;')
    data.read_all()  # result is discarded; the call is kept so it is part of the timing
    total = time.perf_counter() - start

    print(f"User {user_id}: {total} seconds")
def simulate_runtime_arrow_mem(user_id):
    """Time one simulated user's query against `eth_recent_blocks`.

    Creates a dedicated client for the endpoint grpc://127.0.0.1:50051, runs
    the query, reads the result with read_all(), and prints the elapsed
    seconds labelled with the user id.

    Args:
        user_id: Identifier used only to label the printed timing line.
    """
    # Make a new client for each user.
    client = Client(SPICEAI_API_KEY, 'grpc://127.0.0.1:50051')

    # perf_counter() is monotonic, so the duration cannot be skewed by
    # system clock adjustments while the query is running.
    start = time.perf_counter()
    # NOTE(review): the trailing `DESC` is not part of an ORDER BY clause -
    # confirm the statement is written as intended.
    data = client.query('SELECT * FROM eth_recent_blocks DESC;')
    data.read_all()  # result is discarded; the call is kept so it is part of the timing
    total = time.perf_counter() - start

    print(f"User {user_id}: {total} seconds")
def simulate_sdk(user_id):
    """Time one simulated user's query against `eth.recent_blocks`.

    Creates a dedicated client from the API key alone (no endpoint argument),
    runs the query, reads the result with read_all(), and prints the elapsed
    seconds labelled with the user id.

    Args:
        user_id: Identifier used only to label the printed timing line.
    """
    # Make a new client for each user.
    client = Client(SPICEAI_API_KEY)

    # perf_counter() is monotonic, so the duration cannot be skewed by
    # system clock adjustments while the query is running.
    start = time.perf_counter()
    # NOTE(review): the trailing `DESC` is not part of an ORDER BY clause -
    # confirm the statement is written as intended.
    data = client.query('SELECT * FROM eth.recent_blocks DESC;')
    data.read_all()  # result is discarded; the call is kept so it is part of the timing
    total = time.perf_counter() - start

    print(f"User {user_id}: {total} seconds")
# simulate the number of users
def simulate_concurrent_queries(num_users, function_name):
    """Run `function_name` once per simulated user, each on its own thread.

    Args:
        num_users: Number of threads to start. Thread i calls
            `function_name(i)` for every i in `range(num_users)`.
        function_name: Callable that accepts a single user id.

    Returns:
        Seconds elapsed from just before the first thread is created until
        every thread has been joined.
    """
    threads = []
    # perf_counter() is monotonic, so the returned duration can never be
    # negative or distorted by system clock adjustments.
    start_time = time.perf_counter()

    for user_id in range(num_users):
        worker = threading.Thread(target=function_name, args=(user_id,))
        threads.append(worker)
        worker.start()

    # Wait for every simulated user to finish before stopping the clock.
    for worker in threads:
        worker.join()

    total_time = time.perf_counter() - start_time

    # Blank separator after the output printed by the workers.
    print("\n")
    return total_time
# Number of simulated users (threads) per scenario.
total_users = 20

# Run the three scenarios one after another, each with `total_users` threads.
time_sdk = simulate_concurrent_queries(total_users, simulate_sdk)
time_runtime_duckdb = simulate_concurrent_queries(total_users, simulate_runtime_duckdb)
time_runtime_arrow_mem = simulate_concurrent_queries(total_users, simulate_runtime_arrow_mem)

# Summary of the total time per scenario.
# NOTE(review): the "Simulating ..." line is printed after the simulations
# have already finished - confirm it is meant as a summary header.
print(f"Simulating {total_users} concurrent users making queries..")
print(f"Total Time for SDK: {time_sdk} seconds")
print(f"Total Time for In Memory Runtime: {time_runtime_arrow_mem} seconds")
print(f"Total Time for DuckDB Runtime: {time_runtime_duckdb} seconds")

exit()