Skip to content

Commit

Permalink
adding Pydantic modeling to the snapshot report
Browse files Browse the repository at this point in the history
  • Loading branch information
cdot65 committed Jan 15, 2024
1 parent 9578c62 commit ac79d28
Show file tree
Hide file tree
Showing 11 changed files with 228 additions and 8 deletions.
25 changes: 25 additions & 0 deletions models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# models/__init__.py

# trunk-ignore(ruff/F401)
from .arp_table import ArpTableEntry

# trunk-ignore(ruff/F401)
from .content_version import ContentVersion

# trunk-ignore(ruff/F401)
from .assurance_report import AssuranceReport

# trunk-ignore(ruff/F401)
from .ip_sec_tunnel import IPSecTunnelEntry

# trunk-ignore(ruff/F401)
from .license import LicenseFeatureEntry

# trunk-ignore(ruff/F401)
from .nics import NetworkInterfaceStatus

# trunk-ignore(ruff/F401)
from .routes import RouteEntry

# trunk-ignore(ruff/F401)
from .session_stats import SessionStats
11 changes: 11 additions & 0 deletions models/arp_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# models/arp_table.py
from pydantic import BaseModel


class ArpTableEntry(BaseModel):
interface: str
ip: str
mac: str
port: str
status: str
ttl: int
22 changes: 22 additions & 0 deletions models/assurance_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# models/assurance_report.py

from typing import Dict, Optional
from pydantic import BaseModel
from .arp_table import ArpTableEntry
from .content_version import ContentVersion
from .ip_sec_tunnel import IPSecTunnelEntry
from .license import LicenseFeatureEntry
from .nics import NetworkInterfaceStatus
from .routes import RouteEntry
from .session_stats import SessionStats


class AssuranceReport(BaseModel):
hostname: str
arp_table: Optional[Dict[str, ArpTableEntry]] = None
content_version: Optional[ContentVersion] = None
ip_sec_tunnels: Optional[Dict[str, IPSecTunnelEntry]] = None
license: Optional[Dict[str, LicenseFeatureEntry]] = None
nics: Optional[Dict[str, NetworkInterfaceStatus]] = None
routes: Optional[Dict[str, RouteEntry]] = None
session_stats: Optional[SessionStats] = None
7 changes: 7 additions & 0 deletions models/content_version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# models/content_version.py

from pydantic import BaseModel


class ContentVersion(BaseModel):
version: str
16 changes: 16 additions & 0 deletions models/ip_sec_tunnel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# models/ip_sec_tunnel.py

from pydantic import BaseModel, Field


class IPSecTunnelEntry(BaseModel):
peerip: str
name: str
outer_if: str = Field(..., alias="outer-if")
gwid: str
localip: str
state: str
inner_if: str = Field(..., alias="inner-if")
mon: str
owner: str
id: str
17 changes: 17 additions & 0 deletions models/license.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# models/license.py

from pydantic import BaseModel, Field
from typing import Optional
from collections import OrderedDict


class LicenseFeatureEntry(BaseModel):
feature: str
description: str
serial: str
issued: str
expires: str
expired: str
base_license_name: Optional[str] = Field(None, alias="base-license-name")
authcode: Optional[str]
custom: Optional[OrderedDict] = None
19 changes: 19 additions & 0 deletions models/nics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# models/nics.py

from pydantic import BaseModel, validator, root_validator


class NetworkInterfaceStatus(BaseModel):
status: str

@root_validator(pre=True)
def parse_root(cls, values):
if isinstance(values, str):
return {"status": values}
raise ValueError("Invalid input for network interface status")

@validator("status")
def check_status(cls, v):
if v not in ("up", "down"):
raise ValueError('Status must be "up" or "down"')
return v
15 changes: 15 additions & 0 deletions models/routes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# models/route.py

from pydantic import BaseModel, Field
from typing import Optional


class RouteEntry(BaseModel):
virtual_router: str = Field(..., alias="virtual-router")
destination: str
nexthop: str
metric: str
flags: str
age: Optional[str]
interface: Optional[str]
route_table: str = Field(..., alias="route-table")
57 changes: 57 additions & 0 deletions models/session_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# models/session_stats.py

from pydantic import BaseModel, Field
from typing import Optional


class SessionStats(BaseModel):
age_accel_thresh: Optional[str] = Field(..., alias="age-accel-thresh")
age_accel_tsf: Optional[str] = Field(..., alias="age-accel-tsf")
age_scan_ssf: Optional[str] = Field(..., alias="age-scan-ssf")
age_scan_thresh: Optional[str] = Field(..., alias="age-scan-thresh")
age_scan_tmo: Optional[str] = Field(..., alias="age-scan-tmo")
cps: Optional[str]
dis_def: Optional[str] = Field(..., alias="dis-def")
dis_sctp: Optional[str] = Field(..., alias="dis-sctp")
dis_tcp: Optional[str] = Field(..., alias="dis-tcp")
dis_udp: Optional[str] = Field(..., alias="dis-udp")
icmp_unreachable_rate: Optional[str] = Field(..., alias="icmp-unreachable-rate")
kbps: Optional[str]
max_pending_mcast: Optional[str] = Field(..., alias="max-pending-mcast")
num_active: Optional[str] = Field(..., alias="num-active")
num_bcast: Optional[str] = Field(..., alias="num-bcast")
num_gtpc: Optional[str] = Field(..., alias="num-gtpc")
num_gtpu_active: Optional[str] = Field(..., alias="num-gtpu-active")
num_gtpu_pending: Optional[str] = Field(..., alias="num-gtpu-pending")
num_http2_5gc: Optional[str] = Field(..., alias="num-http2-5gc")
num_icmp: Optional[str] = Field(..., alias="num-icmp")
num_imsi: Optional[str] = Field(..., alias="num-imsi")
num_installed: Optional[str] = Field(..., alias="num-installed")
num_max: Optional[str] = Field(..., alias="num-max")
num_mcast: Optional[str] = Field(..., alias="num-mcast")
num_pfcpc: Optional[str] = Field(..., alias="num-pfcpc")
num_predict: Optional[str] = Field(..., alias="num-predict")
num_sctp_assoc: Optional[str] = Field(..., alias="num-sctp-assoc")
num_sctp_sess: Optional[str] = Field(..., alias="num-sctp-sess")
num_tcp: Optional[str] = Field(..., alias="num-tcp")
num_udp: Optional[str] = Field(..., alias="num-udp")
pps: Optional[str]
tcp_cong_ctrl: Optional[str] = Field(..., alias="tcp-cong-ctrl")
tcp_reject_siw_thresh: Optional[str] = Field(..., alias="tcp-reject-siw-thresh")
tmo_5gcdelete: Optional[str] = Field(..., alias="tmo-5gcdelete")
tmo_cp: Optional[str] = Field(..., alias="tmo-cp")
tmo_def: Optional[str] = Field(..., alias="tmo-def")
tmo_icmp: Optional[str] = Field(..., alias="tmo-icmp")
tmo_sctp: Optional[str] = Field(..., alias="tmo-sctp")
tmo_sctpcookie: Optional[str] = Field(..., alias="tmo-sctpcookie")
tmo_sctpinit: Optional[str] = Field(..., alias="tmo-sctpinit")
tmo_sctpshutdown: Optional[str] = Field(..., alias="tmo-sctpshutdown")
tmo_tcp: Optional[str] = Field(..., alias="tmo-tcp")
tmo_tcp_delayed_ack: Optional[str] = Field(..., alias="tmo-tcp-delayed-ack")
tmo_tcp_unverif_rst: Optional[str] = Field(..., alias="tmo-tcp-unverif-rst")
tmo_tcphalfclosed: Optional[str] = Field(..., alias="tmo-tcphalfclosed")
tmo_tcphandshake: Optional[str] = Field(..., alias="tmo-tcphandshake")
tmo_tcpinit: Optional[str] = Field(..., alias="tmo-tcpinit")
tmo_tcptimewait: Optional[str] = Field(..., alias="tmo-tcptimewait")
tmo_udp: Optional[str] = Field(..., alias="tmo-udp")
vardata_rate: Optional[str] = Field(..., alias="vardata-rate")
47 changes: 39 additions & 8 deletions upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
# Palo Alto Networks panos-upgrade-assurance imports
from panos_upgrade_assurance.check_firewall import CheckFirewall
from panos_upgrade_assurance.firewall_proxy import FirewallProxy
from assurance import AssuranceOptions

# third party imports
import defusedxml.ElementTree as ET
import xmltodict
from pydantic import BaseModel

# project imports
from models import AssuranceReport
from assurance import AssuranceOptions


# ----------------------------------------------------------------------------
# Define logging levels
Expand Down Expand Up @@ -677,7 +680,7 @@ def run_assurance(
operation_type: str,
action: str,
config: Dict[str, Union[str, int, float, bool]],
) -> Union[Dict[str, Union[str, int, float, bool]], None]:
) -> Union[AssuranceReport, None]:
"""
Execute specified operational tasks on the Firewall and return the results.
Expand Down Expand Up @@ -765,7 +768,13 @@ def run_assurance(
try:
logging.info("Running snapshots...")
results = snapshot_node.run_snapshots(snapshots_config=actions)
logging.info(results)
logging.debug(results)

if results:
# Pass the results to the AssuranceReport model
return AssuranceReport(hostname=firewall.hostname, **results)
else:
return None

except Exception as e:
logging.error("Error running readiness checks: %s", e)
Expand Down Expand Up @@ -857,7 +866,7 @@ def main() -> None:

# Download the target PAN-OS version
logging.info(f"Checking if {args['target_version']} is downloaded...")
software_download(firewall, args["target_version"], ha_details)
image_downloaded = software_download(firewall, args["target_version"], ha_details)
if deploy_info == "active" or deploy_info == "passive":
logging.info(
f"{args['target_version']} has been downloaded and sync'd to HA peer."
Expand All @@ -867,10 +876,32 @@ def main() -> None:

# Begin collecting network state information with panos-upgrade-assurance
logging.info("Collecting network state information...")
firewall_assurance = create_panos_assurance_connection(firewall)
logging.info(
f"Network state information collected from {firewall_assurance.serial}"
)
if image_downloaded:
# Use the modified run_assurance function
assurance_report = run_assurance(
firewall,
operation_type="state_snapshot",
action="arp_table,content_version,ip_sec_tunnels,license,nics,routes,session_stats",
config={},
)

# Check if an assurance report was successfully created
if assurance_report:
# Do something with the assurance report, e.g., log it, save it, etc.
logging.info("Assurance Report created successfully")
assurance_report_json = assurance_report.model_dump_json(indent=4)
logging.debug(assurance_report_json)

file_path = f"logs/{firewall.serial}-assurance.json"
with open(file_path, "w") as file:
file.write(assurance_report_json)

else:
logging.error("Failed to create Assurance Report")

logging.info(f"Network state information collected from {firewall.serial}")

logging.info(f"Network state information collected from {firewall.serial}")


if __name__ == "__main__":
Expand Down
Empty file added utils/__init__.py
Empty file.

0 comments on commit ac79d28

Please sign in to comment.