forked from bethel-physics/WagonTestGUI
-
Notifications
You must be signed in to change notification settings - Fork 0
/
run_stress_test.py
62 lines (48 loc) · 1.94 KB
/
run_stress_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# Importing necessary modules
from audioop import mul
import multiprocessing as mp
import socket
# Imports the GUIWindow
from PythonFiles.StressTest import StressTest
from PythonFiles.utils.SUBClient import SUBClient
#####################################################################
# #
# Please go to PythonFiles/StressTest.py #
# to set up the settings for the Stress Test. #
# This file is essentially an executable for the #
# stress test script. #
# #
#####################################################################
# Creates a task of creating the GUIWindow
def task_GUI(conn, queue):
# creates the main_window as an instantiation of GUIWindow
stress_test = StressTest(conn, queue)
# Creates a task of creating the SUBClient
def task_SUBClient(conn, queue):
# Creates the SUBSCRIBE Socket Client
sub_client = SUBClient(conn, queue)
def run():
# Creates a Pipe for the SUBClient to talk to the GUI Window
conn_SUB, conn_GUI = mp.Pipe()
queue = mp.Queue()
# Turns creating the GUI and creating the SUBClient tasks into processes
process_test = mp.Process(target = task_GUI, args=(conn_GUI, queue,))
process_SUBClient = mp.Process(target = task_SUBClient, args = (conn_SUB, queue,))
# Starts the processes
process_test.start()
process_SUBClient.start()
# Should hold the code at this line until the GUI process ends
process_test.join()
try:
conn_SUB.close()
conn_GUI.close()
except:
print("Pipe close is unnecessary.")
try:
# Cleans up the SUBClient process
process_SUBClient.terminate()
except:
print("Terminate is unnecessary.")
pass
if __name__ == "__main__":
run()