diff --git a/dpu-tools b/dpu-tools index 8ce93f2..c78be1f 100755 --- a/dpu-tools +++ b/dpu-tools @@ -1,8 +1,10 @@ #!/usr/bin/env python3 +from abc import ABC, abstractmethod import argparse import sys import logging +from typing import Optional from utils.fwutils import BFFirmware, IPUFirmware, cx_fwup from utils.common_ipu import ( VERSIONS, @@ -23,25 +25,114 @@ from utils.pxeboot import Pxeboot logger = logging.getLogger(__name__) -class DPUTools: - def __init__(self, dpu_type: str, args: argparse.Namespace): - # Initialize shared attributes - self.args = args - assert dpu_type is not None - self.dpu_type = dpu_type.upper() +class DPUTools(ABC): + def __init__(self, parser: argparse.ArgumentParser) -> None: + self.args = self.setup_arguments(parser) def dispatch(self) -> None: """Map subcommands to methods and execute the chosen command.""" + command_map = { + "list_dpus": self.list_dpus, + "cx-fwup": cx_fwup, + } + # Execute the selected command + if self.args.subcommand in command_map: + command_map[self.args.subcommand]() + + @abstractmethod + def reset(self) -> None: + pass + + @abstractmethod + def firmware_up(self) -> None: + pass + + @abstractmethod + def firmware_reset(self) -> None: + pass + + @abstractmethod + def firmware_version(self) -> None: + pass + + @abstractmethod + def console(self) -> None: + pass + + def list_dpus(self) -> None: + """ + This function + """ + devs = scan_for_dpus() + print("ID netdev PCI-Address Kind") + print("----- -------- ------------ ------") + for i, (k, (d, kind)) in enumerate(devs.items()): + print(f"{i: 5d} {k.ljust(8)} {d.ljust(12)} {kind}") + + def setup_logging(self) -> None: + setup_logging(self.args.verbose) + + def setup_arguments(self, parser: argparse.ArgumentParser) -> argparse.Namespace: + """Add common arguments and subcommands.""" + # Add global arguments + parser.add_argument( + "--verbose", action="store_true", help="Enable verbose output" + ) + parser.add_argument("--dry-run", action="store_true", help="Simulate changes") + + # Add shared subcommands + subparsers = parser.add_subparsers(title="subcommands", dest="subcommand") + self._add_shared_subcommands(subparsers) + + # Let the subclass add its own arguments + self._add_subclass_specific_arguments(parser, subparsers) + + # add help later, when all other args have been added + parser.add_argument( + "-h", + "--help", + action="help", + default=argparse.SUPPRESS, + help="Show this help message and exit.", + ) + return parser.parse_args() + + def _add_shared_subcommands(self, subparsers: argparse._SubParsersAction) -> None: + """Add subcommands common to all DPU types.""" + list_parser = subparsers.add_parser("list", help="List all DPUs") + list_parser.set_defaults(subcommand="list_dpus") + + utils_parser = subparsers.add_parser( + "utils", help="Other non-dpu utilities that may still be useful" + ) + utils_subparser = utils_parser.add_subparsers( + title="utils subcommand", help="Houses the useful non-dpu utilities" + ) + cx_fwup_parser = utils_subparser.add_parser( + "cx-fwup", help="Used for upgrading Nvidia cx Smart Nics" + ) + cx_fwup_parser.set_defaults(subcommand="cx-fwup") + + @abstractmethod + def _add_subclass_specific_arguments( + self, parser: argparse.ArgumentParser, subparsers: argparse._SubParsersAction + ) -> None: + """Subclasses must define their specific arguments and subcommands.""" + pass + + +class BFTools(DPUTools): + def dispatch(self) -> None: + """Map subcommands to methods and execute the chosen command.""" + super().dispatch() command_map = { "reset": self.reset, "firmware_reset": self.firmware_reset, "firmware_up": self.firmware_up, "firmware_version": self.firmware_version, - "list_dpus": self.list_dpus, "console": self.console, "mode": self.mode, "pxeboot": self.pxeboot, - "cx-fwup": cx_fwup, "bfb": self.bfb, } # Execute the selected command @@ -52,92 +143,25 @@ class DPUTools: sys.exit(1) def reset(self) -> None: - if self.dpu_type == DPUType.IPU.name: - run( - f"ssh -o 'StrictHostKeyChecking=no' -o 'UserKnownHostsFile=/dev/null' {self.args.imc_address} 'reboot'" - ) - if self.dpu_type == DPUType.BF.name: - bf_reset(self.args.bf_id) + bf_reset(self.args.bf_id) def firmware_up(self) -> None: - if self.dpu_type == DPUType.IPU.name: - fw = IPUFirmware( - self.args.imc_address, - self.args.version, - repo_url=self.args.repo_url, - dry_run=self.args.dry_run, - verbose=self.args.verbose, - ) - fw.reflash_ipu() - elif self.dpu_type == DPUType.BF.name: - bf_fw = BFFirmware(self.args.bf_id, self.args.version) - bf_fw.firmware_up() + bf_fw = BFFirmware(self.args.bf_id, self.args.version) + bf_fw.firmware_up() def firmware_reset(self) -> None: - if self.dpu_type == DPUType.IPU.name: - result = get_current_version(self.args.imc_address, logger=logger) - if result.returncode: - logger.debug("Failed with ssh, trying minicom!") - try: - minicom_get_version(logger=logger) - except Exception as e: - logger.error(f"Error ssh try: {result.err}") - logger.error(f"Exception with minicom: {e}") - logger.error("Exiting...") - sys.exit(result.returncode) - fw = IPUFirmware( - self.args.imc_address, - version=result.out, - repo_url=self.args.repo_url, - dry_run=self.args.dry_run, - verbose=self.args.verbose, - ) - fw.reflash_ipu() - elif self.dpu_type == DPUType.BF.name: - bf_fw = BFFirmware(self.args.bf_id) - bf_fw.firmware_reset() + bf_fw = BFFirmware(self.args.bf_id) + bf_fw.firmware_reset() def firmware_version(self) -> None: - if self.dpu_type == DPUType.IPU.name: - result = get_current_version(self.args.imc_address, logger=logger) - if result.returncode: - logger.debug("Failed with ssh, trying minicom!") - try: - minicom_get_version(logger=logger) - except Exception as e: - logger.error(f"Error ssh try: {result.err}") - logger.error(f"Exception with minicom: {e}") - logger.error("Exiting...") - sys.exit(result.returncode) - print(result.out) - elif self.dpu_type == DPUType.BF.name: - bf_fw = BFFirmware(self.args.bf_id) - bf_fw.firmware_version() + bf_fw = BFFirmware(self.args.bf_id) + bf_fw.firmware_version() def console(self) -> None: - logger.debug(f"Current DPU Type: {self.dpu_type}") - # TODO Since we can only console into the ipu through the provisioner, the dpu type should be given as an argument if self.args.dpu_type is None: logger.error("Console requires --dpu-type to function") exit(1) - if self.dpu_type == DPUType.IPU.name: - console_ipu(self.args) - elif self.dpu_type == DPUType.BF.name: - console_bf(self.args) - elif self.dpu_type == DPUType.OCTEON.name: - logger.error("Marvel Octeon console support has not yet been implemented!") - else: - logger.error("UNKNOWN Type") - - def list_dpus(self) -> None: - """ - This function - """ - devs = scan_for_dpus() - print("ID netdev PCI-Address Kind") - print("----- -------- ------------ ------") - for i, (k, (d, kind)) in enumerate(devs.items()): - print(f"{i: 5d} {k.ljust(8)} {d.ljust(12)} {kind}") + console_bf(self.args) def mode(self) -> None: if self.args.set_mode: @@ -150,76 +174,43 @@ class DPUTools: px.start_pxeboot() def bfb(self) -> None: - if self.dpu_type == DPUType.BF.name: - download_bfb(self.args.bf_id) - + download_bfb(self.args.bf_id) -def main() -> None: - parser = argparse.ArgumentParser(description="Tools to interact with an IPU") - parser.add_argument( - "--dpu-type", choices=["bf", "ipu"], help="Specify the DPU type" - ) - known_args, remaining_known_args = parser.parse_known_args() - dpu_type = known_args.dpu_type - # Step 2: Detect DPU type if --dpu-type is not provided - if dpu_type is None: - logger.debug(f"Dpu_type was not given, detecting it...") - detected = detect_dpu_type() - if detected.returncode != 0: - logging.error(f"Couldn't detect DPU type: {detected.err}") - sys.exit(detected.returncode) - dpu_type = detected.out.upper() - logger.debug(f"Detected dpu_type: {dpu_type}") - else: - dpu_type = dpu_type.upper() - - parser.add_argument( - "--verbose", - action="store_true", - help="Increase output verbosity", - ) - if dpu_type == DPUType.BF.name: + def _add_subclass_specific_arguments( + self, parser: argparse.ArgumentParser, subparsers: argparse._SubParsersAction + ) -> None: + """Subclasses must define their specific arguments and subcommands.""" parser.add_argument("-i", "--bf-id", type=int, default=0, help="Specify BF ID") - parser.add_argument("--dry-run", action="store_true", help="Simulate changes") - subparsers = parser.add_subparsers(title="subcommands", dest="subcommand") - - reset_parser = subparsers.add_parser("reset", help="Reset the IPU") - reset_parser.set_defaults(subcommand="reset") - if dpu_type == DPUType.IPU.name: - reset_parser.add_argument("--imc-address", required=True, help="IMC address") - - firmware_parser = subparsers.add_parser("firmware", help="Control the IPU firmware") - if dpu_type == DPUType.IPU.name: - firmware_parser.add_argument("--imc-address", required=True, help="IMC address") - firmware_parser.add_argument("--repo-url", help="Firmware repo URL") - firmware_subparsers = firmware_parser.add_subparsers(dest="firmware_command") + reset_parser = subparsers.add_parser("reset", help="Reset the BF") + reset_parser.set_defaults(subcommand="reset") + firmware_parser = subparsers.add_parser( + "firmware", help="Control the BF firmware" + ) + firmware_subparsers = firmware_parser.add_subparsers(dest="firmware_command") - firmware_subparsers.add_parser("reset", help="Reset firmware").set_defaults( - subcommand="firmware_reset" - ) - firmware_up_parser = firmware_subparsers.add_parser("up", help="Update firmware") - firmware_up_parser.set_defaults(subcommand="firmware_up") + firmware_subparsers.add_parser("reset", help="Reset firmware").set_defaults( + subcommand="firmware_reset" + ) + firmware_up_parser = firmware_subparsers.add_parser( + "up", help="Update firmware" + ) + firmware_up_parser.set_defaults(subcommand="firmware_up") - if dpu_type == DPUType.BF.name: firmware_up_parser.add_argument( "--version", action="store_true", help="BF Version to Upgrade" ) - elif dpu_type == DPUType.IPU.name: - firmware_up_parser.add_argument("--version", choices=VERSIONS) - firmware_subparsers.add_parser("version", help="Get firmware version").set_defaults( - subcommand="firmware_version" - ) + firmware_subparsers.add_parser( + "version", help="Get firmware version" + ).set_defaults(subcommand="firmware_version") - subparsers.add_parser("list", help="List devices").set_defaults( - subcommand="list_dpus" - ) + subparsers.add_parser("list", help="List devices").set_defaults( + subcommand="list_dpus" + ) - console_parser = subparsers.add_parser("console", help="Open IPU console") - console_parser.set_defaults(subcommand="console") - console_parser.add_argument("--target", choices=["imc", "acc"], default="imc") + console_parser = subparsers.add_parser("console", help="Open BF console") + console_parser.set_defaults(subcommand="console") - if dpu_type == DPUType.BF.name: mode_parser = subparsers.add_parser( "mode", help="Retrieves the current mode for a Bluefield" ) @@ -259,24 +250,150 @@ def main() -> None: "-w", metavar="key", dest="key", default="", type=str, help=help ) - utils_parser = subparsers.add_parser( - "utils", help="Other non-dpu utilities that may still be useful" - ) - utils_subparser = utils_parser.add_subparsers( - title="utils subcommand", help="Houses the useful non-dpu utilities" - ) - cx_fwup_parser = utils_subparser.add_parser( - "cx-fwup", help="Used for upgrading Nvidia cx Smart Nics" + bfb_parser = subparsers.add_parser( + "bfb", help="Downloads BFB images and sends it to BF" + ) + bfb_parser.set_defaults(subcommand="bfb") + + +class IPUTools(DPUTools): + def dispatch(self) -> None: + """Map subcommands to methods and execute the chosen command.""" + super().dispatch() + command_map = { + "reset": self.reset, + "firmware_reset": self.firmware_reset, + "firmware_up": self.firmware_up, + "firmware_version": self.firmware_version, + "console": self.console, + } + # Execute the selected command + if self.args.subcommand in command_map: + command_map[self.args.subcommand]() + else: + print("Invalid command. Use --help for a list of available commands.") + sys.exit(1) + + def reset(self) -> None: + run( + f"ssh -o 'StrictHostKeyChecking=no' -o 'UserKnownHostsFile=/dev/null' {self.args.imc_address} 'reboot'" + ) + + def firmware_up(self) -> None: + fw = IPUFirmware( + self.args.imc_address, + self.args.version, + repo_url=self.args.repo_url, + dry_run=self.args.dry_run, + verbose=self.args.verbose, + ) + fw.reflash_ipu() + + def firmware_reset(self) -> None: + result = get_current_version(self.args.imc_address, logger=logger) + if result.returncode: + logger.debug("Failed with ssh, trying minicom!") + try: + minicom_get_version(logger=logger) + except Exception as e: + logger.error(f"Error ssh try: {result.err}") + logger.error(f"Exception with minicom: {e}") + logger.error("Exiting...") + sys.exit(result.returncode) + fw = IPUFirmware( + self.args.imc_address, + version=result.out, + repo_url=self.args.repo_url, + dry_run=self.args.dry_run, + verbose=self.args.verbose, + ) + fw.reflash_ipu() + + def firmware_version(self) -> None: + result = get_current_version(self.args.imc_address, logger=logger) + if result.returncode: + logger.debug("Failed with ssh, trying minicom!") + try: + minicom_get_version(logger=logger) + except Exception as e: + logger.error(f"Error ssh try: {result.err}") + logger.error(f"Exception with minicom: {e}") + logger.error("Exiting...") + sys.exit(result.returncode) + print(result.out) + + def console(self) -> None: + # NOTE Since we can only console into the ipu through the provisioner, the dpu type should be given as an argument + if self.args.dpu_type is None: + logger.error("Console requires --dpu-type to function") + exit(1) + console_ipu(self.args) + + def _add_subclass_specific_arguments( + self, parser: argparse.ArgumentParser, subparsers: argparse._SubParsersAction + ) -> None: + parser.add_argument("--imc-address", required=True, help="IMC address") + reset_parser = subparsers.add_parser("reset", help="Reset the IPU") + reset_parser.set_defaults(subcommand="reset") + + firmware_parser = subparsers.add_parser( + "firmware", help="Control the IPU firmware" + ) + firmware_parser.add_argument("--repo-url", help="Firmware repo URL") + firmware_subparsers = firmware_parser.add_subparsers(dest="firmware_command") + + firmware_subparsers.add_parser("reset", help="Reset firmware").set_defaults( + subcommand="firmware_reset" + ) + firmware_up_parser = firmware_subparsers.add_parser( + "up", help="Update firmware" + ) + firmware_up_parser.set_defaults(subcommand="firmware_up") + + firmware_up_parser.add_argument("--version", choices=VERSIONS) + + firmware_subparsers.add_parser( + "version", help="Get firmware version" + ).set_defaults(subcommand="firmware_version") + console_parser = subparsers.add_parser("console", help="Open IPU console") + console_parser.set_defaults(subcommand="console") + console_parser.add_argument("--target", choices=["imc", "acc"], default="imc") + + +def main() -> None: + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + add_help=False, + description="Tools to interact with an IPU", ) - cx_fwup_parser.set_defaults(subcommand="cx-fwup") - bfb_parser = utils_subparser.add_parser( - "bfb", help="Downloads BFB images and sends it to BF" + parser.add_argument( + "--dpu-type", choices=["bf", "ipu"], help="Specify the DPU type" ) - bfb_parser.set_defaults(subcommand="bfb") - # Parse arguments and initialize DPUTools - args = parser.parse_args(remaining_known_args) - setup_logging(args.verbose) - dpu_tools = DPUTools(dpu_type, args) + known_args, _ = parser.parse_known_args() + dpu_type = known_args.dpu_type + # Step 2: Detect DPU type if --dpu-type is not provided + if dpu_type is None: + logger.debug(f"Dpu_type was not given, detecting it...") + detected = detect_dpu_type() + if detected.returncode != 0: + logging.error(f"Couldn't detect DPU type: {detected.err}") + sys.exit(detected.returncode) + dpu_type = detected.out.upper() + logger.debug(f"Detected dpu_type: {dpu_type}") + else: + dpu_type = dpu_type.upper() + + dpu_tools: Optional[DPUTools] = None + if dpu_type == DPUType.IPU.name: + dpu_tools = IPUTools(parser) + elif dpu_type == DPUType.BF.name: + dpu_tools = BFTools(parser) + else: + logger.error(f"Dpu_type: {dpu_type} has not yet been implemented") + exit(1) + assert dpu_tools is not None + + dpu_tools.setup_logging() dpu_tools.dispatch()