diff --git a/.gitignore b/.gitignore index 55eb424..2141d12 100644 --- a/.gitignore +++ b/.gitignore @@ -163,4 +163,6 @@ cython_debug/ .env # ignore my personal snapshots -snapshots/ +assurance/snapshots/ +assurance/reports/ +assurance/readiness_checks/ diff --git a/snapshots/.gitkeep b/assurance/.gitkeep similarity index 100% rename from snapshots/.gitkeep rename to assurance/.gitkeep diff --git a/assurance/reports/.gitkeep b/assurance/reports/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/assurance/snapshots/.gitkeep b/assurance/snapshots/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/models/__init__.py b/models/__init__.py index 5ac77bb..cce4c0b 100644 --- a/models/__init__.py +++ b/models/__init__.py @@ -4,10 +4,10 @@ from .arp_table import ArpTableEntry # trunk-ignore(ruff/F401) -from .content_version import ContentVersion +from .assurance_report import ReadinessCheckReport, SnapshotReport # trunk-ignore(ruff/F401) -from .assurance_report import AssuranceReport +from .content_version import ContentVersion # trunk-ignore(ruff/F401) from .ip_sec_tunnel import IPSecTunnelEntry diff --git a/models/assurance_report.py b/models/assurance_report.py index 5760ae3..bb51470 100644 --- a/models/assurance_report.py +++ b/models/assurance_report.py @@ -11,7 +11,7 @@ from .session_stats import SessionStats -class AssuranceReport(BaseModel): +class SnapshotReport(BaseModel): hostname: str arp_table: Optional[Dict[str, ArpTableEntry]] = None content_version: Optional[ContentVersion] = None @@ -20,3 +20,26 @@ class AssuranceReport(BaseModel): nics: Optional[Dict[str, NetworkInterfaceStatus]] = None routes: Optional[Dict[str, RouteEntry]] = None session_stats: Optional[SessionStats] = None + + +class ReadinessCheckResult(BaseModel): + state: bool + reason: str + + +class ReadinessCheckReport(BaseModel): + active_support: Optional[ReadinessCheckResult] = None + arp_entry_exist: Optional[ReadinessCheckResult] = None + candidate_config: Optional[ReadinessCheckResult] = None + certificates_requirements: Optional[ReadinessCheckResult] = None + content_version: Optional[ReadinessCheckResult] = None + dynamic_updates: Optional[ReadinessCheckResult] = None + expired_licenses: Optional[ReadinessCheckResult] = None + free_disk_space: Optional[ReadinessCheckResult] = None + ha: Optional[ReadinessCheckResult] = None + ip_sec_tunnel_status: Optional[ReadinessCheckResult] = None + jobs: Optional[ReadinessCheckResult] = None + ntp_sync: Optional[ReadinessCheckResult] = None + panorama: Optional[ReadinessCheckResult] = None + planes_clock_sync: Optional[ReadinessCheckResult] = None + session_exist: Optional[ReadinessCheckResult] = None diff --git a/upgrade.py b/upgrade.py index f5a1e88..23366ac 100644 --- a/upgrade.py +++ b/upgrade.py @@ -24,7 +24,7 @@ from pydantic import BaseModel # project imports -from models import AssuranceReport +from models import SnapshotReport, ReadinessCheckReport # ---------------------------------------------------------------------------- @@ -44,38 +44,107 @@ # ---------------------------------------------------------------------------- class AssuranceOptions: """ - AssuranceOptions provides configuration options for the panos-upgrade-assurance process. + Configuration options for the panos-upgrade-assurance process. - This class encapsulates the configurations used in the upgrade assurance process for PAN-OS appliances. It includes definitions for various readiness checks, state snapshots, and reports that are essential in the upgrade process of PAN-OS appliances. + This class encapsulates various configurations used in the upgrade assurance process for PAN-OS appliances. + It includes definitions for readiness checks, state snapshots, and reports, which are crucial in the upgrade + process of PAN-OS appliances. Attributes ---------- - READINESS_CHECKS : list of str - A list of readiness checks to be performed on the PAN-OS appliance. These checks include various system and network parameters like active support status, arp entry existence, candidate configuration, etc. - - STATE_SNAPSHOTS : list of str - A list of state snapshot types to be taken from the PAN-OS appliance. These snapshots capture essential data regarding the appliance's current state, such as arp table, content version, IP sec tunnels, etc. + READINESS_CHECKS : dict + A dictionary mapping each readiness check to its description, log level, and whether to exit on failure. + This provides a more detailed context for each check, allowing for tailored logging and error handling. REPORTS : list of str - A list of report types that can be generated from the PAN-OS appliance. These reports include detailed information about various aspects of the appliance like arp table, content version, IP sec tunnels, license details, etc. + A list of report types that can be generated for the PAN-OS appliance. These reports provide detailed + information on various aspects of the appliance, including ARP table, content version, IPsec tunnels, + license details, and more. + + STATE_SNAPSHOTS : list of str + A list of state snapshot types to capture from the PAN-OS appliance. These snapshots record critical + data regarding the appliance's current state, such as ARP table, content version, IPsec tunnels, etc. """ - READINESS_CHECKS = [ - "active_support", - "arp_entry_exist", - "candidate_config", - "content_version", - "free_disk_space", - "expired_licenses", - "ha", - "ip_sec_tunnel_status", - "ntp_sync", - "panorama", - "planes_clock_sync", - "session_exist", - ] + READINESS_CHECKS = { + "active_support": { + "description": "Check if active support is available", + "log_level": "warning", + "exit_on_failure": False, + }, + "arp_entry_exist": { + "description": "Check if a given ARP entry is available in the ARP table", + "log_level": "warning", + "exit_on_failure": False, + }, + "candidate_config": { + "description": "Check if there are pending changes on device", + "log_level": "error", + "exit_on_failure": True, + }, + "certificates_requirements": { + "description": "Check if the certificates' keys meet minimum size requirements", + "log_level": "warning", + "exit_on_failure": False, + }, + "content_version": { + "description": "Running Latest Content Version", + "log_level": "warning", + "exit_on_failure": False, + }, + "dynamic_updates": { + "description": "Check if any Dynamic Update job is scheduled to run within the specified time window", + "log_level": "warning", + "exit_on_failure": False, + }, + "expired_licenses": { + "description": "No Expired Licenses", + "log_level": "warning", + "exit_on_failure": False, + }, + "free_disk_space": { + "description": "Check if a there is enough space on the `/opt/panrepo` volume for downloading an PanOS image.", + "log_level": "error", + "exit_on_failure": True, + }, + "ha": { + "description": "Checks HA pair status from the perspective of the current device", + "log_level": "info", + "exit_on_failure": False, + }, + "ip_sec_tunnel_status": { + "description": "Check if a given IPsec tunnel is in active state", + "log_level": "warning", + "exit_on_failure": False, + }, + "jobs": { + "description": "Check for any job with status different than FIN", + "log_level": "warning", + "exit_on_failure": False, + }, + "ntp_sync": { + "description": "Check if NTP is synchronized", + "log_level": "warning", + "exit_on_failure": False, + }, + "planes_clock_sync": { + "description": "Check if the clock is synchronized between dataplane and management plane", + "log_level": "warning", + "exit_on_failure": False, + }, + "panorama": { + "description": "Check connectivity with the Panorama appliance", + "log_level": "warning", + "exit_on_failure": False, + }, + # "session_exist": { + # "description": "Check if a critical session is present in the sessions table", + # "log_level": "error", + # "exit_on_failure": True, + # }, + } - STATE_SNAPSHOTS = [ + REPORTS = [ "arp_table", "content_version", "ip_sec_tunnels", @@ -85,7 +154,7 @@ class AssuranceOptions: "session_stats", ] - REPORTS = [ + STATE_SNAPSHOTS = [ "arp_table", "content_version", "ip_sec_tunnels", @@ -353,6 +422,79 @@ def configure_logging(level: str) -> None: logger.addHandler(file_handler) +# ---------------------------------------------------------------------------- +# Helper function to flip XML objects into Python dictionaries +# ---------------------------------------------------------------------------- +def xml_to_dict(xml_object) -> dict: + """ + Convert an XML object into a Python dictionary. + + This function takes an XML object, typically obtained from parsing XML data, and converts it into a Python dictionary + for easier access and manipulation. The conversion is done using the xmltodict library, which transforms the XML tree + structure into a dictionary format, maintaining elements as keys and their contents as values. This is particularly useful + for processing and interacting with XML data in a more Pythonic way. + + Parameters + ---------- + xml_object : ET.Element + An XML object to convert into a Python dictionary. This is typically an ElementTree Element. + + Returns + ------- + dict + A Python dictionary representation of the XML object. The structure of the dictionary corresponds to the structure + of the XML, with tags as keys and their contents as values. + """ + xml_string = ET.tostring(xml_object) + xml_dict = xmltodict.parse(xml_string) + return xml_dict + + +# ---------------------------------------------------------------------------- +# Helper function to ensure the directories exist for our snapshots +# ---------------------------------------------------------------------------- +def ensure_directory_exists(file_path: str): + """ + Ensure that the directory for the given file path exists. + + Creates the directory (and any necessary parent directories) if it does not already exist. + + Parameters + ---------- + file_path : str + The file path for which to ensure the directory exists. + """ + directory = os.path.dirname(file_path) + if not os.path.exists(directory): + os.makedirs(directory) + + +# ---------------------------------------------------------------------------- +# Helper function to check readiness and log the result +# ---------------------------------------------------------------------------- +def check_readiness_and_log( + result: dict, + test_name: str, + test_info: dict, +): + test_result = result.get( + test_name, {"state": False, "reason": "Test not performed"} + ) + log_message = f'{test_info["description"]} - {test_result["reason"]}' + + if test_result["state"]: + logging.info(f"Passed Readiness Check: {test_info['description']}") + else: + if test_info["log_level"] == "error": + logging.error(f"{log_message}") + if test_info["exit_on_failure"]: + sys.exit(1) + elif test_info["log_level"] == "warning": + logging.info(f"Skipped Readiness Check: {test_info['description']}") + else: + logging.debug(log_message) + + # ---------------------------------------------------------------------------- # Setting up connection to the Firewall appliance # ---------------------------------------------------------------------------- @@ -555,34 +697,6 @@ def software_update_check( return False -# ---------------------------------------------------------------------------- -# Helper function to flip XML objects into Python dictionaries -# ---------------------------------------------------------------------------- -def xml_to_dict(xml_object) -> dict: - """ - Convert an XML object into a Python dictionary. - - This function takes an XML object, typically obtained from parsing XML data, and converts it into a Python dictionary - for easier access and manipulation. The conversion is done using the xmltodict library, which transforms the XML tree - structure into a dictionary format, maintaining elements as keys and their contents as values. This is particularly useful - for processing and interacting with XML data in a more Pythonic way. - - Parameters - ---------- - xml_object : ET.Element - An XML object to convert into a Python dictionary. This is typically an ElementTree Element. - - Returns - ------- - dict - A Python dictionary representation of the XML object. The structure of the dictionary corresponds to the structure - of the XML, with tags as keys and their contents as values. - """ - xml_string = ET.tostring(xml_object) - xml_dict = xmltodict.parse(xml_string) - return xml_dict - - # ---------------------------------------------------------------------------- # Determine if the firewall is standalone, HA, or in a cluster # ---------------------------------------------------------------------------- @@ -727,76 +841,77 @@ def run_assurance( operation_type: str, actions: List[str], config: Dict[str, Union[str, int, float, bool]], -) -> Union[AssuranceReport, None]: +) -> Union[SnapshotReport, ReadinessCheckReport, None]: """ - Execute specified operational tasks on the Firewall and return the results. + Execute operational tasks on the Firewall and return the results or generate reports. - This function handles various operational tasks based on the specified 'operation_type'. - It can perform readiness checks, state snapshots, or generate reports, depending on the - action and configuration provided. The results of the operation are returned as a dictionary. - If an invalid operation type or action is specified, the function logs an error and returns None. + Handles various operational tasks on the Firewall based on 'operation_type', such as + performing readiness checks, capturing state snapshots, or generating reports. The function + operates according to the specified 'actions' and 'config'. If the operation is successful, + it returns the results or an SnapshotReport object. If an invalid operation type or action + is specified, or an error occurs, the function logs an error and returns None. Parameters ---------- firewall : Firewall An instance of the Firewall class representing the firewall to operate on. + hostname : str + Hostname of the firewall. operation_type : str - The type of operation to be executed, e.g., 'readiness_check', 'state_snapshot', 'report'. + Type of operation to be executed, e.g., 'readiness_check', 'state_snapshot', 'report'. actions : List[str] - A list of specific actions to be performed within the operation type. + List of specific actions to be performed within the operation type. config : Dict[str, Union[str, int, float, bool]] Configuration settings for the specified action. Returns ------- - Union[Dict[str, Union[str, int, float, bool]], None] - The results of the operation as a dictionary, or None if an invalid operation type or action is provided. + Union[SnapshotReport, None] + The results of the operation as an SnapshotReport object, or None if an invalid + operation type or action is specified, or if an error occurs. Raises ------ SystemExit - If an exception occurs during the operation execution. + If an invalid action is provided for the specified operation type or if an exception + occurs during the execution of the operation. Notes ----- - - For 'readiness_check', the function verifies the firewall's readiness for certain tasks. - - For 'state_snapshot', it captures the current state of the firewall. - - For 'report', it generates a report based on the specified action. + - 'readiness_check' verifies the firewall's readiness for upgrade tasks. + - 'state_snapshot' captures the current state of the firewall. + - 'report' generates a report based on the specified action. Implementation details for + report generation should be completed as per requirements. """ # setup Firewall client - assurance_firewall = FirewallProxy(firewall) + proxy_firewall = FirewallProxy(firewall) + checks_firewall = CheckFirewall(proxy_firewall) results = None if operation_type == "readiness_check": for action in actions: - if action not in AssuranceOptions.READINESS_CHECKS: + if action not in AssuranceOptions.READINESS_CHECKS.keys(): logging.error(f"Invalid action for readiness check: {action}") - return + sys.exit(1) - logging.info(f"Performing readiness check: {action}") + try: + logging.info("Checking if firewall is ready for upgrade...") + result = checks_firewall.run_readiness_checks(actions) - checks = CheckFirewall(assurance_firewall) - checks_configuration = {action: config} + for ( + test_name, + test_info, + ) in AssuranceOptions.READINESS_CHECKS.items(): + check_readiness_and_log(result, test_name, test_info) - try: - logging.info("Running readiness checks...") - result = checks.run_readiness_check(checks_configuration) - if result["state"]: - logging.info(f"Passed: {action}") - else: - logging.error(f"FAILED: {action} - {result['reason']}") - results = results or {} - results.update({action: result}) - except Exception as e: - logging.error(f"Error running readiness checks: {e}") - return + return ReadinessCheckReport(**result) - logging.info("Completed checks successfully!") + except Exception as e: + logging.error(f"Error running readiness checks: {e}") + return None elif operation_type == "state_snapshot": - snapshot_node = CheckFirewall(assurance_firewall) - # validate each type of action for action in actions: if action not in AssuranceOptions.STATE_SNAPSHOTS: @@ -806,12 +921,12 @@ def run_assurance( # take snapshots try: logging.debug("Running snapshots...") - results = snapshot_node.run_snapshots(snapshots_config=actions) + results = checks_firewall.run_snapshots(snapshots_config=actions) logging.debug(results) if results: - # Pass the results to the AssuranceReport model - return AssuranceReport(hostname=hostname, **results) + # Pass the results to the SnapshotReport model + return SnapshotReport(hostname=hostname, **results) else: return None @@ -835,22 +950,85 @@ def run_assurance( # ---------------------------------------------------------------------------- -# Make sure the directories exist for our snapshots +# Perform the snapshot of the network state # ---------------------------------------------------------------------------- -def ensure_directory_exists(file_path): - """ - Ensure that the directory for the given file path exists. +def perform_snapshot(firewall: Firewall, hostname: str, file_path: str) -> None: + logging.info("Taking a snapshot of network state information...") + + # take snapshots + network_snapshot = run_assurance( + firewall, + hostname, + operation_type="state_snapshot", + actions=[ + "arp_table", + "content_version", + "ip_sec_tunnels", + "license", + "nics", + "routes", + "session_stats", + ], + config={}, + ) - Creates the directory (and any necessary parent directories) if it does not already exist. + # Check if a readiness check was successfully created + if isinstance(network_snapshot, SnapshotReport): + logging.info("Network snapshot created successfully") + network_snapshot_json = network_snapshot.model_dump_json(indent=4) + logging.debug(network_snapshot_json) - Parameters - ---------- - file_path : str - The file path for which to ensure the directory exists. - """ - directory = os.path.dirname(file_path) - if not os.path.exists(directory): - os.makedirs(directory) + ensure_directory_exists(file_path) + + with open(file_path, "w") as file: + file.write(network_snapshot_json) + + logging.info( + f"Network state snapshot collected from {hostname}, saved to {file_path}" + ) + else: + logging.error("Failed to create snapshot") + + +# ---------------------------------------------------------------------------- +# Perform the readiness checks +# ---------------------------------------------------------------------------- +def perform_readiness_checks(firewall: Firewall, hostname: str, file_path: str) -> None: + logging.info("Performing readiness checks of target firewall...") + + readiness_check = run_assurance( + firewall, + hostname, + operation_type="readiness_check", + actions=[ + "candidate_config", + "content_version", + "expired_licenses", + "ha", + "jobs", + "free_disk_space", + "ntp_sync", + "panorama", + "planes_clock_sync", + ], + config={}, + ) + + # Check if a readiness check was successfully created + if isinstance(readiness_check, ReadinessCheckReport): + # Do something with the readiness check report, e.g., log it, save it, etc. + logging.info("Readiness Checks completed") + readiness_check_report_json = readiness_check.model_dump_json(indent=4) + logging.debug(readiness_check_report_json) + + ensure_directory_exists(file_path) + + with open(file_path, "w") as file: + file.write(readiness_check_report_json) + + logging.info(f"Readiness checks completed for {hostname}, saved to {file_path}") + else: + logging.error("Failed to create readiness check") # ---------------------------------------------------------------------------- @@ -860,35 +1038,26 @@ def main() -> None: """ Main function of the script, serving as the entry point. - Handles CLI arguments and configures logging. Establishes a connection to - the Firewall appliance using an API key or username and password credentials. - Performs operations including refreshing system information, checking for - software updates, and downloading a target PAN-OS version, if available. + Handles CLI arguments, configures logging, and establishes a connection to the Firewall appliance. + Performs a series of operations, including refreshing system information, checking for software updates, + downloading the target PAN-OS version, and collecting network state information for upgrade assurance. + It conducts readiness checks and takes a pre-upgrade snapshot of the network state. Logs progress and + status throughout the process. + + The function gracefully exits with an error if conditions for the upgrade are not met or if any critical + issues are encountered during execution. It enters a debug mode upon successful completion of all operations. Operations: - Connects to the Firewall and refreshes system information. - - Checks the deployment status (standalone, HA, cluster). - - Assesses firewall readiness for upgrade and downloads the target PAN-OS version. - - Collects network state information for upgrade assurance. - - The function logs progress and status, and enters a debug mode upon completion. - It exits with an error if conditions for upgrade are not met. - - Returns - ------- - None + - Determines deployment status (standalone, HA, cluster). + - Assesses readiness for upgrade and downloads the target PAN-OS version. + - Collects network state information and performs readiness checks. Raises ------ SystemExit If the firewall is not ready for an upgrade to the target version, or - if there are other critical issues that prevent the continuation of - the script. - - Example - ------- - To execute the script, run: - python upgrade.py --version 10.2.2-h3 + if there are other critical issues preventing the continuation of the script. """ args = parse_arguments() configure_logging(args["log_level"]) @@ -933,47 +1102,24 @@ def main() -> None: f"{args['target_version']} has been downloaded and sync'd to HA peer." ) else: - logging.info(f"{args['target_version']} has been downloaded.") + logging.info(f"PAN-OS version {args['target_version']} has been downloaded.") - # Begin collecting network state information with panos-upgrade-assurance - logging.info("Taking a pre-upgrade snapshot of network state information...") + # Begin snapshots of the network state if image_downloaded: - # Use the modified run_assurance function - assurance_report = run_assurance( + # Execute the pre-upgrade snapshot + logging.info("Taking a pre-upgrade snapshot of network state information...") + perform_snapshot( firewall, firewall_details.hostname, - operation_type="state_snapshot", - actions=[ - "arp_table", - "content_version", - "ip_sec_tunnels", - "license", - "nics", - "routes", - "session_stats", - ], - config={}, + f'assurance/snapshots/{firewall_details.hostname}/pre/{time.strftime("%Y-%m-%d_%H-%M-%S")}.json', ) - # Check if an assurance report was successfully created - if assurance_report: - # Do something with the assurance report, e.g., log it, save it, etc. - logging.info("Assurance Report created successfully") - assurance_report_json = assurance_report.model_dump_json(indent=4) - logging.debug(assurance_report_json) - - # Ensure directory exists before writing the file - file_path = f'snapshots/{firewall_details.hostname}/pre/{time.strftime("%Y-%m-%d_%H-%M-%S")}.json' - ensure_directory_exists(file_path) - - with open(file_path, "w") as file: - file.write(assurance_report_json) - - else: - logging.error("Failed to create Assurance Report") - - logging.info( - f"Pre-Upgrade network state snapshot collected from {firewall_details.hostname}, saved to {file_path}" + # Execute Readiness Checks + logging.info("Checking device to see if its ready for an upgrade...") + perform_readiness_checks( + firewall, + firewall_details.hostname, + f'assurance/readiness_checks/{firewall_details.hostname}/pre/{time.strftime("%Y-%m-%d_%H-%M-%S")}.json', )