-
Notifications
You must be signed in to change notification settings - Fork 0
/
python-host
313 lines (259 loc) · 11.1 KB
/
python-host
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
#!/usr/bin/env python
import struct
import sys
import threading
import Queue
import os.path
import json
from time import gmtime, strftime
import time
from os.path import expanduser
# Drive the preferences and token files are written to. try_another_drive()
# replaces this at runtime when the drive cannot be found.
usbDriveLetter = "E:"
# Both flags are switched on by the 'test' command line argument
isLogging = False
isTesting = False
home = expanduser("~")
# The log file lives in the user's home directory. The backslash is escaped
# explicitly because '\p' is not a valid escape sequence.
logPath = home + '\\pythonLog.txt'
def try_another_drive():
    """Switch usbDriveLetter to another drive when the current one is missing.

    Returns True if usbDriveLetter was changed to a drive that exists, and
    False if the current drive still exists or no other drive was found.
    """
    global usbDriveLetter
    # The configured drive is still present, so there is nothing to switch to
    if os.path.exists(usbDriveLetter):
        return False
    # Probe D: through Z:. Note the absence of A, B and C
    candidates = ['%s:' % letter for letter in "DEFGHIJKLMNOPQRSTUVWXYZ"]
    available = [drive for drive in candidates if os.path.exists(drive)]
    # If we can't find another drive...
    if not available:
        return False
    # Otherwise use the first alternative that was found
    usbDriveLetter = available[0]
    return True
# A 'test' entry anywhere in the command line arguments turns on both
# logging and the self-test mode
if "test" in sys.argv:
    isLogging = True
    isTesting = True
# On Windows the standard streams default to text mode (O_TEXT). Switch both
# stdin and stdout to O_BINARY so the input/output streams are not modified
# on the way through.
if sys.platform == "win32":
    import os
    import msvcrt
    for stream in (sys.stdin, sys.stdout):
        msvcrt.setmode(stream.fileno(), os.O_BINARY)
def read_thread_func(queue):
    """Read length-prefixed messages from stdin and put them on queue.

    Meant to run on the reader thread. Each message is a 4-byte length
    header in native byte order followed by that many bytes of UTF-8 encoded
    text (a JSON object). Every decoded message is put on the queue. When
    the stream ends, or is cut off in the middle of a header or message,
    None is put on the queue and the thread exits through sys.exit(0).

    queue -- queue that receives the decoded messages; when it is None the
             messages are read and discarded.
    """
    def stop():
        # Tell the consumer there is nothing more to read, then end this
        # thread (SystemExit only terminates the calling thread)
        if queue is not None:
            queue.put(None)
        sys.exit(0)

    # Python 3 exposes the underlying byte stream as sys.stdin.buffer, while
    # the Python 2 sys.stdin already returns byte strings
    stream = getattr(sys.stdin, 'buffer', sys.stdin)
    while True:
        # Read the message length (first 4 bytes).
        text_length_bytes = stream.read(4)
        # An empty header is a normal end of input. A short one means the
        # stream was cut off and cannot be unpacked, so treat it the same way
        # instead of letting struct.error kill the thread silently.
        if len(text_length_bytes) < 4:
            stop()
        # Unpack the length as an unsigned 4 byte integer, matching the 'I'
        # header written by send_message.
        text_length = struct.unpack('I', text_length_bytes)[0]
        # Read the text (JSON object) of the message.
        raw_text = stream.read(text_length)
        if len(raw_text) < text_length:
            # The stream ended before the whole message arrived
            stop()
        text = raw_text.decode('utf-8')
        if queue is not None:
            queue.put(text)
def send_message(message):
    """Send one length-prefixed message to the extension on stdout.

    message -- the text to send (callers in this file pass a JSON string).
               Text is encoded as UTF-8 before anything is written, so the
               4-byte header holds the number of bytes that follow rather
               than the number of characters.
    """
    if not isinstance(message, bytes):
        message = message.encode('utf-8')
    # Python 3 only accepts bytes on sys.stdout.buffer, while the Python 2
    # sys.stdout takes byte strings directly
    stream = getattr(sys.stdout, 'buffer', sys.stdout)
    # Write message size.
    stream.write(struct.pack('I', len(message)))
    # Write the message itself.
    stream.write(message)
    stream.flush()
class NativeMessenger():
    """Polls a queue of incoming messages and acts on each one.

    Creating an instance runs the polling loop. The constructor returns once
    a None entry has been taken from the queue, or after a single pass when
    isTesting is set.
    """

    def __init__(self, queue):
        """Store the queue and run the main loop.

        queue -- queue of message strings; a None entry ends the loop.
        """
        self.queue = queue
        # Cleared by quit() to end the main loop
        self.isRunning = True
        # Main loop
        while self.isRunning:
            time.sleep(0.1)
            self.processMessages()
            # Break from loop if testing. This way we can work with individual
            # messages in the NativeMessenger object
            if isTesting is True:
                return

    def quit(self):
        """Ask the main loop to stop once the current pass is finished."""
        self.isRunning = False

    def processMessages(self):
        """Handle every message currently waiting on the queue.

        A None entry stops the main loop; anything still queued behind it
        is left unprocessed. Messages are only acted on when isTesting is
        False.
        """
        while not self.queue.empty():
            message = self.queue.get_nowait()
            if message is None:
                self.quit()
                return
            self.log("DBG: Got message!")
            if isTesting is False:
                self.parseAndAct(message)

    # TODO: find a better way to get device paths
    def parseAndAct(self, message):
        """Parse a JSON message and carry out the action it requests.

        message -- JSON text with a "message_type" entry and a
                   "message_body" object holding "userToken" and
                   "preferences".

        When isTesting is set nothing is executed; the three extracted
        values are returned as a tuple of JSON strings (message type, user
        token, preferences). Otherwise a "write_usb" message is passed to
        writeFileAndRespond and None is returned.
        """
        # Turn message string to json object
        jMessage = json.loads(message)
        # Extract values from message
        messageType = jMessage["message_type"]
        messageBody = jMessage["message_body"]
        userToken = messageBody["userToken"]
        preferences = messageBody["preferences"]
        if isTesting is True:
            return (json.dumps(messageType),
                    json.dumps(userToken),
                    json.dumps(preferences))
        # Follow instructions in message
        if messageType == "write_usb":
            if isTesting is False:
                self.writeFileAndRespond(preferences, userToken, message)

    # TODO: Implement error codes
    def sendResponse(self, isSuccessful, errorMessage, message):
        """Build the JSON response to a message and send it.

        isSuccessful -- True or False, reported as the string "true" or
                        "false"; any other value leaves the field empty.
        errorMessage -- text placed in the error's "message" field.
        message      -- the original JSON message text, echoed back parsed
                        under "original_message".

        Returns the response as a JSON string when isTesting is set.
        Otherwise the response is passed to send_message and None is
        returned.
        """
        self.log("DBG: in sendResponse")
        if isSuccessful is True:
            outcome = "true"
        elif isSuccessful is False:
            outcome = "false"
        else:
            outcome = ""
        response = {
            "is_successful": outcome,
            "original_message": json.loads(message),
            # The code is a fixed placeholder (see the TODO above)
            "error": {"code": '00000', "message": errorMessage},
        }
        responseText = json.dumps(response)
        self.log("DBG: MESSAGE TO RESPOND WITH:")
        self.log(responseText)
        if isTesting is True:
            return responseText
        send_message(responseText)

    def writeFileAndRespond(self, preferences, userToken, message):
        """Write the preferences and the user token to the USB drive.

        preferences -- value serialized as JSON into
                       .first-discovery-preferences.txt.
        userToken   -- value written to .gpii-user-token.txt, serialized as
                       JSON with surrounding double quotes stripped.
        message     -- the original JSON message text, used for the response.

        On success a positive response is sent unless isTesting is set. If
        writing raises IOError, try_another_drive() is consulted: when it
        switches drives the write is retried, otherwise a failure response
        carrying the error text is produced.
        """
        self.log("DBG: Attempting to write to " + usbDriveLetter)
        try:
            self.log("DBG: Writing preferences...")
            # 'with' closes the file even when the write itself fails
            with open(usbDriveLetter + '\\.first-discovery-preferences.txt', 'w+') as prefsFile:
                prefsFile.write(json.dumps(preferences))
            self.log("DBG: Preferences written")
            self.log("DBG: Writing token...")
            with open(usbDriveLetter + '\\.gpii-user-token.txt', 'w+') as tokenFile:
                tokenFile.write(json.dumps(userToken).strip('"'))
            self.log("DBG: Token written")
        # If the USB path is invalid, attempt to use another drive
        except IOError as e:
            self.log("ERROR: Error writing to file. Do you have access?")
            hasAltDrive = try_another_drive()
            if hasAltDrive is True:
                self.writeFileAndRespond(preferences, userToken, message)
            else:
                self.log(str(e))
                self.sendResponse(False, "FileIO Error: " + str(e), message)
        else:
            # Respond outside the try block so that a failure while sending
            # is not mistaken for a failure to write the files
            if isTesting is False:
                self.sendResponse(True, "no error", message)

    # Set isLogging to True to enable
    def log(self, message):
        """Append message to the log file, prefixed with a UTC timestamp.

        Does nothing unless isLogging is True.
        """
        if isLogging is True:
            with open(logPath, 'a+') as logFile:
                timestamp = strftime("%Y-%m-%d %H:%M:%S", gmtime())
                logFile.write(timestamp + " " + message + "\n")
def tests():
    """Run the built-in self tests and print the outcome of every step.

    Checks, in order: that the home directory used for the log exists, that
    parseAndAct extracts the parts of a sample message, that sendResponse
    builds the expected response, and that writeFileAndRespond writes the
    preferences and token files to the USB drive. Returns early when the
    home directory is missing. Nothing is returned; results are printed.
    """
    testSuccess = True
    print("\nTESTING: Running tests...\n")
    print("TESTING: Checking if path to log life exists and is writable")
    if os.path.exists(home):
        print("SUCCESS: log path is good!")
    else:
        print("FAILURE: log path isn't good")
        return
    testMessage = json.dumps(
        {
            "message_type": "write_usb",
            "message_body": {
                "userToken": "unitTest",
                "preferences": {
                    "contexts": {
                        "gpii-default": {
                            "name": "unitTest",
                            "preferences": {
                                "gpii_firstDiscovery_language": "en-US",
                                "gpii_firstDiscovery_speak": "true",
                                "gpii_firstDiscovery_speechRate": "1.5",
                                "fluid_prefs_contrast": "bw",
                                "fluid_prefs_textSize": "1.4",
                                "gpii_firstDiscovery_onScreenKeyboard": "false",
                                "gpii_firstDiscovery_captions": "false",
                                "gpii_firstDiscovery_showSounds": "false",
                                "gpii_firstDiscovery_stickyKeys": "true"
                            }
                        }
                    }
                }
            }
        }
    )
    # Parsed form of the message, used as the expected value below.
    # Comparing parsed values keeps the checks independent of the order in
    # which dictionary keys happen to be serialized.
    expectedMessage = json.loads(testMessage)
    expectedPrefs = expectedMessage["message_body"]["preferences"]
    testQueue = Queue.Queue()
    messenger = NativeMessenger(testQueue)
    # ===== PARSING TESTING ===== #
    messageParts = messenger.parseAndAct(testMessage)
    messageType = json.loads(messageParts[0])
    userToken = json.loads(messageParts[1])
    preferences = json.loads(messageParts[2])
    # Check if the parsed message parts have the same values as testMessage
    if messageType == "write_usb" and userToken == "unitTest" and preferences == expectedPrefs:
        print("SUCCESS: Parsing was successful")
    else:
        print("FAILURE: Parsing was not successful!")
        testSuccess = False
    # ===== RESPONSE TESTING ===== #
    response = json.loads(messenger.sendResponse(True, "no error", testMessage))
    responseChecks = [
        response["original_message"]["message_body"] == expectedMessage["message_body"],
        response["error"]["code"] == '00000',
        response["error"]["message"] == 'no error',
    ]
    for passed in responseChecks:
        if not passed:
            print("FAILURE: RESPONSE MESSAGE INCORRECT")
            testSuccess = False
    # Only report success when every response check passed
    if all(responseChecks):
        print("SUCCESS: Response message was created successfully")
    # ===== TOKEN WRITING TESTING ===== #
    # Check if paths are available
    if not os.path.exists(usbDriveLetter):
        print("FAILURE:"
              " USB Drive wasn't found! The host will now attempt to use "
              "\n\t another drive. Check if the the token and preferences "
              "\n\t were written to your USB drive. The next test will check "
              "\n\t if the files were written.")
        testSuccess = False
    if os.path.exists(usbDriveLetter + '\\.first-discovery-preferences.txt'):
        print("INFO: preferences file exists. Deleting it...")
        os.remove(usbDriveLetter + '\\.first-discovery-preferences.txt')
    if os.path.exists(usbDriveLetter + '\\.gpii-user-token.txt'):
        print("INFO: token file exists. Deleting it...")
        os.remove(usbDriveLetter + '\\.gpii-user-token.txt')
    messenger.writeFileAndRespond(expectedPrefs, "unitTest", testMessage)
    # usbDriveLetter is read again from here on because the write above may
    # have switched it to another drive
    if os.path.exists(usbDriveLetter):
        try:
            with open(usbDriveLetter + '\\.first-discovery-preferences.txt', 'r') as prefsFile:
                writtenPrefs = json.loads(prefsFile.read())
            with open(usbDriveLetter + '\\.gpii-user-token.txt') as tokenFile:
                writtenToken = tokenFile.read()
            filesGood = writtenPrefs == expectedPrefs and writtenToken == 'unitTest'
        except (IOError, ValueError):
            # A missing file or unparseable content counts as a failed write
            filesGood = False
        # Compare values we want with values we get
        if not filesGood:
            print("FAILURE: Files not correctly written")
            testSuccess = False
        else:
            print("SUCCESS: Files properly written to " + usbDriveLetter)
        if testSuccess is False:
            print("FAILURE: UNIT TESTS FAILED")
        if testSuccess is True:
            print("SUCCESS: ALL TESTS RUN SUCCESSFULLY!")
    else:
        print("FAILURE: No USB device available")
def Main():
    """Run the self tests when requested, then start the messaging host."""
    if isTesting is True:
        tests()
    # Messages read from stdin reach the messenger through this queue
    inbox = Queue.Queue()
    reader = threading.Thread(target=read_thread_func, args=(inbox,))
    # Marked as a daemon thread before it is started
    reader.daemon = True
    reader.start()
    # The constructor runs the message loop and returns when the loop ends
    NativeMessenger(inbox)
    sys.exit(0)


if __name__ == '__main__':
    Main()