Skip to content

Commit

Permalink
Pytest cleanup (#97)
Browse files Browse the repository at this point in the history
* Update pytest for eventmgr_db_defect_check

* Update pytest for encap_already_in_use_check

* Update pytest for apic_ca_cert_validation

* Update pytest for vpc_paired_switches_check

* Update pytest for llfc_susceptibility_check

* Update pytest for telemetryStatsServerP_object_check

* Update pytest for isis_redis_metric_mpod_msite_check

* Update pytest for switch_bootflash_usage_check

* Update pytest for contract_22_defect_check

* Update pytest for internal_vlanpool_check

* Update pytest for bgp_golf_route_target_type_check

* Remove unnecessary path setting in test_IPAddress

* Update pytest for Aciversion

* Update pytest for get_vpc_node()
  • Loading branch information
takishida authored Apr 20, 2024
1 parent beabd51 commit d52fa97
Show file tree
Hide file tree
Showing 57 changed files with 1,833 additions and 1,612 deletions.
91 changes: 21 additions & 70 deletions aci-preupgrade-validation-script.py
Original file line number Diff line number Diff line change
Expand Up @@ -992,10 +992,7 @@ def switch_bootflash_usage_check(index, total_checks, **kwargs):
data = []
print_title(title, index, total_checks)

response_json = kwargs.get("eqptcapacityFSPartition.json")

if not response_json:
response_json = icurl('class',
response_json = icurl('class',
'eqptcapacityFSPartition.json?query-target-filter=eq(eqptcapacityFSPartition.path,"/bootflash")')
if not response_json:
result = ERROR
Expand Down Expand Up @@ -1179,18 +1176,13 @@ def encap_already_in_use_check(index, total_checks, **kwargs):
recommended_action = 'Resolve the overlapping encap configuration prior to upgrade'
print_title(title, index, total_checks)

faultInsts = kwargs.get("faultInst")
fvIfConns = kwargs.get("fvIfConn")

desc_regex = r'Encap is already in use by (?P<inUseEpgStr>.+);'
nwissues_dn_regex = node_regex + r'/.*epp/fv-\[(?P<faultedEpgDn>.*)\]/node.*'

if "pytest" not in sys.modules:
faultInsts = icurl('class',
faultInsts = icurl('class',
'faultInst.json?&query-target-filter=wcard(faultInst.descr,"encap-already-in-use")')
if faultInsts:
if "pytest" not in sys.modules:
fvIfConns = icurl('class', 'fvIfConn.json')
fvIfConns = icurl('class', 'fvIfConn.json')
for faultInst in faultInsts:
desc_array = re.search(desc_regex, faultInst['faultInst']['attributes']['descr'])

Expand Down Expand Up @@ -1218,11 +1210,10 @@ def encap_already_in_use_check(index, total_checks, **kwargs):
overlapping_encaps = [x for x in in_use_epg_encaps if x in faulted_epg_encaps]
data.append([faulted_epg_dn, in_use_epg_dn, nodeId, ','.join(overlapping_encaps)])
else:
unformatted_data.append(
[faultInst['faultInst']['attributes']['descr']])
unformatted_data.append([faultInst['faultInst']['attributes']['descr']])
if not data and not unformatted_data:
result = PASS
print_result(title, result, msg, headers, data,
print_result(title, result, msg, headers, data,
unformatted_headers, unformatted_data, recommended_action=recommended_action)
return result

Expand Down Expand Up @@ -2048,7 +2039,7 @@ def vmm_controller_adj_check(index, total_checks, **kwargs):
return result


def vpc_paired_switches_check(index, total_checks, vpc_node_ids=[], **kwargs):
def vpc_paired_switches_check(index, total_checks, vpc_node_ids=None, **kwargs):
title = 'VPC-paired Leaf switches'
result = FAIL_O
msg = ''
Expand All @@ -2058,15 +2049,11 @@ def vpc_paired_switches_check(index, total_checks, vpc_node_ids=[], **kwargs):
doc_url = 'https://datacenter.github.io/ACI-Pre-Upgrade-Validation-Script/validations/#vpc-paired-leaf-switches'
print_title(title, index, total_checks)

if not vpc_node_ids:
vpc_node_ids = kwargs.get("vpc_node_ids", [])

if not vpc_node_ids:
msg = 'No VPC definitions found!'
vpc_node_ids = []

top_system = kwargs.get("topSystem.json")
if not top_system:
top_system = icurl('class', 'topSystem.json')
top_system = icurl('class', 'topSystem.json')

for node in top_system:
node_id = node['topSystem']['attributes']['id']
Expand Down Expand Up @@ -2163,17 +2150,13 @@ def isis_redis_metric_mpod_msite_check(index, total_checks, **kwargs):
title = 'ISIS Redistribution metric for MPod/MSite'
result = FAIL_O
msg = ''
headers = ["ISIS Redistribution Metric", "MPod Deployment", "MSite Deployment","Recommendation" ]
headers = ["ISIS Redistribution Metric", "MPod Deployment", "MSite Deployment", "Recommendation"]
data = []
recommended_action = None
doc_url = '"ISIS Redistribution Metric" from ACI Best Practices Quick Summary - http://cs.co/9001zNNr7'
print_title(title, index, total_checks)

isis_mo = kwargs.get("uni/fabric/isisDomP-default.json", None)
mpod_msite_mo = kwargs.get("fvFabricExtConnP.json?query-target=children", None)

if not isis_mo:
isis_mo = icurl('mo', 'uni/fabric/isisDomP-default.json')
isis_mo = icurl('mo', 'uni/fabric/isisDomP-default.json')
redistribMetric = isis_mo[0]['isisDomPol']['attributes'].get('redistribMetric')

msite = False
Expand All @@ -2186,8 +2169,7 @@ def isis_redis_metric_mpod_msite_check(index, total_checks, **kwargs):
recommended_action = 'Change ISIS Redistribution Metric to less than 63'

if recommended_action:
if not mpod_msite_mo:
mpod_msite_mo = icurl('class','fvFabricExtConnP.json?query-target=children')
mpod_msite_mo = icurl('class', 'fvFabricExtConnP.json?query-target=children')
if mpod_msite_mo:
pods_list = []

Expand All @@ -2211,25 +2193,18 @@ def bgp_golf_route_target_type_check(index, total_checks, cversion=None, tversio
title = 'BGP route target type for GOLF over L2EVPN'
result = FAIL_O
msg = ''
headers = ["VRF DN","Global Name", "Route Target", "Recommendation" ]
headers = ["VRF DN", "Global Name", "Route Target", "Recommendation"]
data = []
recommended_action = "Reconfigure extended: RT with prefix route-target: "
doc_url = 'https://bst.cloudapps.cisco.com/bugsearch/bug/CSCvm23100'
print_title(title, index, total_checks)

if not cversion:
cversion = kwargs.get("cversion", None)
if not tversion:
tversion = kwargs.get("tversion", None)

if not tversion:
print_result(title, MANUAL, 'Target version not supplied. Skipping.')
return MANUAL

if cversion.older_than("4.2(1a)") and tversion.newer_than("4.2(1a)"):
fvctx_mo = kwargs.get("fvCtx.json", None)
if not fvctx_mo:
fvctx_mo = icurl('class', 'fvCtx.json?rsp-subtree=full&rsp-subtree-class=l3extGlobalCtxName,bgpRtTarget&rsp-subtree-include=required')
fvctx_mo = icurl('class', 'fvCtx.json?rsp-subtree=full&rsp-subtree-class=l3extGlobalCtxName,bgpRtTarget&rsp-subtree-include=required')

if fvctx_mo:
for vrf in fvctx_mo:
Expand Down Expand Up @@ -2382,7 +2357,7 @@ def contract_22_defect_check(index, total_checks, cversion, tversion, **kwargs):
return result


def llfc_susceptibility_check(index, total_checks, cversion=None, tversion=None, vpc_node_ids=[], **kwargs):
def llfc_susceptibility_check(index, total_checks, cversion=None, tversion=None, vpc_node_ids=None, **kwargs):
title = 'Link Level Flow Control'
result = PASS
msg = ''
Expand All @@ -2393,18 +2368,10 @@ def llfc_susceptibility_check(index, total_checks, cversion=None, tversion=None,
doc_url = 'https://bst.cloudapps.cisco.com/bugsearch/bug/CSCvo27498'
print_title(title, index, total_checks)

if not cversion:
cversion = kwargs.get("cversion", None)
if not tversion:
tversion = kwargs.get("tversion", None)

if not tversion:
print_result(title, MANUAL, 'Target version not supplied. Skipping.')
return MANUAL

if not vpc_node_ids:
vpc_node_ids = kwargs.get("vpc_node_ids", [])

if not vpc_node_ids:
print_result(title, result, 'No VPC Nodes found. Not susceptible.')
return result
Expand All @@ -2418,9 +2385,7 @@ def llfc_susceptibility_check(index, total_checks, cversion=None, tversion=None,
t_affected = True

if sx_affected or t_affected:
ethpmFcot = kwargs.get("ethpmFcot.json")
if not ethpmFcot:
ethpmFcot = icurl('class', 'ethpmFcot.json?query-target-filter=and(eq(ethpmFcot.type,"sfp"),eq(ethpmFcot.state,"inserted"))')
ethpmFcot = icurl('class', 'ethpmFcot.json?query-target-filter=and(eq(ethpmFcot.type,"sfp"),eq(ethpmFcot.state,"inserted"))')

for fcot in ethpmFcot:
typeName = fcot['ethpmFcot']['attributes']['typeName']
Expand Down Expand Up @@ -2454,19 +2419,12 @@ def telemetryStatsServerP_object_check(index, total_checks, cversion=None, tvers
doc_url = 'https://bst.cloudapps.cisco.com/bugsearch/bug/CSCvt47850'
print_title(title, index, total_checks)

telemetryStatsServerP_json = kwargs.get("telemetryStatsServerP.json", None)
if not cversion:
cversion = kwargs.get("cversion", None)
if not tversion:
tversion = kwargs.get("tversion", None)

if not tversion:
print_result(title, MANUAL, 'Target version not supplied. Skipping.')
return MANUAL

if cversion.older_than("4.2(4d)") and tversion.newer_than("5.2(2d)"):
if not isinstance(telemetryStatsServerP_json, list):
telemetryStatsServerP_json = icurl('class', 'telemetryStatsServerP.json')
telemetryStatsServerP_json = icurl('class', 'telemetryStatsServerP.json')
for serverp in telemetryStatsServerP_json:
if serverp["telemetryStatsServerP"]["attributes"].get("collectorLocation") == "apic":
result = FAIL_O
Expand All @@ -2486,18 +2444,12 @@ def internal_vlanpool_check(index, total_checks, tversion=None, **kwargs):
doc_url = 'https://bst.cloudapps.cisco.com/bugsearch/bug/CSCvw33061'
print_title(title, index, total_checks)

fvnsVlanInstP_json = kwargs.get("fvnsVlanInstP.json", None)
vmmDomP_json = kwargs.get("vmmDomP.json", None)
if not tversion:
tversion = kwargs.get("tversion", None)

if not tversion:
print_result(title, MANUAL, 'Target version not supplied. Skipping.')
return MANUAL

if tversion.newer_than("4.2(6a)"):
if not isinstance(fvnsVlanInstP_json, list):
fvnsVlanInstP_json = icurl('class', 'fvnsVlanInstP.json?rsp-subtree=children&rsp-subtree-class=fvnsRtVlanNs,fvnsEncapBlk&rsp-subtree-include=required')
fvnsVlanInstP_json = icurl('class', 'fvnsVlanInstP.json?rsp-subtree=children&rsp-subtree-class=fvnsRtVlanNs,fvnsEncapBlk&rsp-subtree-include=required')
# Dict with key = vlan pool name, values = list of associated domains
dom_rel = {}
# List of vlanInstP which contain fvnsEncapBlk.role = "internal"
Expand Down Expand Up @@ -2529,8 +2481,7 @@ def internal_vlanpool_check(index, total_checks, tversion=None, **kwargs):
if [vlanInstP_name, ', '.join(encap_blk_dict[vlanInstP_name]), dom["dn"], 'VLANs in this Block will be removed from switch Front-Panel if not corrected'] not in data:
data.append([vlanInstP_name, ', '.join(encap_blk_dict[vlanInstP_name]), dom["dn"], 'VLANs in this Block will be removed from switch Front-Panel if not corrected'])
assoc_doms.append(dom["dn"])
if not isinstance(vmmDomP_json, list):
vmmDomP_json = icurl('class', 'vmmDomP.json')
vmmDomP_json = icurl('class', 'vmmDomP.json')
for vmmDomP in vmmDomP_json:
if vmmDomP["vmmDomP"]["attributes"]["dn"] in assoc_doms:
if vmmDomP["vmmDomP"]["attributes"]["enableAVE"] != "yes":
Expand Down Expand Up @@ -2559,7 +2510,7 @@ def apic_ca_cert_validation(index, total_checks, **kwargs):

certreq_out = kwargs.get("certreq_out")
if not certreq_out:
pki_fabric_ca_mo = icurl('class','pkiFabricSelfCAEp.json')
pki_fabric_ca_mo = icurl('class', 'pkiFabricSelfCAEp.json')
if pki_fabric_ca_mo:
# Prep csr
passphrase = pki_fabric_ca_mo[0]['pkiFabricSelfCAEp']['attributes']['currCertReqPassphrase']
Expand Down Expand Up @@ -2604,8 +2555,8 @@ def apic_ca_cert_validation(index, total_checks, **kwargs):
# Perform test certreq
url = 'https://127.0.0.1/raca/certreq.json'
payload = '{"aaaCertGenReq":{"attributes":{"type":"csvc","hmac":"%s", "certreq": "%s", ' \
'"podip": "None", "podmac": "None", "podname": "None"}}}' % (hmac, certreq)
cmd = 'icurl -kX POST %s -d \' %s \'' %(url,payload)
'"podip": "None", "podmac": "None", "podname": "None"}}}' % (hmac, certreq)
cmd = 'icurl -kX POST %s -d \' %s \'' % (url, payload)
logging.debug('cmd = ' + ''.join(cmd))
certreq_proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
certreq_out = certreq_proc.communicate()[0].strip()
Expand Down
File renamed without changes.
File renamed without changes.
32 changes: 32 additions & 0 deletions tests/apic_ca_cert_validation/test_apic_ca_cert_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import os
import pytest
import logging
import importlib

script = importlib.import_module("aci-preupgrade-validation-script")

log = logging.getLogger(__name__)
dir = os.path.dirname(os.path.abspath(__file__))


@pytest.mark.parametrize(
"certreq_out_file, expected_result",
[
# FAIL - certreq returns error
(
"POS_certreq.txt",
script.FAIL_O,
),
# PASS - certreq returns cert info
(
"NEG_certreq.txt",
script.PASS,
),
],
)
def test_logic(certreq_out_file, expected_result):
data_path = os.path.join("tests", dir, certreq_out_file)
with open(data_path, "r") as file:
certreq_out = file.read()
result = script.apic_ca_cert_validation(1, 1, certreq_out=certreq_out)
assert result == expected_result
50 changes: 50 additions & 0 deletions tests/bgp_golf_route_target_type_check/fvCtx_pos.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
[
{
"fvCtx": {
"attributes": {
"annotation": "orchestrator:msc-shadow:no",
"bdEnforcedEnable": "no",
"dn": "uni/tn-welkin/ctx-qa",
"knwMcastAct": "permit",
"monPolDn": "uni/tn-common/monepg-default",
"name": "qa",
"pcEnfDir": "ingress",
"pcEnfDirUpdated": "yes",
"pcEnfPref": "enforced",
"pcTag": "16386",
"scope": "3080192",
"seg": "3080192"
},
"children": [
{
"l3extGlobalCtxName": {
"attributes": {
"name": "welkinqa",
"rn": "globalctxname"
}
}
},
{
"bgpRtTargetP": {
"attributes": {
"af": "ipv4-ucast",
"rn": "rtp-ipv4-ucast"
},
"children": [
{
"bgpRtTarget": {
"attributes": {
"rn": "rt-[extended:as2-nn2:100:2000]-import",
"rt": "extended:as2-nn2:100:2000",
"targetAf": "l2vpn-evpn",
"type": "import"
}
}
}
]
}
}
]
}
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import os
import pytest
import logging
import importlib
from helpers.utils import read_data

script = importlib.import_module("aci-preupgrade-validation-script")

log = logging.getLogger(__name__)
dir = os.path.dirname(os.path.abspath(__file__))


# icurl queries
fvCtxs = 'fvCtx.json?rsp-subtree=full&rsp-subtree-class=l3extGlobalCtxName,bgpRtTarget&rsp-subtree-include=required'


@pytest.mark.parametrize(
"icurl_outputs, cversion, tversion, expected_result",
[
(
{fvCtxs: read_data(dir, "fvCtx_pos.json")},
"4.2(1b)",
"5.2(2a)",
script.PASS,
),
(
{fvCtxs: read_data(dir, "fvCtx_pos.json")},
"3.2(1a)",
"4.2(4d)",
script.FAIL_O,
),
(
{fvCtxs: read_data(dir, "fvCtx_pos.json")},
"3.2(1a)",
"5.2(6a)",
script.FAIL_O,
),
(
{fvCtxs: read_data(dir, "fvCtx_pos.json")},
"4.2(3a)",
"4.2(7d)",
script.PASS,
),
(
{fvCtxs: read_data(dir, "fvCtx_pos.json")},
"2.2(3a)",
"2.2(4r)",
script.PASS,
),
(
{fvCtxs: read_data(dir, "fvCtx_pos.json")},
"5.2(1a)",
None,
script.MANUAL,
),
(
{fvCtxs: read_data(dir, "fvCtx_pos.json")},
"4.1(1a)",
"5.2(7f)",
script.FAIL_O,
),
],
)
def test_logic(mock_icurl, cversion, tversion, expected_result):
result = script.bgp_golf_route_target_type_check(
1,
1,
script.AciVersion(cversion),
script.AciVersion(tversion) if tversion else None,
)
assert result == expected_result
Loading

0 comments on commit d52fa97

Please sign in to comment.