Skip to content

Commit

Permalink
Merge pull request #41 from cdot65/10-implement-post-upgrade-snapshot…
Browse files Browse the repository at this point in the history
…s-and-readiness-checks-with-differential-analysis

Improve error handling with network transport problems
  • Loading branch information
cdot65 authored Jan 21, 2024
2 parents 4d207ea + 5d32860 commit 35b6e2f
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 61 deletions.
127 changes: 79 additions & 48 deletions pan_os_upgrade/upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@
from panos_upgrade_assurance.firewall_proxy import FirewallProxy

# third party imports
import xmltodict
import dns.resolver
import typer
import xmltodict

# project imports
from pan_os_upgrade.models import SnapshotReport, ReadinessCheckReport
Expand Down Expand Up @@ -347,51 +348,81 @@ def get_emoji(action: str) -> str:


# ----------------------------------------------------------------------------
# Helper function to validate IP addresses
# Helper function to validate either the DNS hostname or IP address
# ----------------------------------------------------------------------------
def ip_callback(ip: str) -> str:
def resolve_hostname(hostname: str) -> bool:
"""
Validates the input as a valid IP address.
Checks if a given hostname can be resolved via DNS query.
This function utilizes the ip_address function from the ipaddress standard library module to
validate the provided input. It is designed to be used as a callback function for Typer command-line
argument parsing, ensuring that only valid IP addresses are accepted as input for arguments where
this is a requirement.
This function attempts to resolve the specified hostname using DNS. It queries the DNS servers
that the operating system is configured to use. The function is designed to return a boolean
value indicating whether the hostname could be successfully resolved or not.
Parameters
----------
ip : str
A string representing the IP address to be validated.
hostname : str
The hostname (e.g., 'example.com') to be resolved.
Returns
-------
str
The validated IP address string.
bool
Returns True if the hostname can be resolved, False otherwise.
Raises
------
typer.BadParameter
If the input string is not a valid IP address, a typer.BadParameter exception is raised with
an appropriate error message.
None
This function does not raise any exceptions. It handles all exceptions internally and
returns False in case of any issues during the resolution process.
"""
try:
dns.resolver.resolve(hostname)
return True
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout) as err:
# Optionally log or handle err here if needed
logging.debug(f"Hostname resolution failed: {err}")
return False

Example

def ip_callback(value: str) -> str:
"""
Validates the input as a valid IP address or a resolvable hostname.
This function first attempts to resolve the hostname via DNS query. If it fails,
it utilizes the ip_address function from the ipaddress standard library module to
validate the provided input as an IP address. It is designed to be used as a callback
function for Typer command-line argument parsing, ensuring that only valid IP addresses
or resolvable hostnames are accepted as input.
Parameters
----------
value : str
A string representing the IP address or hostname to be validated.
Returns
-------
@app.command()
def main(ip_address: str = typer.Argument(..., callback=ip_callback)):
# ip_address will be a validated IP address here.
str
The validated IP address string or hostname.
Notes
-----
- The function assumes IPv4/IPv6 address validation is needed based on the context in which it's used.
- It's specifically tailored for use with Typer, enhancing command-line argument parsing.
Raises
------
typer.BadParameter
If the input string is not a valid IP address or a resolvable hostname, a typer.BadParameter
exception is raised with an appropriate error message.
"""

# First, try to resolve as a hostname
if resolve_hostname(value):
return value

# If hostname resolution fails, try as an IP address
try:
ipaddress.ip_address(ip)
return ip
ipaddress.ip_address(value)
return value

except ValueError as err:
raise typer.BadParameter("Please enter a valid IP address.") from err
raise typer.BadParameter(
"The value you passed for --hostname is neither a valid DNS hostname nor IP address, please check your inputs again."
) from err


# ----------------------------------------------------------------------------
Expand Down Expand Up @@ -538,7 +569,7 @@ def check_readiness_and_log(
# Setting up connection to the Firewall appliance
# ----------------------------------------------------------------------------
def connect_to_firewall(
ip_address: str,
hostname: str,
api_username: str,
api_password: str,
) -> Firewall:
Expand All @@ -552,7 +583,7 @@ def connect_to_firewall(
Parameters
----------
- 'ip_address': The IP address of the Firewall appliance.
- 'hostname': The DNS hostname or IP address of the firewall appliance.
- 'api_username': Username for authentication.
- 'api_password': Password for authentication.
Expand All @@ -575,7 +606,7 @@ def connect_to_firewall(
"""
try:
target_device = PanDevice.create_from_device(
ip_address,
hostname,
api_username,
api_password,
)
Expand All @@ -591,7 +622,7 @@ def connect_to_firewall(

except PanConnectionTimeout:
logging.error(
f"{get_emoji('error')} Connection to the firewall timed out. Please check the IP address and network connectivity."
f"{get_emoji('error')} Connection to the firewall timed out. Please check the DNS hostname or IP address and network connectivity."
)

sys.exit(1)
Expand Down Expand Up @@ -735,7 +766,7 @@ def software_update_check(
Example
--------
Checking if a specific PAN-OS version is available for download:
>>> firewall = Firewall(ip_address='192.168.1.1', api_username='admin', api_password='password')
>>> firewall = Firewall(hostname='192.168.1.1', api_username='admin', api_password='password')
>>> software_update_check(firewall, '10.1.0', ha_details={})
True or False depending on the availability of the version
"""
Expand Down Expand Up @@ -801,7 +832,7 @@ def get_ha_status(firewall: Firewall) -> Tuple[str, Optional[dict]]:
Example
-------
Retrieving HA status of a Firewall:
>>> firewall = Firewall(ip_address='192.168.1.1', api_username='admin', api_password='password')
>>> firewall = Firewall(hostname='192.168.1.1', api_username='admin', api_password='password')
>>> ha_status, ha_details = get_ha_status(firewall)
>>> print(ha_status)
'active/passive'
Expand Down Expand Up @@ -867,7 +898,7 @@ def software_download(
Example
--------
Initiating a PAN-OS version download:
>>> firewall = Firewall(ip_address='192.168.1.1', api_username='admin', api_password='password')
>>> firewall = Firewall(hostname='192.168.1.1', api_username='admin', api_password='password')
>>> software_download(firewall, '10.1.0', ha_details={})
True or False depending on the success of the download
Expand Down Expand Up @@ -964,8 +995,8 @@ def run_assurance(
----------
firewall : Firewall
The firewall instance on which to perform the operations.
ip_address : str
The ip_address of the firewall.
hostname : str
The ip address or dns hostname of the firewall.
operation_type : str
The type of operation to perform (e.g., 'readiness_check', 'state_snapshot', 'report').
actions : List[str]
Expand All @@ -986,7 +1017,7 @@ def run_assurance(
Example
--------
Performing a state snapshot operation:
>>> firewall = Firewall(ip_address='192.168.1.1', 'admin', 'password')
>>> firewall = Firewall(hostname='192.168.1.1', 'admin', 'password')
>>> run_assurance(firewall, 'firewall1', 'state_snapshot', ['arp_table', 'ip_sec_tunnels'], {})
SnapshotReport object or None
Expand Down Expand Up @@ -1106,7 +1137,7 @@ def perform_snapshot(
Example
--------
Creating a network state snapshot:
>>> firewall = Firewall(ip_address='192.168.1.1', 'admin', 'password')
>>> firewall = Firewall(hostname='192.168.1.1', 'admin', 'password')
>>> perform_snapshot(firewall, 'firewall1', '/path/to/snapshot.json')
# Snapshot file is saved to the specified path.
"""
Expand Down Expand Up @@ -1185,7 +1216,7 @@ def perform_readiness_checks(
Example
--------
Conducting readiness checks:
>>> firewall = Firewall(ip_address='192.168.1.1', 'username', 'password')
>>> firewall = Firewall(hostname='192.168.1.1', 'username', 'password')
>>> perform_readiness_checks(firewall, 'firewall1', '/path/to/readiness_report.json')
# Readiness report is saved to the specified path.
"""
Expand Down Expand Up @@ -1271,7 +1302,7 @@ def backup_configuration(
Example
--------
Backing up the firewall configuration:
>>> firewall = Firewall(ip_address='192.168.1.1', 'admin', 'password')
>>> firewall = Firewall(hostname='192.168.1.1', 'admin', 'password')
>>> backup_configuration(firewall, '/path/to/config_backup.xml')
# Configuration is backed up to the specified file.
"""
Expand Down Expand Up @@ -1368,7 +1399,7 @@ def perform_upgrade(
Example
-------
Upgrading a firewall to a specific PAN-OS version:
>>> firewall = Firewall(ip_address='192.168.1.1', api_username='admin', api_password='password')
>>> firewall = Firewall(hostname='192.168.1.1', api_username='admin', api_password='password')
>>> perform_upgrade(firewall, '192.168.1.1', '10.2.0', max_retries=2, retry_interval=30)
# The firewall is upgraded to PAN-OS version 10.2.0, with retries if necessary.
"""
Expand Down Expand Up @@ -1452,7 +1483,7 @@ def perform_reboot(firewall: Firewall, ha_details: Optional[dict] = None) -> Non
Example
-------
Rebooting a firewall and ensuring its operational status:
>>> firewall = Firewall(ip_address='192.168.1.1', api_username='admin', api_password='password')
>>> firewall = Firewall(hostname='192.168.1.1', api_username='admin', api_password='password')
>>> perform_reboot(firewall)
# The firewall undergoes a reboot and the script monitors until it's back online.
"""
Expand Down Expand Up @@ -1512,13 +1543,13 @@ def perform_reboot(firewall: Firewall, ha_details: Optional[dict] = None) -> Non
# ----------------------------------------------------------------------------
@app.command()
def main(
ip_address: Annotated[
hostname: Annotated[
str,
typer.Option(
"--ip-address",
"-i",
help="IP address of target firewall",
prompt="IP address",
"--hostname",
"-h",
help="Hostname or IP address of target firewall",
prompt="Hostname or IP address",
callback=ip_callback,
),
],
Expand Down Expand Up @@ -1589,7 +1620,7 @@ def main(
Example Usage:
```bash
python upgrade.py --ip-address 192.168.1.1 --username admin --password secret --version 10.2.7
python upgrade.py --hostname 192.168.1.1 --username admin --password secret --version 10.2.7
```
This command will start the upgrade process for the firewall at '192.168.1.1' to version '10.2.7'.
"""
Expand All @@ -1612,7 +1643,7 @@ def main(
# Create our connection to the firewall
logging.debug(f"{get_emoji('start')} Connecting to PAN-OS firewall...")
firewall = connect_to_firewall(
ip_address=ip_address,
hostname=hostname,
api_username=username,
api_password=password,
)
Expand Down
Loading

0 comments on commit 35b6e2f

Please sign in to comment.