diff --git a/dpu-tools/Dockerfile b/dpu-tools/Dockerfile
index 17969a4..deb5fbf 100644
--- a/dpu-tools/Dockerfile
+++ b/dpu-tools/Dockerfile
@@ -1,6 +1,6 @@
 FROM quay.io/centos/centos:stream9
 RUN dnf install -y \
-    minicom python39 pciutils lshw && \
+    procps-ng openssh-clients minicom python39 python3-pexpect python3-requests pciutils lshw && \
     dnf clean all && \
     rm -rf /var/cache/* && \
     ln -s /usr/bin/pip3.9 /usr/bin/pip && \
diff --git a/dpu-tools/common_ipu.py b/dpu-tools/common_ipu.py
new file mode 100644
index 0000000..fca3e17
--- /dev/null
+++ b/dpu-tools/common_ipu.py
@@ -0,0 +1,203 @@
+import subprocess
+import logging
+from typing import IO
+import requests
+import sys
+import tarfile
+import os
+import dataclasses
+import threading
+import re
+import pexpect
+from minicom import configure_minicom, pexpect_child_wait, minicom_cmd
+
+
+VERSIONS = ["1.2.0.7550", "1.6.2.9418", "1.8.0.10052"]
+
+
+@dataclasses.dataclass(frozen=True)
+class Result:
+    """Captured output and exit status of an executed command."""
+
+    out: str
+    err: str
+    returncode: int
+
+
+def setup_logging(verbose: bool) -> None:
+    """Configure root logging to stdout: DEBUG when verbose, INFO otherwise."""
+    if verbose:
+        log_level = logging.DEBUG
+    else:
+        log_level = logging.INFO
+
+    logging.basicConfig(
+        level=log_level,
+        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+        handlers=[
+            logging.StreamHandler(sys.stdout),  # Log to stdout
+        ],
+    )
+
+
+logger = logging.getLogger(__name__)
+
+
+def run(command: str, capture_output: bool = False, dry_run: bool = False) -> Result:
+    """
+    Run a shell command, logging its output line by line as it is produced.
+
+    stdout and stderr are drained by one thread each so that neither pipe can
+    fill up and block the child process. The lines are kept in the returned
+    Result only when capture_output is set. With dry_run the command is only
+    logged and an empty, successful Result is returned.
+    """
+    if dry_run:
+        logger.info(f"[DRY RUN] Command: {command}")
+        return Result("", "", 0)
+
+    logger.debug(f"Executing: {command}")
+    process = subprocess.Popen(
+        command,
+        shell=True,  # Let the shell interpret the command so pipes can be used
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+        universal_newlines=True,
+    )
+
+    def stream_output(pipe: IO[str], buffer: list[str]) -> None:
+        for line in iter(pipe.readline, ""):
+            logger.debug(line.strip())
+            if capture_output:
+                buffer.append(line)
+        pipe.close()
+
+    stdout_lines: list[str] = []
+    stderr_lines: list[str] = []
+
+    # Create threads to handle `stdout` and `stderr`
+    stdout_thread = threading.Thread(
+        target=stream_output,
+        args=(process.stdout, stdout_lines),
+    )
+    stderr_thread = threading.Thread(
+        target=stream_output,
+        args=(process.stderr, stderr_lines),
+    )
+
+    stdout_thread.start()
+    stderr_thread.start()
+
+    # Wait for the process and for both readers so the Result is complete
+    process.wait()
+    stdout_thread.join()
+    stderr_thread.join()
+
+    # The buffers are empty when the output was not captured
+    return Result("".join(stdout_lines), "".join(stderr_lines), process.returncode)
+
+
+def download_file(url: str, dest_dir: str) -> str:
+    """
+    Download a file from the given URL and save it to the destination directory.
+
+    Returns the path of the saved file.
+    """
+    local_filename = os.path.join(dest_dir, url.split("/")[-1])
+    with requests.get(url, stream=True) as r:
+        r.raise_for_status()
+        with open(local_filename, "wb") as f:
+            for chunk in r.iter_content(chunk_size=8192):
+                if chunk:  # filter out keep-alive chunks
+                    f.write(chunk)
+    return local_filename
+
+
+def extract_tar_gz(tar_path: str, extract_dir: str) -> list[str]:
+    """
+    Extract a .tar.gz file and return the list of all extracted files.
+    """
+    with tarfile.open(tar_path, "r:gz") as tar:
+        tar.extractall(path=extract_dir)
+        return [os.path.join(extract_dir, name) for name in tar.getnames()]
+
+
+def find_image(
+    extracted_files: list[str], bin_file_prefix: str, identifier: str = ""
+) -> str:
+    """
+    Search through extracted files to find the binary file matching the prefix and identifier.
+
+    Only the tree below the first extracted entry is walked. Raises
+    FileNotFoundError when nothing matches, including for an empty list.
+    """
+    # Slicing keeps an empty listing from raising IndexError
+    for top_dir in extracted_files[:1]:
+        for root, _, files in os.walk(top_dir):  # Traverse directory
+            for file in files:
+                if bin_file_prefix in file and identifier in file:
+                    return os.path.join(root, file)
+    raise FileNotFoundError(
+        f"{bin_file_prefix} with identifier {identifier} not found in the extracted files."
+    )
+
+
+def get_current_version(
+    imc_address: str, logger: logging.Logger, dry_run: bool = False
+) -> Result:
+    """
+    Read the firmware version of the IMC over SSH from /etc/issue.net.
+
+    Result.out holds the first x.y.z.w version found, or "" when there is
+    none; err and returncode are passed through from the ssh command.
+    """
+    logger.debug("Getting Version via SSH")
+    # Execute the commands over SSH with dry_run handling
+    result = run(
+        f"ssh -o 'StrictHostKeyChecking=no' -o 'UserKnownHostsFile=/dev/null' {imc_address} 'cat /etc/issue.net'",
+        dry_run=dry_run,
+        capture_output=True,
+    )
+    # Regular expression to match the full version (e.g., 1.8.0.10052)
+    match = re.search(r"\d+\.\d+\.\d+\.\d+", result.out)
+    version = match.group(0) if match else ""
+    return Result(version, result.err, result.returncode)
+
+
+def minicom_get_version(logger: logging.Logger) -> str:
+    """
+    Read the firmware version of the IMC over the serial console with minicom.
+
+    Returns "" when the matched banner contains no x.y.z.w version. Failures
+    while waiting for the console are propagated from pexpect_child_wait.
+    """
+    run("pkill -9 minicom")
+    logger.debug("Configuring minicom")
+    configure_minicom()
+    logger.debug("spawn minicom")
+    child = pexpect.spawn(minicom_cmd("imc"))
+    child.maxread = 10000
+    pexpect_child_wait(child, ".*Press CTRL-A Z for help on special keys.*", 120)
+    logger.debug("Ready to enter command")
+    child.sendline("cat /etc/issue.net")
+
+    # Wait for the expected response (adjust the timeout as needed)
+    pexpect_child_wait(child, ".*IPU IMC MEV-HW-B1-ci-ts.release.*", 120)
+
+    # Capture and print the output
+    assert child.before is not None
+    assert child.after is not None
+    logger.debug(child.before.decode("utf-8"))
+    version_line = child.after.decode("utf-8")
+    logger.debug(version_line)
+
+    # Regular expression to match the full version (e.g., 1.8.0.10052)
+    match = re.search(r"\d+\.\d+\.\d+\.\d+", version_line)
+    version = match.group(0) if match else ""
+
+    # Gracefully close minicom (Ctrl-A followed by x)
+    child.sendcontrol("a")
+    child.sendline("x")
+    # Ensure minicom closes properly
+    child.expect(pexpect.EOF)
+    return version
diff --git a/dpu-tools/dpu-tools b/dpu-tools/dpu-tools
index ca7955a..798a51f 100755
--- a/dpu-tools/dpu-tools
+++ b/dpu-tools/dpu-tools
@@ -1,39 +1,76 @@
 #!/usr/bin/env python3
 
 import argparse
-import dataclasses
 import os
 import re
-import shlex
 import shutil
-import subprocess
 import tempfile
+import sys
+import logging
+from fwutils import IPUFirmware
+from common_ipu import (
+    VERSIONS,
+    get_current_version,
+    setup_logging,
+    run,
+    minicom_get_version,
+)
 
 
-@dataclasses.dataclass(frozen=True)
-class Result:
-    out: str
-    err: str
-    returncode: int
+logger = logging.getLogger(__name__)
 
 
-def run(cmd: str, env: dict[str, str] = os.environ.copy()) -> Result:
-    args = shlex.split(cmd)
-    res = subprocess.run(
-        args,
-        capture_output=True,
-        env=env,
-    )
+def reset(args: argparse.Namespace) -> None:
+    """Reboot the IPU by running "sudo reboot" over SSH on 100.0.0.100."""
+    run("ssh root@100.0.0.100 sudo reboot")
 
-    return Result(
-        out=res.stdout.decode("utf-8"),
-        err=res.stderr.decode("utf-8"),
-        returncode=res.returncode,
+
+def firmware_up(args: argparse.Namespace) -> None:
+    """Flash the requested firmware version (the newest one by default)."""
+    fw = IPUFirmware(
+        args.imc_address,
+        args.version,
+        repo_url=args.repo_url,
+        dry_run=args.dry_run,
+        verbose=args.verbose,
+    )
+    fw.reflash_ipu()
+
+
+def current_version_or_exit(imc_address: str) -> str:
+    """
+    Return the running firmware version, read over SSH or else over minicom.
+
+    Exits with the ssh return code when both methods fail.
+    """
+    result = get_current_version(imc_address, logger=logger)
+    if not result.returncode:
+        return result.out
+    logger.debug("Failed with ssh, trying minicom!")
+    try:
+        return minicom_get_version(logger=logger)
+    except Exception as e:
+        logger.error(f"Error ssh try: {result.err}")
+        logger.error(f"Exception with minicom: {e}")
+        logger.error("Exiting...")
+        sys.exit(result.returncode)
+
+
+def firmware_reset(args: argparse.Namespace) -> None:
+    """Reflash the IPU with the firmware version it is currently running."""
+    fw = IPUFirmware(
+        args.imc_address,
+        version=current_version_or_exit(args.imc_address),
+        repo_url=args.repo_url,
+        dry_run=args.dry_run,
+        verbose=args.verbose,
     )
+    fw.reflash_ipu()
 
 
-def reset(args: argparse.Namespace) -> None:
-    run("ssh root@100.0.0.100 sudo reboot")
+def firmware_version(args: argparse.Namespace) -> None:
+    """Print the firmware version the IPU is currently running."""
+    print(current_version_or_exit(args.imc_address))
 
 
 def console(args: argparse.Namespace) -> None:
@@ -92,6 +129,11 @@ def list_dpus(args: argparse.Namespace) -> None:
 
 def main() -> None:
     parser = argparse.ArgumentParser(description="Tools to interact with an IPU")
+    parser.add_argument(
+        "--verbose",
+        action="store_true",
+        help="Increase output verbosity",
+    )
     subparsers = parser.add_subparsers(
         title="subcommands", description="Valid subcommands", dest="subcommand"
     )
@@ -99,6 +141,50 @@ def main() -> None:
     reset_parser = subparsers.add_parser("reset", help="Reset the IPU")
     reset_parser.set_defaults(func=reset)
 
+    # Firmware command with its own subcommands (reset/up)
+    firmware_parser = subparsers.add_parser("firmware", help="Control the IPU firmware")
+    firmware_subparsers = firmware_parser.add_subparsers(
+        title="firmware commands",
+        description="Valid firmware subcommands",
+        dest="firmware_command",
+    )
+
+    firmware_parser.add_argument(
+        "--imc-address", required=True, help="IMC address for the firmware"
+    )
+    firmware_parser.add_argument(
+        "--repo-url", help="Repo address for the firmware images"
+    )
+
+    firmware_parser.add_argument(
+        "--dry-run",
+        action="store_true",  # This makes it a flag (boolean)
+        help="Simulate the firmware changes without making actual changes",
+    )
+    # Firmware reset subcommand
+    firmware_reset_parser = firmware_subparsers.add_parser(
+        "reset", help="Reset the firmware"
+    )
+    firmware_reset_parser.set_defaults(func=firmware_reset)
+
+    # Firmware up subcommand
+    firmware_up_parser = firmware_subparsers.add_parser(
+        "up", help="Update the firmware"
+    )
+    firmware_up_parser.set_defaults(func=firmware_up)
+    firmware_up_parser.add_argument(
+        "--version",
+        choices=VERSIONS,
+        help="Version for the firmware Up",
+    )
+
+    # firmware version subcommand
+    firmware_version_parser = firmware_subparsers.add_parser(
+        "version", help="Retrieve firmware version"
+    )
+    firmware_version_parser.set_defaults(func=firmware_version)
+
+    # List commands
     list_parser = subparsers.add_parser("list", help="list devices")
     list_parser.set_defaults(func=list_dpus)
 
@@ -109,6 +195,7 @@ def main() -> None:
     )
 
     args = parser.parse_args()
+    setup_logging(args.verbose)
     if hasattr(args, "func"):
         args.func(args)
     else:
diff --git a/dpu-tools/fwutils.py b/dpu-tools/fwutils.py
new file mode 100644
index 0000000..c9a7f4a
--- /dev/null
+++ b/dpu-tools/fwutils.py
@@ -0,0 +1,237 @@
+#!/usr/bin/env python3
+import logging
+import sys
+import pexpect
+from minicom import minicom_cmd, pexpect_child_wait, configure_minicom
+from common_ipu import (
+    extract_tar_gz,
+    run,
+    download_file,
+    find_image,
+    get_current_version,
+    VERSIONS,
+    minicom_get_version,
+)
+
+
+class IPUFirmware:
+    """Reflash the SSD and SPI firmware images of an IPU through its IMC."""
+
+    def __init__(
+        self,
+        imc_address: str,
+        version: str = "",
+        repo_url: str = "",
+        steps_to_run: list[str] = [],
+        dry_run: bool = False,
+        verbose: bool = False,
+    ):
+        self.verbose = verbose
+        self.logger = logging.getLogger(__name__)
+        self.imc_address = imc_address
+        self.dry_run = dry_run
+        self.version_to_flash = version or VERSIONS[-1]
+        self.repo_url = repo_url or "wsfd-advnetlab-amp04.anl.eng.bos2.dc.redhat.com"
+        # Work on a copy: reflash_ipu() may insert a step, and that must not
+        # leak into the caller's list or into the shared default argument
+        self.steps_to_run = list(steps_to_run) or [
+            "clean_up_imc",
+            "flash_ssd_image",
+            "flash_spi_image",
+        ]
+        if self.dry_run:
+            self.logger.info(
+                "DRY RUN, This is just a preview of the actions that will be taken"
+            )
+        self.logger.debug(f"version_to_flash: {self.version_to_flash}")
+        self.logger.debug(f"imc_address: {self.imc_address}")
+        self.logger.debug(f"steps_to_run: {self.steps_to_run}")
+        self.logger.debug(f"repo_url: {self.repo_url}")
+        self.logger.debug(f"dry_run: {self.dry_run}")
+        self.logger.debug(f"verbose: {self.verbose}")
+
+    def should_run(self, step_name: str) -> bool:
+        """Check if the step should be run"""
+        return step_name in self.steps_to_run
+
+    def reflash_ipu(self) -> None:
+        """
+        Run the configured flashing steps in order.
+
+        Exits the process with the failing command's return code when
+        erasing or flashing an image fails.
+        """
+        self.logger.info("Reflashing the firmware of IPU.")
+
+        if not self.dry_run:
+            self.logger.info("Detecting version")
+            result = get_current_version(
+                imc_address=self.imc_address, logger=self.logger
+            )
+            if result.returncode:
+                current_version = minicom_get_version(self.logger)
+            else:
+                current_version = result.out
+            self.logger.info(f"Version: '{current_version}'")
+            # Guarded so that a second call does not queue the step twice
+            needs_runtime_access = current_version == "1.2.0.7550"
+            if needs_runtime_access and not self.should_run("ipu_runtime_access"):
+                self.steps_to_run.insert(0, "ipu_runtime_access")
+        else:
+            self.logger.info("[DRY RUN] Detecting version")
+
+        # Retrieve images if not a dry run
+        self.logger.info("Retrieving images.....")
+        ssd_image_path, spi_image_path = (
+            self.get_images() if not self.dry_run else ("", "")
+        )
+        self.logger.info("Done Retrieving images")
+
+        # Step 1: ipu_runtime_access
+        self.logger.info("Step 1: ipu_runtime_access")
+        if self.should_run("ipu_runtime_access"):
+            self.ipu_runtime_access()
+        else:
+            self.logger.info("Skipping ipu_runtime_access")
+
+        # Step 2: clean_up_imc
+        self.logger.info("Step 2: clean_up_imc")
+        if self.should_run("clean_up_imc"):
+            self.clean_up_imc()
+        else:
+            self.logger.info("Skipping clean_up_imc")
+
+        # Step 3: Flash SSD image using dd
+        self.logger.info("Step 3: flash_ssd_image")
+        if self.should_run("flash_ssd_image"):
+            result = run(
+                f"dd bs=16M if={ssd_image_path} | ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null {self.imc_address} 'dd bs=16M of=/dev/nvme0n1' status=progress",
+                dry_run=self.dry_run,
+            )
+            if result.returncode:
+                self.logger.error("Failed to flash_ssd_image")
+                sys.exit(result.returncode)
+        else:
+            self.logger.info("Skipping flash_ssd_image")
+
+        # Step 4: Flash SPI image
+        self.logger.info("Step 4: flash_spi_image")
+        if self.should_run("flash_spi_image"):
+            result = run(
+                f"ssh -o 'StrictHostKeyChecking=no' -o 'UserKnownHostsFile=/dev/null' {self.imc_address} 'flash_erase /dev/mtd0 0 0'",
+                dry_run=self.dry_run,
+            )
+            if result.returncode:
+                self.logger.error("Failed to erase SPI image")
+                sys.exit(result.returncode)
+
+            result = run(
+                f"dd bs=16M if={spi_image_path} | ssh -o 'StrictHostKeyChecking=no' -o 'UserKnownHostsFile=/dev/null' {self.imc_address} 'dd bs=16M of=/dev/mtd0 status=progress'",
+                dry_run=self.dry_run,
+            )
+            if result.returncode:
+                self.logger.error("Failed to flash_spi_image")
+                sys.exit(result.returncode)
+        else:
+            self.logger.info("Skipping flash_spi_image")
+
+        # Step 5: Reboot IMC
+        self.logger.info("Done!")
+        self.logger.info(f"Please cold reboot IMC at {self.imc_address}")
+
+    def ipu_runtime_access(self) -> None:
+        """Run /etc/ipu/ipu_runtime_access on the IMC through the minicom console."""
+        if self.dry_run:
+            self.logger.debug("[DRY RUN] pkill -9 minicom")
+            self.logger.debug(
+                "[DRY RUN] Wait for 'Press CTRL-A Z for help on special keys.'"
+            )
+            self.logger.debug("[DRY RUN] Ready to enter command")
+            self.logger.debug("[DRY RUN] Send '/etc/ipu/ipu_runtime_access'")
+            self.logger.debug("[DRY RUN] Wait for '.*#'")
+            self.logger.debug("[DRY RUN] Capturing and printing output")
+            self.logger.debug("[DRY RUN] Send Ctrl-A and 'x' to exit minicom")
+            self.logger.debug("[DRY RUN] Expect EOF")
+        else:
+            run("pkill -9 minicom")
+            self.logger.debug("Configuring minicom")
+            configure_minicom()
+            self.logger.debug("spawn minicom")
+            child = pexpect.spawn(minicom_cmd("imc"))
+            child.maxread = 10000
+            pexpect_child_wait(
+                child, ".*Press CTRL-A Z for help on special keys.*", 120
+            )
+            self.logger.debug("Ready to enter command")
+            child.sendline("/etc/ipu/ipu_runtime_access")
+            # Wait for the expected response (adjust the timeout as needed)
+            pexpect_child_wait(child, ".*Enabling network and sshd.*", 120)
+
+            # Capture and log the output
+            assert child.before is not None
+            assert child.after is not None
+            self.logger.debug(child.before.decode("utf-8"))
+            self.logger.debug(child.after.decode("utf-8"))
+            # Gracefully close minicom (Ctrl-A followed by x)
+            child.sendcontrol("a")
+            child.sendline("x")
+            # Ensure minicom closes properly
+            child.expect(pexpect.EOF)
+
+    def clean_up_imc(self) -> None:
+        """Unmount loop0 and nvme0n1p*, kill tgtd and zero nvme0n1 on the IMC."""
+        self.logger.info("Cleaning up IMC via SSH")
+
+        # Execute the commands over SSH with dry_run handling
+        run(
+            f"ssh -o 'StrictHostKeyChecking=no' -o 'UserKnownHostsFile=/dev/null' {self.imc_address} 'umount -l /dev/loop0'",
+            dry_run=self.dry_run,
+        )
+        run(
+            f"ssh -o 'StrictHostKeyChecking=no' -o 'UserKnownHostsFile=/dev/null' {self.imc_address} 'umount -l /dev/nvme0n1p*'",
+            dry_run=self.dry_run,
+        )
+        run(
+            f"ssh -o 'StrictHostKeyChecking=no' -o 'UserKnownHostsFile=/dev/null' {self.imc_address} 'killall -9 tgtd'",
+            dry_run=self.dry_run,
+        )
+
+        self.logger.debug("Filling nvme0n1 with zeros")
+        run(
+            f"ssh -o 'StrictHostKeyChecking=no' -o 'UserKnownHostsFile=/dev/null' {self.imc_address} 'dd if=/dev/zero of=/dev/nvme0n1 bs=64k status=progress'",
+            dry_run=self.dry_run,
+        )
+        self.logger.debug("Done filling nvme0n1 with zeros")
+
+    def get_images(self) -> tuple[str, str]:
+        """
+        Download and extract the SSD image and recovery firmware for the given version.
+        Return the paths for both files.
+        """
+        base_url = f"http://{self.repo_url}/intel-ipu-mev-{self.version_to_flash}"
+        download_dir = "/tmp"  # Or any preferred directory for temp storage
+
+        # URLs for the tar.gz files based on self.version
+        ssd_tar_url = (
+            f"{base_url}/intel-ipu-eval-ssd-image-{self.version_to_flash}.tar.gz"
+        )
+        recovery_tar_url = f"{base_url}/intel-ipu-recovery-firmware-and-tools-{self.version_to_flash}.tar.gz"
+
+        # Download both tar.gz files
+        ssd_tar_path = download_file(ssd_tar_url, download_dir)
+        recovery_tar_path = download_file(recovery_tar_url, download_dir)
+
+        # Extract both tar.gz files
+        extracted_ssd_files = extract_tar_gz(ssd_tar_path, download_dir)
+        extracted_recovery_files = extract_tar_gz(recovery_tar_path, download_dir)
+
+        # Assume the identifier is 1001 for recovery firmware, but this could be passed as an argument
+        identifier = "1001"
+
+        # Find the required .bin files
+        ssd_bin_file = find_image(extracted_ssd_files, "ssd-image-mev.bin")
+        recovery_bin_file = find_image(
+            extracted_recovery_files, "intel-ipu-recovery-firmware", identifier
+        )
+
+        return ssd_bin_file, recovery_bin_file
diff --git a/dpu-tools/minicom.py b/dpu-tools/minicom.py
new file mode 100644
index 0000000..85595a3
--- /dev/null
+++ b/dpu-tools/minicom.py
@@ -0,0 +1,63 @@
+import time
+import pexpect
+import os
+import shutil
+import tempfile
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+def minicom_cmd(dpu_type: str) -> str:
+    """Return the minicom command line for the given console type."""
+    return (
+        "minicom -b 460800 -D /dev/ttyUSB2"
+        if dpu_type == "imc"
+        else "minicom -b 115200 -D /dev/ttyUSB0"
+    )
+
+
+def pexpect_child_wait(child: pexpect.spawn, pattern: str, timeout: float) -> float:
+    """
+    Wait up to timeout seconds for pattern, in slices of at most 30 seconds.
+
+    Returns the elapsed time in seconds, rounded to two decimals. Raises the
+    last pexpect.TIMEOUT when the pattern never shows up.
+    """
+    logger.debug(f"Waiting {timeout} sec for pattern '{pattern}'")
+    start_time = time.time()
+    remaining = timeout
+    while True:
+        cur_wait = min(remaining, 30)
+        remaining -= cur_wait
+        try:
+            child.expect(pattern, timeout=cur_wait)
+            return round(time.time() - start_time, 2)
+        except pexpect.TIMEOUT:
+            # Out of budget: let the caller see the timeout
+            if remaining <= 0:
+                raise
+
+
+def configure_minicom() -> None:
+    """
+    Write a minicom default config containing only "pu rtscts No".
+
+    An existing /root/.minirc.dfl is copied to a temporary backup first. The
+    backup is not moved back here: doing so would overwrite the new settings
+    before minicom has been started with them.
+    """
+    minirc_path = "/root/.minirc.dfl"
+    wanted_config = "pu rtscts No\n"
+    if os.path.exists(minirc_path):
+        with open(minirc_path) as current_file:
+            if current_file.read() == wanted_config:
+                return
+        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
+            temp_file_path = temp_file.name
+        shutil.copy2(minirc_path, temp_file_path)
+        logger.debug(f"Backed up {minirc_path} to {temp_file_path}")
+
+    with open(minirc_path, "w") as new_file:
+        new_file.write(wanted_config)
diff --git a/dpu-tools/mypy.ini b/dpu-tools/mypy.ini
new file mode 100644
index 0000000..5e90737
--- /dev/null
+++ b/dpu-tools/mypy.ini
@@ -0,0 +1,10 @@
+[mypy]
+strict = true
+scripts_are_modules = true
+files = *.py, dpu-tools
+
+[mypy-pexpect]
+ignore_missing_imports = true
+
+[mypy-requests]
+ignore_missing_imports = true
diff --git a/mypy.ini b/mypy.ini
index b467ccc..4f56633 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,7 +1,10 @@
 [mypy]
 strict = true
 scripts_are_modules = true
-files = *.py, bfb, console, cx_fwup, fwdefaults, fwup, fwversion, get_mode, listbf, pxeboot, reset, set_mode, dpu-tools/dpu-tools
+files = *.py, bfb, console, cx_fwup, fwdefaults, fwup, fwversion, get_mode, listbf, pxeboot, reset, set_mode, dpu-tools/dpu-tools, dpu-tools/*.py
 
 [mypy-pexpect]
 ignore_missing_imports = true
+
+[mypy-requests]
+ignore_missing_imports = true