From e2bd934195aaa1c0ea13e16b0b66c12467fc88f2 Mon Sep 17 00:00:00 2001 From: Dylan Knutson Date: Thu, 26 Sep 2024 18:12:34 -0700 Subject: [PATCH] Refactor run_fixture tool, clean up ops --- fixture_tests/fixtures/alarms.nc | 3 +- fixture_tests/fixtures/flow_control_alarm.nc | 2 + fixture_tests/fixtures/idle_status.nc | 3 +- fixture_tests/run_fixture | 265 ++----------------- fixture_tests/tool/__init__.py | 0 fixture_tests/tool/controller.py | 51 ++++ fixture_tests/tool/op_entries.py | 232 ++++++++++++++++ fixture_tests/tool/utils.py | 75 ++++++ 8 files changed, 387 insertions(+), 244 deletions(-) create mode 100644 fixture_tests/tool/__init__.py create mode 100644 fixture_tests/tool/controller.py create mode 100644 fixture_tests/tool/op_entries.py create mode 100644 fixture_tests/tool/utils.py diff --git a/fixture_tests/fixtures/alarms.nc b/fixture_tests/fixtures/alarms.nc index c0ecea13a..ba5a30a41 100644 --- a/fixture_tests/fixtures/alarms.nc +++ b/fixture_tests/fixtures/alarms.nc @@ -4,8 +4,7 @@ -> $Alarm/Send=10 <- ok <- [MSG:INFO: ALARM: Spindle Control] -# end in an unlocked state so other fixtures can run --> $X <- ALARM:10 +-> $X <- [MSG:INFO: Caution: Unlocked] <- ok diff --git a/fixture_tests/fixtures/flow_control_alarm.nc b/fixture_tests/fixtures/flow_control_alarm.nc index 86ecd17ec..e91aaace9 100644 --- a/fixture_tests/fixtures/flow_control_alarm.nc +++ b/fixture_tests/fixtures/flow_control_alarm.nc @@ -10,3 +10,5 @@ <- ok <- ok <- [MSG:INFO: PRINT, success] +<- ok +<- ok diff --git a/fixture_tests/fixtures/idle_status.nc b/fixture_tests/fixtures/idle_status.nc index 2d8021400..5d21e1f49 100644 --- a/fixture_tests/fixtures/idle_status.nc +++ b/fixture_tests/fixtures/idle_status.nc @@ -2,6 +2,7 @@ <~ [MSG:INFO: Caution: Unlocked] <- ok -> ?? +<- +-> ?? <| <| -<| diff --git a/fixture_tests/run_fixture b/fixture_tests/run_fixture index 7bffc8757..0398d748a 100755 --- a/fixture_tests/run_fixture +++ b/fixture_tests/run_fixture @@ -5,9 +5,8 @@ from termcolor import colored import argparse import os import serial -from xmodem import XMODEM -import re -import fnmatch +from tool import op_entries +from tool.controller import Controller parser = argparse.ArgumentParser() parser.add_argument("device") @@ -15,264 +14,48 @@ parser.add_argument("fixture_file") parser.add_argument("-b", "--baudrate", type=int, default=115200) args = parser.parse_args() -OPS = [ - # send command to controller - "->", - # send file to controller - "=>", - # expect from controller - "<-", - # expect from controller, but optional - "<~", - # consume lines until line is found - "<...", - # expect one of - "<|", -] - -fixture_files = [] +fixture_paths = [] # check if fixture_file is a directory if os.path.isdir(args.fixture_file): for file in os.listdir(args.fixture_file): if file.endswith(".nc"): - fixture_files.append(os.path.join(args.fixture_file, file)) + fixture_paths.append(os.path.join(args.fixture_file, file)) else: - fixture_files.append(args.fixture_file) - - -class OpEntry: - def __init__(self, op, data, lineno): - self.op = op - self.data = data - self.lineno = lineno - self.glob_match = False - - def __str__(self): - return f"OpEntry({self.op}, {str(self.data)}, {self.lineno})" - - def __repr__(self): - return str(self) - + fixture_paths.append(args.fixture_file) -def parse_fixture_lines(fixture_file): - # op_entries is a list of tuples: - # (op, match, lineno) - - # Read the fixture file - with open(fixture_file, "r") as f: - op_entries = [] - for lineno, line in enumerate(f.read().splitlines()): - if line.startswith("#"): - # skip comment lines - continue - - for op in OPS: - if line.startswith(op + " "): - line = line[len(op) + 1 :] - if line.startswith("* "): - line = line[2:] - glob_match = True - else: - glob_match = False - - if op == "<|": - if len(op_entries) > 0 and op_entries[-1].op == "<|": - # append to previous group of matches - op_entries[-1].data.append(line) - else: - # new group of matches - op_entry = OpEntry(op, [line], lineno + 1) - op_entries.append(op_entry) - elif op == "=>": - # make the local path relative to the fixture file - line = line.split(" ") - local_file = line[0] - remote_file = line[1] - local_file = os.path.join( - os.path.dirname(fixture_file), local_file - ) - if not os.path.exists(local_file): - raise ValueError( - f"Fixture {fixture_file} references file that does not exist: {local_file}" - ) - if not remote_file.startswith("/"): - raise ValueError( - f"Remote file path must be absolute: {remote_file}" - ) - op_entries.append( - OpEntry(op, (local_file, remote_file), lineno + 1) - ) - - # expect a message that the file was received - op_entries.append( - OpEntry("<-", "[MSG:Files changed]", lineno + 1) - ) - - else: - op_entry = OpEntry(op, line, lineno + 1) - op_entry.glob_match = glob_match - op_entries.append(op_entry) - break - else: - raise ValueError( - f"Invalid line {lineno} in fixture file {fixture_file}: {line}" - ) - return op_entries - - -def run_fixture(fixture_file): - fixture_lines = parse_fixture_lines(fixture_file) - controller = serial.Serial(args.device, args.baudrate, timeout=1) - - # last line read from the controller - line = None - - def ensure_line(): - nonlocal line - if line is None: - line = controller.readline().decode("utf-8").strip() +def run_fixture(fixture_path, controller): + op_entries_parsed = op_entries.parse_file(fixture_path) try: - for op_entry in fixture_lines: - op = op_entry.op - op_data = op_entry.data - lineno = op_entry.lineno - if op == "->": - # send the fixture line to the controller + for op_entry in op_entries_parsed: + if not op_entry.execute(controller): print( - colored(f"{op} ", "dark_grey") - + colored(op_data, "green", attrs=["dark"]) + colored(f"--- Fixture ", "red") + + colored(fixture_path, "red", attrs=["bold"]) + + colored(" failed ---", "red") ) - controller.write(op_data.encode("utf-8") + b"\n") - elif op == "<-" or op == "<~" or op == "<|": - is_optional = op == "<~" - ensure_line() - if op == "<|": # match any one of - if line in op_data: - print( - colored(f"{op} ", "dark_grey") - + colored(line, "green", attrs=["dark", "bold"]) - ) - line = None - else: - print(f"Test failed at line {colored(str(lineno), 'red')}") - print(f"Expected one of:") - for fline in op_data: - print(f" `{colored(fline, 'red')}'") - print(f"Actual: `{colored(line, 'red')}'") - exit(1) - elif line == op_data: # exact match - print( - colored(f"{op} ", "dark_grey") - + colored(line, "green", attrs=["dark", "bold"]) - ) - line = None - else: # match failed - if is_optional: # but that's okay if it's an optional line - print( - colored(f"{op} Did not get optional line ", "dark_grey") - + colored(op_data, "dark_grey", attrs=["bold"]) - ) - # do not clear line, so we can try to match it again on - # the next op - else: - print(f"Test failed at line {colored(str(lineno), 'red')}") - print(f"Expected: `{colored(op_data, 'red')}'") - print(f"Actual: `{colored(line, 'red')}'") - exit(1) - elif op == "=>": - local_file, remote_file = op_data - with open(local_file, "rb") as file_stream: - - def getc(size, timeout=1): - return controller.read(size) or None - - def putc(data, timeout=1): - return controller.write(data) or None - - print(f"Sending {local_file} to {remote_file}") - controller.write(f"$XModem/Receive={remote_file}\n".encode("utf-8")) - while True: - # wait for the 'C' character to start the transfer - controller.timeout = 2 - c = controller.read(1) - if c == b"C": - break - if c == b"": - raise TimeoutError( - f"XModem start timeout at line {lineno} in fixture file {fixture_file}" - ) - controller.timeout = 1 - xmodem = XMODEM(getc, putc) - xmodem.send(file_stream) - rx_ack_line = controller.readline().decode("utf-8").strip() - print( - colored(f"{op} ", "dark_grey") - + colored(rx_ack_line, "green", attrs=["dark", "bold"]) - ) - matcher = re.match( - r"\[MSG:INFO: Received (\d+) bytes to file ([\w\/\.]+)\]", - rx_ack_line, - ) - if matcher is None: - raise ValueError( - f"Transfer failed (ack line): {rx_ack_line} at line {lineno} in fixture file {fixture_file}" - ) - num_tx_bytes = int(matcher.group(1)) - name_tx_file = matcher.group(2) - if name_tx_file != remote_file: - print(f"Expected: {remote_file}") - print(f"Actual: {name_tx_file}") - raise ValueError( - f"Transfer failed (filename mismatch): {rx_ack_line} at line {lineno} in fixture file {fixture_file}" - ) - print( - colored(f"{op} ", "dark_grey") - + colored(local_file, "green", attrs=["bold"]) - + colored(" => ", "dark_grey") - + colored(remote_file, "green", attrs=["bold"]) - + colored(f" ({num_tx_bytes} bytes)", "green") - ) - elif op == "<...": - while True: - ensure_line() - print( - colored( - f"{op} " + ("(*) " if op_entry.glob_match else ""), - "dark_grey", - ) - + colored(line, "green", attrs=["dark", "bold"]) - ) - - matched = False - if op_entry.glob_match: - matched = fnmatch.fnmatch(line, op_data) - else: - matched = line == op_data - line = None - - if matched: - break - - else: - raise ValueError(f"Invalid operation {op}") + exit(1) except KeyboardInterrupt: print("Interrupt") + except TimeoutError as e: print("Timeout waiting for response, line: " + e.args[0]) - finally: - controller.close() print( colored(f"--- Fixture ", "green") - + colored(fixture_file, "green", attrs=["bold"]) - + colored(" passed ---", "green") + + colored(fixture_path, "green", attrs=["bold"]) + + colored(" passed ---", "green"), + end="\n\n", ) - print() -for fixture_file in fixture_files: - run_fixture(fixture_file) +if __name__ == "__main__": + controller = Controller(args.device, args.baudrate, timeout=1) + for fixture_path in fixture_paths: + controller.send_soft_reset() + run_fixture(fixture_path, controller) + # clear the buffer so rest of the fixtures can run + controller.drain() diff --git a/fixture_tests/tool/__init__.py b/fixture_tests/tool/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/fixture_tests/tool/controller.py b/fixture_tests/tool/controller.py new file mode 100644 index 000000000..d443617a3 --- /dev/null +++ b/fixture_tests/tool/controller.py @@ -0,0 +1,51 @@ +import serial +from termcolor import colored + + +class Controller: + def __init__(self, device, baudrate, timeout): + self._debug = False + self._serial = serial.Serial(device, baudrate, timeout=timeout) + self._current_line = None + + def send_soft_reset(self): + self._serial.write(b"\x18") + self._serial.flush() + self.clear_line() + # wait for startup message + while not self.current_line().startswith("Grbl 3.8"): + self.clear_line() + self.clear_line() + + def current_line(self): + if self._current_line is None: + self._current_line = self._serial.readline().decode("utf-8").strip() + # print(colored("[c] <- " + self._current_line, "light_blue")) + return self._current_line + + def clear_line(self): + self._current_line = None + + def next_line(self): + self.clear_line() + return self.current_line() + + def send_line(self, line): + # print(colored("[c] -> " + line, "light_blue")) + self._serial.write(line.encode("utf-8") + b"\n") + + def getc(self, size): + return self._serial.read(size) or None + + def putc(self, data): + return self._serial.write(data) or None + + def drain(self, wait_for=0.1): + timeout = self._serial.timeout + self._serial.timeout = wait_for + while self._serial.read(1): + pass + self._serial.timeout = timeout + + def close(self): + self._serial.close() diff --git a/fixture_tests/tool/op_entries.py b/fixture_tests/tool/op_entries.py new file mode 100644 index 000000000..c453612dd --- /dev/null +++ b/fixture_tests/tool/op_entries.py @@ -0,0 +1,232 @@ +import re +from xmodem import XMODEM +import os +from tool.utils import remote_file_sha256, file_stream_sha256, color +import fnmatch + + +def parse_file(fixture_path): + with open(fixture_path, "r") as f: + op_entries = [] + for lineno, line in enumerate(f.read().splitlines()): + if line.startswith("#"): + # skip comment lines + continue + + op = line.split(" ")[0] + if op in OPS_MAP: + ctor = OPS_MAP[op] + line = line[len(op) + 1 :] + if ( + len(op_entries) > 0 + and op_entries[-1].__class__.data_is_multi_line + and op_entries[-1].op == op + ): + # append to previous group of matches + op_entries[-1].data.append(line) + else: + op_entry = ctor(op, line, lineno + 1, fixture_path) + op_entries.append(op_entry) + else: + raise ValueError( + f"Invalid op '{op}' at line {lineno} in fixture file {fixture_path}: {line}" + ) + + return op_entries + + +class OpEntry: + # When parsing the fixtures file, this is set to True if the data for this type of + # operation spans multiple lines + data_is_multi_line = False + + def __init__(self, op, data, lineno, fixture_path): + self.op = op + self.data = data + self.lineno = lineno + self.fixture_path = fixture_path + + def __str__(self): + return f"{self.__class__.__name__}({self.op}, {str(self.data)}, {self.lineno})" + + def __repr__(self): + return str(self) + + def execute(self, controller): + raise NotImplementedError + + def _op_str(self): + return color.dark_grey(self.op) + " " + + +class SendLineOpEntry(OpEntry): + def execute(self, controller): + print(self._op_str() + color.sent_line(self.data)) + controller.send_line(self.data) + return True + + +class StringMatchOpEntry(OpEntry): + def __init__(self, op, data, lineno, fixture_path): + super().__init__(op, data, lineno, fixture_path) + self.optional = op == "<~" + + def execute(self, controller): + # optional unconditionally matches all lines + line = controller.current_line() + matches = self.data == line + if matches: + print(self._op_str() + color.received_line(line)) + controller.clear_line() + elif self.optional and not matches: + print(self._op_str() + color.dark_grey(self.data, dark=True, bold=True)) + else: + print(color.error("Expected: ") + self.data) + print(color.error("Actual : ") + line) + return False + return True + + +class AnyStringMatchOpEntry(OpEntry): + data_is_multi_line = True + + def __init__(self, op, data, lineno, fixture_path): + super().__init__(op, [data], lineno, fixture_path) + + def execute(self, controller): + line = controller.current_line() + if line not in self.data: + print(color.error("Expected one of: ")) + for fline in self.data: + print(f" - `{color.error(fline)}'") + print(color.error("Actual: ") + line) + return False + else: + print(self._op_str() + color.received_line(line)) + controller.clear_line() + return True + + +class UntilStringMatchOpEntry(OpEntry): + def __init__(self, op, data, lineno, fixture_path): + self.glob_match = data.startswith("* ") + super().__init__(op, data.removeprefix("* "), lineno, fixture_path) + + def execute(self, controller): + while True: + matches = self._line_matches(controller) + print( + self._op_str() + + color.green(controller.current_line(), dark=True, bold=matches) + ) + controller.clear_line() + if matches: + break + return True + + def _line_matches(self, controller): + if self.glob_match: + return fnmatch.fnmatch(controller.current_line(), self.data) + else: + return self.data == controller.current_line() + + +class SendFileOpEntry(OpEntry): + def __init__(self, op, data, lineno, fixture_path): + super().__init__(op, data, lineno, fixture_path) + data = data.split(" ") + # make local file path relative to the fixture file + self.local_file_path = os.path.normpath( + os.path.join(os.path.dirname(fixture_path), data[0]) + ) + self.remote_file_path = data[1] + + # validate the local path exists + if not os.path.exists(self.local_file_path): + raise ValueError( + f"Local file '{self.local_file_path}' does not exist at line {lineno} in fixture file {fixture_path}" + ) + + def execute(self, controller): + with open(self.local_file_path, "rb") as file_stream: + remote_sha256 = remote_file_sha256(controller, self.remote_file_path) + local_sha256 = file_stream_sha256(file_stream) + + print( + self._op_str() + + color.green(self.local_file_path) + + color.dark_grey(" => ") + + color.green(self.remote_file_path), + end=" ", + ) + if remote_sha256 == local_sha256: + print(color.dark_grey(f"(up-to-date, hash={local_sha256[:8]})")) + return True + elif remote_sha256 is None: + print(color.green("(file does not exist)")) + else: + print( + color.green( + f"(file changed, " + f"local:{local_sha256[:8]} != remote:{remote_sha256[:8]}f" + f")", + ) + ) + + controller.send_line(f"$XModem/Receive={remote_file}") + while True: + # wait for the 'C' character to start the transfer + controller.timeout = 2 + c = controller.read(1) + if c == b"C": + break + if c == b"": + raise TimeoutError( + f"XModem start timeout at line {lineno} in fixture file {fixture_file}" + ) + controller.timeout = 1 + xmodem = XMODEM(controller.getc, controller.putc) + xmodem.send(file_stream) + rx_ack_line = controller.next_line() + controller.clear_line() + print(self._op_str() + color.received_line(rx_ack_line)) + matcher = re.match( + r"\[MSG:INFO: Received (\d+) bytes to file ([\w\/\.]+)\]", + rx_ack_line, + ) + if matcher is None: + raise ValueError( + f"Transfer failed (ack line): {rx_ack_line} at line {lineno} in fixture file {fixture_file}" + ) + num_tx_bytes = int(matcher.group(1)) + name_tx_file = matcher.group(2) + if name_tx_file != remote_file: + print(f"Expected: {remote_file}") + print(f"Actual: {name_tx_file}") + raise ValueError( + f"Transfer failed (filename mismatch): {rx_ack_line} at line {lineno} in fixture file {fixture_file}" + ) + print( + self._op_str() + + color.green(local_file, bold=True) + + color.dark_grey(" => ") + + color.green(remote_file, bold=True) + + color.green(f" ({num_tx_bytes} bytes)") + ) + return True + + +OPS_MAP = { + # send command to controller + "->": SendLineOpEntry, + # send file to controller + "=>": SendFileOpEntry, + # expect from controller + "<-": StringMatchOpEntry, + # expect from controller, but optional + "<~": StringMatchOpEntry, + # consume lines until line is found + "<...": UntilStringMatchOpEntry, + # expect one of + "<|": AnyStringMatchOpEntry, +} diff --git a/fixture_tests/tool/utils.py b/fixture_tests/tool/utils.py new file mode 100644 index 000000000..949009552 --- /dev/null +++ b/fixture_tests/tool/utils.py @@ -0,0 +1,75 @@ +import hashlib +import re +import json +from termcolor import colored + + +def remote_file_sha256(controller, remote_path): + remote_path = remote_path.removeprefix("/littlefs") + controller.send_line(f"$File/ShowHash={remote_path}") + json_response = "" + while True: + line = controller.next_line() + if line == "": + raise TimeoutError("Timeout waiting for controller") + if line == "ok": + break + if line.startswith("error:"): + raise ValueError(f"Error from controller: {line}") + matcher = re.match(r"\[JSON:(.+)\]", line) + if matcher is None: + raise ValueError(f"Invalid response from controller: {line}") + json_response += matcher.group(1) + + json_response = json.loads(json_response) + signature = json_response["signature"] + if signature["algorithm"] != "SHA2-256": + raise ValueError(f"Unsupported algorithm: {signature['algorithm']}") + + if signature["value"] != "": + return signature["value"].lower() + else: + return None + + +def file_stream_sha256(file_stream): + file_stream.seek(0) + sha256 = hashlib.sha256() + while True: + data = file_stream.read(4096) + if not data: + break + sha256.update(data) + file_stream.seek(0) + return sha256.hexdigest().lower() + + +class ColorHelper: + def green(self, s, **kwargs): + return self._impl(s, "green", **kwargs) + + def red(self, s, **kwargs): + return self._impl(s, "red", **kwargs) + + def dark_grey(self, s, **kwargs): + return self._impl(s, "dark_grey", **kwargs) + + def received_line(self, s): + return self.green(s) + + def sent_line(self, s): + return self.green(s, dark=True) + + def error(self, s): + return self.red(s) + + def _impl(self, s, color, dark=False, bold=False): + attrs = [] + if dark: + attrs.append("dark") + if bold: + attrs.append("bold") + return colored(s, color, attrs=attrs) + + +color = ColorHelper()