-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrequest_handler.py
99 lines (75 loc) · 2.85 KB
/
request_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import api_session
import google.generativeai
import threading
import ui
import time
from constants import * ## Global constants
class RequestHandler:
def __init__(self, cli_lock, parent):
self.parent = parent
self.cli_lock = cli_lock
self.requests_lock = threading.Lock()
self.sessions = []
self.requests = []
def stop_threads(self): # Not in use yet
with self.requests_lock:
for request in self.requests:
request.stop()
self.requests.remove(request)
for session in self.sessions:
session.client.stop()
def submit_requests(self, query):
for session in self.sessions:
session.client.query = query
# Create Request() and append to list of active requests
request = Request(session, self)
with self.requests_lock:
self.requests.append(request)
thread = threading.Thread(target=request.main, daemon=True) # Temporarily set daemon to false
request.thread = thread
request.thread.start()
def monitor_requests(self):
start_time = time.time()
while True:
with self.requests_lock:
if not self.requests:
return
time.sleep(0.2)
if (time.time() - start_time) > TIMEOUT:
self.stop_threads()
with self.cli_lock:
ui.c_out("Requests timed out.", isolate=True)
return
class Request:
def __init__(self, session, parent):
self._stop_event = threading.Event()
self.session = session
self.parent = parent
def stop(self):
self._stop_event.set()
def stopped(self):
return self._stop_event.is_set()
def remove_from_requests(self):
with self.parent.requests_lock:
try:
self.parent.requests.remove(self)
except ValueError:
pass
def main(self):
try:
successful_request = self.session.client.send_request()
except Exception as e:
successful_request = False
if not successful_request:
if not self.stopped():
with self.parent.cli_lock:
ui.c_out(f"Client: {self.session.client.name}", bottom_margin=True)
ui.c_out("Failed to receive response.", separator=True)
self.remove_from_requests()
return
self.session.client.format_response()
if not self.stopped():
with self.parent.cli_lock:
ui.c_out(f"Client: {self.session.client.name}", bottom_margin=True)
ui.c_out(self.session.client.response, separator=True)
self.remove_from_requests()