-
Notifications
You must be signed in to change notification settings - Fork 1
/
run_python_test.py
210 lines (174 loc) · 9.2 KB
/
run_python_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import datetime
import io
import logging
import os
import os.path
import queue
import re
import shlex
import signal
import subprocess
import sys
import threading
import time
import typing
import click
import coloredlogs
from colorama import Fore, Style
from metadata import MetadataReader, Metadata
DEFAULT_CHIP_ROOT = os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', '..'))
MATTER_DEVELOPMENT_PAA_ROOT_CERTS = "credentials/development/paa-root-certs"
def EnqueueLogOutput(fp, tag, output_stream, q):
for line in iter(fp.readline, b''):
timestamp = time.time()
if len(line) > len('[1646290606.901990]') and line[0:1] == b'[':
try:
timestamp = float(line[1:18].decode())
line = line[19:]
except Exception:
pass
output_stream.write(
(f"[{datetime.datetime.fromtimestamp(timestamp).isoformat(sep=' ')}]").encode() + tag + line)
sys.stdout.flush()
fp.close()
def RedirectQueueThread(fp, tag, stream_output, queue) -> threading.Thread:
log_queue_thread = threading.Thread(target=EnqueueLogOutput, args=(
fp, tag, stream_output, queue))
log_queue_thread.start()
return log_queue_thread
def DumpProgramOutputToQueue(thread_list: typing.List[threading.Thread], tag: str, process: subprocess.Popen, stream_output, queue: queue.Queue):
thread_list.append(RedirectQueueThread(process.stdout,
(f"[{tag}][{Fore.YELLOW}STDOUT{Style.RESET_ALL}]").encode(), stream_output, queue))
thread_list.append(RedirectQueueThread(process.stderr,
(f"[{tag}][{Fore.RED}STDERR{Style.RESET_ALL}]").encode(), stream_output, queue))
@click.command()
@click.option("--app", type=click.Path(exists=True), default=None,
help='Path to local application to use, omit to use external apps.')
@click.option("--factoryreset", is_flag=True,
help='Remove app config and repl configs (/tmp/chip* and /tmp/repl*) before running the tests.')
@click.option("--factoryreset-app-only", is_flag=True,
help='Remove app config and repl configs (/tmp/chip* and /tmp/repl*) before running the tests, but not the controller config')
@click.option("--app-args", type=str, default='',
help='The extra arguments passed to the device. Can use placholders like {SCRIPT_BASE_NAME}')
@click.option("--script", type=click.Path(exists=True), default=os.path.join(DEFAULT_CHIP_ROOT,
'src',
'controller',
'python',
'test',
'test_scripts',
'mobile-device-test.py'), help='Test script to use.')
@click.option("--script-args", type=str, default='',
help='Script arguments, can use placeholders like {SCRIPT_BASE_NAME}.')
@click.option("--script-gdb", is_flag=True,
help='Run script through gdb')
@click.option("--quiet", is_flag=True, help="Do not print output from passing tests. Use this flag in CI to keep github log sizes manageable.")
@click.option("--load-from-env", default=None, help="YAML file that contains values for environment variables.")
def main(app: str, factoryreset: bool, factoryreset_app_only: bool, app_args: str, script: str, script_args: str, script_gdb: bool, quiet: bool, load_from_env):
if load_from_env:
reader = MetadataReader(load_from_env)
runs = reader.parse_script(script)
else:
runs = [
Metadata(
py_script_path=script,
run="cmd-run",
app=app,
app_args=app_args,
script_args=script_args,
factoryreset=factoryreset,
factoryreset_app_only=factoryreset_app_only,
script_gdb=script_gdb,
quiet=quiet
)
]
for run in runs:
print(f"Executing run: {run.py_script_path}")
main_impl(run.app, run.factoryreset, run.factoryreset_app_only, run.app_args, run.py_script_path, run.script_args, run.script_gdb, run.quiet)
def main_impl(app: str, factoryreset: bool, factoryreset_app_only: bool, app_args: str, script: str, script_args: str, script_gdb: bool, quiet: bool):
app_args = app_args.replace('{SCRIPT_BASE_NAME}', os.path.splitext(os.path.basename(script))[0])
script_args = script_args.replace('{SCRIPT_BASE_NAME}', os.path.splitext(os.path.basename(script))[0])
if factoryreset or factoryreset_app_only:
# Remove native app config
retcode = subprocess.call("rm -rf /tmp/chip* /tmp/repl*", shell=True)
if retcode != 0:
raise Exception("Failed to remove /tmp/chip* for factory reset.")
print("Contents of test directory: %s" % os.getcwd())
print(subprocess.check_output(["ls -l"], shell=True).decode('utf-8'))
# Remove native app KVS if that was used
kvs_match = re.search(r"--KVS (?P<kvs_path>[^ ]+)", app_args)
if kvs_match:
kvs_path_to_remove = kvs_match.group("kvs_path")
retcode = subprocess.call("rm -f %s" % kvs_path_to_remove, shell=True)
print("Trying to remove KVS path %s" % kvs_path_to_remove)
if retcode != 0:
raise Exception("Failed to remove %s for factory reset." % kvs_path_to_remove)
if factoryreset:
# Remove Python test admin storage if provided
storage_match = re.search(r"--storage-path (?P<storage_path>[^ ]+)", script_args)
if storage_match:
storage_path_to_remove = storage_match.group("storage_path")
retcode = subprocess.call("rm -f %s" % storage_path_to_remove, shell=True)
print("Trying to remove storage path %s" % storage_path_to_remove)
if retcode != 0:
raise Exception("Failed to remove %s for factory reset." % storage_path_to_remove)
coloredlogs.install(level='INFO')
log_queue = queue.Queue()
log_cooking_threads = []
app_process = None
app_pid = 0
stream_output = sys.stdout.buffer
if quiet:
stream_output = io.BytesIO()
if app:
if not os.path.exists(app):
if app is None:
raise FileNotFoundError(f"{app} not found")
app_args = [app] + shlex.split(app_args)
logging.info(f"Execute: {app_args}")
app_process = subprocess.Popen(
app_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0)
app_pid = app_process.pid
DumpProgramOutputToQueue(
log_cooking_threads, Fore.GREEN + "APP " + Style.RESET_ALL, app_process, stream_output, log_queue)
script_command = [script, "--paa-trust-store-path", os.path.join(DEFAULT_CHIP_ROOT, MATTER_DEVELOPMENT_PAA_ROOT_CERTS),
'--log-format', '%(message)s', "--app-pid", str(app_pid)] + shlex.split(script_args)
if script_gdb:
#
# When running through Popen, we need to preserve some space-delimited args to GDB as a single logical argument.
# To do that, let's use '|' as a placeholder for the space character so that the initial split will not tokenize them,
# and then replace that with the space char there-after.
#
script_command = ("gdb -batch -return-child-result -q -ex run -ex "
"thread|apply|all|bt --args python3".split() + script_command)
else:
script_command = "/usr/bin/env python3".split() + script_command
final_script_command = [i.replace('|', ' ') for i in script_command]
logging.info(f"Execute: {final_script_command}")
test_script_process = subprocess.Popen(
final_script_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
DumpProgramOutputToQueue(log_cooking_threads, Fore.GREEN + "TEST" + Style.RESET_ALL,
test_script_process, stream_output, log_queue)
test_script_exit_code = test_script_process.wait()
if test_script_exit_code != 0:
logging.error("Test script exited with error %r" % test_script_exit_code)
test_app_exit_code = 0
if app_process:
logging.warning("Stopping app with SIGINT")
app_process.send_signal(signal.SIGINT.value)
test_app_exit_code = app_process.wait()
# There are some logs not cooked, so we wait until we have processed all logs.
# This procedure should be very fast since the related processes are finished.
for thread in log_cooking_threads:
thread.join()
# We expect both app and test script should exit with 0
exit_code = test_script_exit_code if test_script_exit_code != 0 else test_app_exit_code
if quiet:
if exit_code:
sys.stdout.write(stream_output.getvalue().decode('utf-8'))
else:
logging.info("Test completed successfully")
if exit_code != 0:
sys.exit(exit_code)
if __name__ == '__main__':
main(auto_envvar_prefix='CHIP')