From 40f7d3fdd568012ee3bc43701d25aa9b70025a34 Mon Sep 17 00:00:00 2001 From: Calvin Remsburg Date: Mon, 15 Jan 2024 17:51:45 -0600 Subject: [PATCH] Update type hints in upgrade.py --- upgrade.py | 97 +++++++++++++++++++++++++++--------------------------- 1 file changed, 49 insertions(+), 48 deletions(-) diff --git a/upgrade.py b/upgrade.py index 6a03300..2e9f03f 100644 --- a/upgrade.py +++ b/upgrade.py @@ -5,7 +5,7 @@ import sys import time from logging.handlers import RotatingFileHandler -from typing import Dict, Tuple, Union +from typing import Dict, List, Tuple, Union # Palo Alto Networks PAN-OS imports import panos @@ -473,7 +473,9 @@ def parse_version(version: str) -> Tuple[int, int, int, int]: upgrade_needed = current_version < target_version if upgrade_needed: - logging.info("Upgrade is required.") + logging.info( + f"Confirmed that moving from {firewall.version} to {target_major}.{target_minor}.{target_maintenance} is an upgrade" + ) return else: @@ -721,7 +723,7 @@ def run_assurance( firewall: Firewall, hostname: str, operation_type: str, - action: str, + actions: List[str], config: Dict[str, Union[str, int, float, bool]], ) -> Union[AssuranceReport, None]: """ @@ -738,8 +740,8 @@ def run_assurance( An instance of the Firewall class representing the firewall to operate on. operation_type : str The type of operation to be executed, e.g., 'readiness_check', 'state_snapshot', 'report'. - action : str - The specific action to be performed within the operation type. + actions : List[str] + A list of specific actions to be performed within the operation type. config : Dict[str, Union[str, int, float, bool]] Configuration settings for the specified action. @@ -763,48 +765,40 @@ def run_assurance( assurance_firewall = FirewallProxy(firewall) results = None - passed = True if operation_type == "readiness_check": - if action not in AssuranceOptions.READINESS_CHECKS: - logging.error(f"Invalid action for readiness check: {action}") - return - logging.info(f"Performing readiness check: {action}") + for action in actions: + if action not in AssuranceOptions.READINESS_CHECKS: + logging.error(f"Invalid action for readiness check: {action}") + return - checks = CheckFirewall(assurance_firewall) + logging.info(f"Performing readiness check: {action}") - # check if arp entry exists - checks_configuration = [{action: config}] + checks = CheckFirewall(assurance_firewall) + checks_configuration = {action: config} - # run checks - try: - logging.info("Running readiness checks...") - results = checks.run_readiness_checks(checks_configuration) - logging.debug(results) - for check in checks_configuration: - check_name = list(check.keys())[0] - passed = passed & results[check_name]["state"] - - if not results[check_name]["state"]: - logging.info( - "FAILED: %s - %s", check_name, results[check_name]["reason"] - ) - - except Exception as e: - logging.error("Error running readiness checks: %s", e) - return + try: + logging.info("Running readiness checks...") + result = checks.run_readiness_check(checks_configuration) + if result["state"]: + logging.info(f"Passed: {action}") + else: + logging.error(f"FAILED: {action} - {result['reason']}") + results = results or {} + results.update({action: result}) + except Exception as e: + logging.error(f"Error running readiness checks: {e}") + return logging.info("Completed checks successfully!") elif operation_type == "state_snapshot": - actions = action.split(",") - snapshot_node = CheckFirewall(assurance_firewall) # validate each type of action - for each in actions: - if each not in AssuranceOptions.STATE_SNAPSHOTS: - logging.error(f"Invalid action for state snapshot: {each}") + for action in actions: + if action not in AssuranceOptions.STATE_SNAPSHOTS: + logging.error(f"Invalid action for state snapshot: {action}") return # take snapshots @@ -820,15 +814,16 @@ def run_assurance( return None except Exception as e: - logging.error("Error running readiness checks: %s", e) + logging.error("Error running snapshots: %s", e) return elif operation_type == "report": - if action not in AssuranceOptions.REPORTS: - logging.error(f"Invalid action for report: {action}") - return - logging.info(f"Generating report: {action}") - # result = getattr(Report(firewall), action)(**config) + for action in actions: + if action not in AssuranceOptions.REPORTS: + logging.error(f"Invalid action for report: {action}") + return + logging.info(f"Generating report: {action}") + # result = getattr(Report(firewall), action)(**config) else: logging.error(f"Invalid operation type: {operation_type}") @@ -939,14 +934,22 @@ def main() -> None: logging.info(f"{args['target_version']} has been downloaded.") # Begin collecting network state information with panos-upgrade-assurance - logging.info("Collecting network state information...") + logging.info("Taking a pre-upgrade snapshot of network state information...") if image_downloaded: # Use the modified run_assurance function assurance_report = run_assurance( firewall, firewall_details.hostname, operation_type="state_snapshot", - action="arp_table,content_version,ip_sec_tunnels,license,nics,routes,session_stats", + actions=[ + "arp_table", + "content_version", + "ip_sec_tunnels", + "license", + "nics", + "routes", + "session_stats", + ], config={}, ) @@ -967,11 +970,9 @@ def main() -> None: else: logging.error("Failed to create Assurance Report") - logging.info(f"Network state information collected from {firewall.serial}") - - logging.info( - f"Network state information collected from {firewall_details.hostname}" - ) + logging.info( + f"Pre-Upgrade network state snapshot collected from {firewall_details.hostname}, saved to {file_path}" + ) if __name__ == "__main__":