Skip to content

Commit

Permalink
simplify, lose 75 lines
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Büchse <[email protected]>
  • Loading branch information
mbuechse committed Nov 7, 2024
1 parent 7b5359e commit 1fe8a11
Showing 1 changed file with 63 additions and 138 deletions.
201 changes: 63 additions & 138 deletions Tests/iaas/security-groups/default-security-group-rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,16 @@
presence of default rules for egress traffic is checked.
"""

import openstack
import os
import argparse
import os

import openstack
from openstack.exceptions import ResourceNotFound

SG_NAME = "default-test-sg"
DESCRIPTION = "default-test-sg"


def connect(cloud_name: str) -> openstack.connection.Connection:
"""Create a connection to an OpenStack cloud
:param string cloud_name:
The name of the configuration to load from clouds.yaml.
:returns: openstack.connnection.Connection
"""
return openstack.connect(
cloud=cloud_name,
)


def count_ingress_egress(rules, short=False):
"""
counts all verall ingress rules and egress rules, depending on the requested testing mode
Expand All @@ -39,107 +27,60 @@ def count_ingress_egress(rules, short=False):
"""
ingress_rules = 0
egress_rules = 0
if not short:
egress_ipv4_default_sg = 0
egress_ipv4_custom_sg = 0
egress_ipv6_default_sg = 0
egress_ipv6_custom_sg = 0
else:
egress_ipv4 = 0
egress_ipv6 = 0
egress_vars = {'IPv4': {}, 'IPv6': {}}
for key, value in egress_vars.items():
value['default'] = 0
if not short:
value['custom'] = 0
if not rules:
print("No default security group rules defined.")
else:
for rule in rules:
direction = rule["direction"]
ethertype = rule["ethertype"]
for rule in rules:
direction = rule["direction"]
ethertype = rule["ethertype"]
if direction == "ingress":
if not short:
r_custom_sg = rule["used_in_non_default_sg"]
r_default_sg = rule["used_in_default_sg"]
if direction == "ingress":
ingress_rules += 1
if not short:
# we allow ingress from the same security group
# but only for the default security group
r_group_id = rule.remote_group_id
if r_group_id == "PARENT" and not r_custom_sg:
ingress_rules -= 1
elif direction == "egress" and ethertype == "IPv4":
egress_rules += 1
if not short:
if rule.remote_ip_prefix:
# this rule does not allow traffic to all external ips
continue
if r_custom_sg:
egress_ipv4_custom_sg += 1
if r_default_sg:
egress_ipv4_default_sg += 1
else:
egress_ipv4 += 1
elif direction == "egress" and ethertype == "IPv6":
egress_rules += 1
if not short:
if rule.remote_ip_prefix:
# this rule does not allow traffic to all external ips
continue
if r_custom_sg:
egress_ipv6_custom_sg += 1
if r_default_sg:
egress_ipv6_default_sg += 1
else:
egress_ipv6 += 1
if not egress_rules > 0:
raise ValueError(
f"Expected to have more than {egress_rules} egress rules present."
)
if not short:
var_list = [
egress_ipv4_default_sg,
egress_ipv4_custom_sg,
egress_ipv6_default_sg,
egress_ipv6_custom_sg,
]
else:
var_list = [
egress_ipv4,
egress_ipv6,
]
# we allow ingress from the same security group
# but only for the default security group
r_group_id = rule.remote_group_id
if r_group_id == "PARENT" and not rule["used_in_non_default_sg"]:
continue
ingress_rules += 1
elif direction == "egress" and ethertype in egress_vars:
egress_rules += 1
if short:
egress_vars[ethertype]['default'] += 1
continue
if rule.remote_ip_prefix:
# this rule does not allow traffic to all external ips
continue
# note: these two are not mutually exclusive
if rule["used_in_default_sg"]:
egress_vars[ethertype]['default'] += 1
if rule["used_in_non_default_sg"]:
egress_vars[ethertype]['custom'] += 1
# test whether there are no unallowed ingress rules
if not ingress_rules == 0:
if ingress_rules:
raise ValueError(
f"Expected no default ingress rules for security groups, "
f"But there are {ingress_rules} ingress rules. "
f"There should be only none."
)
# test whether all expected egress rules are present
if not all(var > 0 for var in var_list):
missing = [(key, key2) for key, val in egress_vars.items() for key2, val2 in val.items() if not val2]
if missing:
raise ValueError(
"Not all expected egress rules are present. "
"Expected rules for egress for IPv4 and IPv6 "
"both for default and custom security groups."
)
return ingress_rules, egress_rules


def test_rules(cloud_name: str):
try:
connection = connect(cloud_name)
rules = connection.network.default_security_group_rules()
except Exception as e:
print(str(e))
raise Exception(
f"Connection to cloud '{cloud_name}' was not successful. "
f"The default Security Group Rules could not be accessed. "
f"Please check your cloud connection and authorization."
"both for default and custom security groups. "
f"Missing rule types: {', '.join(str(x) for x in missing)}"
)
if not any(rule for rule in rules):
raise
ingress_rules, egress_rules = count_ingress_egress(rules)
result_dict = {
return {
"Unallowed Ingress Rules": ingress_rules,
"Egress Rules": egress_rules,
}
return result_dict


def test_rules(connection: openstack.connection.Connection):
rules = connection.network.default_security_group_rules()
return count_ingress_egress(rules)


def create_security_group(conn, sg_name: str = SG_NAME, description: str = DESCRIPTION):
Expand All @@ -166,30 +107,13 @@ def delete_security_group(conn, sg_id):
print(f"Security group {sg_id} was not deleted successfully" f"Exception: {e}")


def altern_test_rules(cloud_name: str):
def altern_test_rules(connection: openstack.connection.Connection):
sg_id = create_security_group(connection)
try:
connection = connect(cloud_name)
except Exception as e:
print(str(e))
raise Exception(
f"Connection to cloud '{cloud_name}' was not successful. "
f"The default Security Group Rules could not be accessed. "
f"Please check your cloud connection and authorization."
)
try:
sg_id = create_security_group(connection)
rules = connection.network.find_security_group(name_or_id=sg_id)
except Exception:
print("Security group was not created successfully.")

ingress_rules, egress_rules = count_ingress_egress(rules.security_group_rules, True)
delete_security_group(connection, sg_id)

result_dict = {
"Unallowed Ingress Rules": ingress_rules,
"Egress Rules": egress_rules,
}
return result_dict
sg = connection.network.find_security_group(name_or_id=sg_id)
return count_ingress_egress(sg.security_group_rules, True)
finally:
delete_security_group(connection, sg_id)


def main():
Expand All @@ -209,25 +133,26 @@ def main():
openstack.enable_logging(debug=args.debug)

# parse cloud name for lookup in clouds.yaml
cloud = os.environ.get("OS_CLOUD", None)
if args.os_cloud:
cloud = args.os_cloud
cloud = args.os_cloud or os.environ.get("OS_CLOUD", None)
if not cloud:
raise ValueError(
"You need to have the OS_CLOUD environment variable set to your cloud "
"name or pass it via --os-cloud"
)
try:
print(test_rules(cloud))
except ResourceNotFound as e:
print(
"Ressource could not be found. Openstack components might not be up to date. "
"Continuing with still supported test. "
f"Error: {e}"
)
print(altern_test_rules(cloud))
except Exception as e:
print(f"Error occured: {e}")

with openstack.connect(cloud) as conn:
try:
print(test_rules(conn))
except ResourceNotFound as e:
print(
"Resource could not be found. OpenStack components might not be up to date. "
"Falling back to old-style test method. "
f"Error: {e}"
)
print(altern_test_rules(conn))
except Exception as e:
print(f"Error occured: {e}")
raise


if __name__ == "__main__":
Expand Down

0 comments on commit 1fe8a11

Please sign in to comment.