Skip to content

Commit

Permalink
Merge pull request #1 from cdot65:dev
Browse files Browse the repository at this point in the history
Refactor AssuranceOptions class and move it to upgrade.py
  • Loading branch information
cdot65 authored Jan 15, 2024
2 parents 4022863 + 40f7d3f commit 910776d
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 94 deletions.
14 changes: 14 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"cSpell.words": [
"apikey",
"defusedxml",
"highavailability",
"levelname",
"nics",
"pydantic",
"Pydantic",
"refreshall",
"Xapi",
"xmltodict"
]
}
43 changes: 0 additions & 43 deletions assurance.py

This file was deleted.

163 changes: 112 additions & 51 deletions upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys
import time
from logging.handlers import RotatingFileHandler
from typing import Dict, Tuple, Union
from typing import Dict, List, Tuple, Union

# Palo Alto Networks PAN-OS imports
import panos
Expand All @@ -25,7 +25,6 @@

# project imports
from models import AssuranceReport
from assurance import AssuranceOptions


# ----------------------------------------------------------------------------
Expand All @@ -40,6 +39,63 @@
}


# ----------------------------------------------------------------------------
# Define panos-upgrade-assurance options
# ----------------------------------------------------------------------------
class AssuranceOptions:
"""
AssuranceOptions provides configuration options for the panos-upgrade-assurance process.
This class encapsulates the configurations used in the upgrade assurance process for PAN-OS appliances. It includes definitions for various readiness checks, state snapshots, and reports that are essential in the upgrade process of PAN-OS appliances.
Attributes
----------
READINESS_CHECKS : list of str
A list of readiness checks to be performed on the PAN-OS appliance. These checks include various system and network parameters like active support status, arp entry existence, candidate configuration, etc.
STATE_SNAPSHOTS : list of str
A list of state snapshot types to be taken from the PAN-OS appliance. These snapshots capture essential data regarding the appliance's current state, such as arp table, content version, IP sec tunnels, etc.
REPORTS : list of str
A list of report types that can be generated from the PAN-OS appliance. These reports include detailed information about various aspects of the appliance like arp table, content version, IP sec tunnels, license details, etc.
"""

READINESS_CHECKS = [
"active_support",
"arp_entry_exist",
"candidate_config",
"content_version",
"free_disk_space",
"expired_licenses",
"ha",
"ip_sec_tunnel_status",
"ntp_sync",
"panorama",
"planes_clock_sync",
"session_exist",
]

STATE_SNAPSHOTS = [
"arp_table",
"content_version",
"ip_sec_tunnels",
"license",
"nics",
"routes",
"session_stats",
]

REPORTS = [
"arp_table",
"content_version",
"ip_sec_tunnels",
"license",
"nics",
"routes",
"session_stats",
]


# ----------------------------------------------------------------------------
# Define models
# ----------------------------------------------------------------------------
Expand Down Expand Up @@ -417,7 +473,9 @@ def parse_version(version: str) -> Tuple[int, int, int, int]:

upgrade_needed = current_version < target_version
if upgrade_needed:
logging.info("Upgrade is required.")
logging.info(
f"Confirmed that moving from {firewall.version} to {target_major}.{target_minor}.{target_maintenance} is an upgrade"
)
return

else:
Expand Down Expand Up @@ -665,7 +723,7 @@ def run_assurance(
firewall: Firewall,
hostname: str,
operation_type: str,
action: str,
actions: List[str],
config: Dict[str, Union[str, int, float, bool]],
) -> Union[AssuranceReport, None]:
"""
Expand All @@ -682,8 +740,8 @@ def run_assurance(
An instance of the Firewall class representing the firewall to operate on.
operation_type : str
The type of operation to be executed, e.g., 'readiness_check', 'state_snapshot', 'report'.
action : str
The specific action to be performed within the operation type.
actions : List[str]
A list of specific actions to be performed within the operation type.
config : Dict[str, Union[str, int, float, bool]]
Configuration settings for the specified action.
Expand All @@ -707,48 +765,40 @@ def run_assurance(
assurance_firewall = FirewallProxy(firewall)

results = None
passed = True

if operation_type == "readiness_check":
if action not in AssuranceOptions.READINESS_CHECKS:
logging.error(f"Invalid action for readiness check: {action}")
return
logging.info(f"Performing readiness check: {action}")

checks = CheckFirewall(assurance_firewall)
for action in actions:
if action not in AssuranceOptions.READINESS_CHECKS:
logging.error(f"Invalid action for readiness check: {action}")
return

# check if arp entry exists
checks_configuration = [{action: config}]
logging.info(f"Performing readiness check: {action}")

# run checks
try:
logging.info("Running readiness checks...")
results = checks.run_readiness_checks(checks_configuration)
logging.debug(results)
for check in checks_configuration:
check_name = list(check.keys())[0]
passed = passed & results[check_name]["state"]
checks = CheckFirewall(assurance_firewall)
checks_configuration = {action: config}

if not results[check_name]["state"]:
logging.info(
"FAILED: %s - %s", check_name, results[check_name]["reason"]
)

except Exception as e:
logging.error("Error running readiness checks: %s", e)
return
try:
logging.info("Running readiness checks...")
result = checks.run_readiness_check(checks_configuration)
if result["state"]:
logging.info(f"Passed: {action}")
else:
logging.error(f"FAILED: {action} - {result['reason']}")
results = results or {}
results.update({action: result})
except Exception as e:
logging.error(f"Error running readiness checks: {e}")
return

logging.info("Completed checks successfully!")

elif operation_type == "state_snapshot":
actions = action.split(",")

snapshot_node = CheckFirewall(assurance_firewall)

# validate each type of action
for each in actions:
if each not in AssuranceOptions.STATE_SNAPSHOTS:
logging.error(f"Invalid action for state snapshot: {each}")
for action in actions:
if action not in AssuranceOptions.STATE_SNAPSHOTS:
logging.error(f"Invalid action for state snapshot: {action}")
return

# take snapshots
Expand All @@ -764,15 +814,16 @@ def run_assurance(
return None

except Exception as e:
logging.error("Error running readiness checks: %s", e)
logging.error("Error running snapshots: %s", e)
return

elif operation_type == "report":
if action not in AssuranceOptions.REPORTS:
logging.error(f"Invalid action for report: {action}")
return
logging.info(f"Generating report: {action}")
# result = getattr(Report(firewall), action)(**config)
for action in actions:
if action not in AssuranceOptions.REPORTS:
logging.error(f"Invalid action for report: {action}")
return
logging.info(f"Generating report: {action}")
# result = getattr(Report(firewall), action)(**config)

else:
logging.error(f"Invalid operation type: {operation_type}")
Expand Down Expand Up @@ -847,8 +898,10 @@ def main() -> None:

# Refresh system information to ensure we have the latest data
logging.info("Refreshing system information...")
settings = SystemSettings.refreshall(firewall)[0]
logging.info(f"{firewall.serial} {settings.hostname} {settings.ip_address}")
firewall_details = SystemSettings.refreshall(firewall)[0]
logging.info(
f"{firewall.serial} {firewall_details.hostname} {firewall_details.ip_address}"
)

# Determine if the firewall is standalone, HA, or in a cluster
logging.info("Checking if firewall is standalone, HA, or in a cluster...")
Expand Down Expand Up @@ -881,14 +934,22 @@ def main() -> None:
logging.info(f"{args['target_version']} has been downloaded.")

# Begin collecting network state information with panos-upgrade-assurance
logging.info("Collecting network state information...")
logging.info("Taking a pre-upgrade snapshot of network state information...")
if image_downloaded:
# Use the modified run_assurance function
assurance_report = run_assurance(
firewall,
settings.hostname,
firewall_details.hostname,
operation_type="state_snapshot",
action="arp_table,content_version,ip_sec_tunnels,license,nics,routes,session_stats",
actions=[
"arp_table",
"content_version",
"ip_sec_tunnels",
"license",
"nics",
"routes",
"session_stats",
],
config={},
)

Expand All @@ -900,7 +961,7 @@ def main() -> None:
logging.debug(assurance_report_json)

# Ensure directory exists before writing the file
file_path = f'snapshots/{settings.hostname}/pre/{time.strftime("%Y-%m-%d_%H-%M-%S")}.json'
file_path = f'snapshots/{firewall_details.hostname}/pre/{time.strftime("%Y-%m-%d_%H-%M-%S")}.json'
ensure_directory_exists(file_path)

with open(file_path, "w") as file:
Expand All @@ -909,9 +970,9 @@ def main() -> None:
else:
logging.error("Failed to create Assurance Report")

logging.info(f"Network state information collected from {firewall.serial}")

logging.info(f"Network state information collected from {firewall.serial}")
logging.info(
f"Pre-Upgrade network state snapshot collected from {firewall_details.hostname}, saved to {file_path}"
)


if __name__ == "__main__":
Expand Down

0 comments on commit 910776d

Please sign in to comment.