diff --git a/Tests/iaas/security-groups/default-security-group-rules.py b/Tests/iaas/security-groups/default-security-group-rules.py index def511956..261d34211 100755 --- a/Tests/iaas/security-groups/default-security-group-rules.py +++ b/Tests/iaas/security-groups/default-security-group-rules.py @@ -28,11 +28,11 @@ def check_default_rules(rules, short=False): if short is True, the testing mode is set on short for older OpenStack versions """ ingress_rules = egress_rules = 0 - egress_vars = {'IPv4': {}, 'IPv6': {}} + egress_vars = {"IPv4": {}, "IPv6": {}} for key, value in egress_vars.items(): - value['default'] = 0 + value["default"] = 0 if not short: - value['custom'] = 0 + value["custom"] = 0 if not rules: logger.info("No default security group rules defined.") for rule in rules: @@ -42,36 +42,48 @@ def check_default_rules(rules, short=False): if not short: # we allow ingress from the same security group # but only for the default security group - if rule.remote_group_id == "PARENT" and not rule["used_in_non_default_sg"]: + if ( + rule.remote_group_id == "PARENT" + and not rule["used_in_non_default_sg"] + ): continue ingress_rules += 1 elif direction == "egress" and ethertype in egress_vars: egress_rules += 1 if short: - egress_vars[ethertype]['default'] += 1 + egress_vars[ethertype]["default"] += 1 continue if rule.remote_ip_prefix: # this rule does not allow traffic to all external ips continue # note: these two are not mutually exclusive if rule["used_in_default_sg"]: - egress_vars[ethertype]['default'] += 1 + egress_vars[ethertype]["default"] += 1 if rule["used_in_non_default_sg"]: - egress_vars[ethertype]['custom'] += 1 + egress_vars[ethertype]["custom"] += 1 # test whether there are no unallowed ingress rules if ingress_rules: logger.error(f"Expected no default ingress rules, found {ingress_rules}.") # test whether all expected egress rules are present - missing = [(key, key2) for key, val in egress_vars.items() for key2, val2 in val.items() if not val2] + missing = [ + (key, key2) + for key, val in egress_vars.items() + for key2, val2 in val.items() + if not val2 + ] if missing: logger.error( "Expected rules for egress for IPv4 and IPv6 both for default and custom security groups. " f"Missing rule types: {', '.join(str(x) for x in missing)}" ) - logger.info(str({ - "Unallowed Ingress Rules": ingress_rules, - "Egress Rules": egress_rules, - })) + logger.info( + str( + { + "Unallowed Ingress Rules": ingress_rules, + "Egress Rules": egress_rules, + } + ) + ) def create_security_group(conn, sg_name: str = SG_NAME, description: str = DESCRIPTION): @@ -139,7 +151,9 @@ def main(): "to the OS_CLOUD environment variable", ) parser.add_argument( - "--debug", action="store_true", help="Enable debug logging", + "--debug", + action="store_true", + help="Enable debug logging", ) args = parser.parse_args() openstack.enable_logging(debug=args.debug) @@ -164,10 +178,17 @@ def main(): test_rules(conn) c = counting_handler.bylevel - logger.debug(f"Total critical / error / warning: {c[logging.CRITICAL]} / {c[logging.ERROR]} / {c[logging.WARNING]}") + logger.debug( + f"Total critical / error / warning: {c[logging.CRITICAL]} / {c[logging.ERROR]} / {c[logging.WARNING]}" + ) if not c[logging.CRITICAL]: - print("security-groups-default-rules-check: " + ('PASS', 'FAIL')[min(1, c[logging.ERROR])]) - return min(127, c[logging.CRITICAL] + c[logging.ERROR]) # cap at 127 due to OS restrictions + print( + "security-groups-default-rules-check: " + + ("PASS", "FAIL")[min(1, c[logging.ERROR])] + ) + return min( + 127, c[logging.CRITICAL] + c[logging.ERROR] + ) # cap at 127 due to OS restrictions if __name__ == "__main__": diff --git a/Tests/kaas/k8s-default-storage-class/k8s-default-storage-class-check.py b/Tests/kaas/k8s-default-storage-class/k8s-default-storage-class-check.py index c42759c00..7b65603f9 100755 --- a/Tests/kaas/k8s-default-storage-class/k8s-default-storage-class-check.py +++ b/Tests/kaas/k8s-default-storage-class/k8s-default-storage-class-check.py @@ -48,6 +48,7 @@ PVC_NAME = "test-k-pvc" PV_NAME = "test-k-pv" POD_NAME = "test-k-pod" +# A list of CSI-Providers that are ALLOWED_CSI_PROV = ["cinder", "rookCeph", "longhorn"] @@ -161,8 +162,12 @@ def check_default_persistentvolumeclaim_readwriteonce( """ # 3. Check if PV got succesfully created using ReadWriteOnce logger.debug("check if the created PV supports ReadWriteOnce") - api_response = k8s_api_instance.list_persistent_volume(_preload_content=False) + if not api_response: + raise SCSTestException( + "No persistent volume found", + return_code=1, + ) pv_info = json.loads(api_response.read().decode("utf-8")) pv_list = pv_info["items"]