From b089566df82603c5c676791732ebddca1ace4cdb Mon Sep 17 00:00:00 2001 From: Joris Conijn Date: Fri, 28 Jul 2023 19:34:04 +0200 Subject: [PATCH] feat: display rules per region (#8) **Issue #, if available:** ## Description of changes: By adding the ability to list rules per region, we make it possible to render documentation per region. This is useful when you have a firewall per region. **Checklist** * [x] Update tests * [ ] Update docs * [x] PR title follows [conventional commit semantics](https://www.conventionalcommits.org/en/v1.0.0-beta.2/#commit-message-for-a-fix-using-an-optional-issue-number) By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. --- aws_network_firewall/account.py | 47 ++++--- aws_network_firewall/destination.py | 8 -- aws_network_firewall/rule.py | 1 + aws_network_firewall/rule_set.py | 31 +++++ aws_network_firewall/schemas/__init__.py | 3 +- aws_network_firewall/schemas/environment.yaml | 12 +- aws_network_firewall/source.py | 8 -- tests/test_account.py | 125 +++++++++++++++--- tests/test_destination.py | 31 ----- tests/test_rule.py | 28 ++-- tests/test_source.py | 18 +-- tests/workload-firewall-rules.jinja | 12 +- tests/workloads/example-workload/README.md | 14 +- .../example-workload/development.yaml | 3 +- 14 files changed, 201 insertions(+), 140 deletions(-) create mode 100644 aws_network_firewall/rule_set.py diff --git a/aws_network_firewall/account.py b/aws_network_firewall/account.py index f490163..ce04481 100644 --- a/aws_network_firewall/account.py +++ b/aws_network_firewall/account.py @@ -1,12 +1,17 @@ from __future__ import annotations -from typing import List, Optional + +import itertools +from typing import List, Union from landingzone_organization import Account as LandingZoneAccount from aws_network_firewall.cidr_ranges import CidrRanges +from aws_network_firewall.destination import Destination from aws_network_firewall.rule import Rule +from aws_network_firewall.rule_set import RuleSet +from aws_network_firewall.source import Source class Account(LandingZoneAccount): - __rules: List[Rule] + __rules: RuleSet __cidr_ranges: CidrRanges def __init__( @@ -14,34 +19,36 @@ def __init__( ) -> None: super().__init__(name, account_id) self.__cidr_ranges = cidr_ranges - self.__rules = list(map(self.__enrich_rule, rules)) + self.__rules = RuleSet(rules=list(map(self.__enrich_rule, rules))) def __enrich_rule(self, rule: Rule) -> Rule: - list( - map( - lambda source: source.resolve_region_cidr_ranges(self.__cidr_ranges), - rule.sources, - ) - ) - list( - map( - lambda destination: destination.resolve_region_cidr_ranges( - self.__cidr_ranges - ), - rule.destinations, - ) - ) + cidr_range = self.__cidr_ranges.by_region(rule.region) + + def update_cidr_if_not_set(entry: Source) -> None: + if cidr_range and not entry.cidr: + entry.cidr = cidr_range.value + + list(map(update_cidr_if_not_set, rule.sources)) return rule @property - def rules(self) -> List[Rule]: + def regions(self) -> List[str]: + return list(set(filter(None, map(lambda rule: rule.region, self.rules.all)))) + + def rules_by_region(self, region: str) -> RuleSet: + return RuleSet( + rules=list(filter(lambda rule: region == rule.region, self.rules.all)) + ) + + @property + def rules(self) -> RuleSet: return self.__rules @property def inspection_rules(self) -> List[Rule]: - return list(filter(lambda rule: rule.is_inspection_rule, self.rules)) + return list(filter(lambda rule: rule.is_inspection_rule, self.rules.all)) @property def egress_rules(self) -> List[Rule]: - return list(filter(lambda rule: rule.is_egress_rule, self.rules)) + return list(filter(lambda rule: rule.is_egress_rule, self.rules.all)) diff --git a/aws_network_firewall/destination.py b/aws_network_firewall/destination.py index a5f083f..2b27887 100644 --- a/aws_network_firewall/destination.py +++ b/aws_network_firewall/destination.py @@ -2,8 +2,6 @@ from dataclasses import dataclass from typing import Optional -from aws_network_firewall.cidr_ranges import CidrRanges - @dataclass class Destination: @@ -15,11 +13,5 @@ class Destination: protocol: str port: Optional[int] endpoint: Optional[str] - region: Optional[str] cidr: Optional[str] message: Optional[str] - - def resolve_region_cidr_ranges(self, ranges: CidrRanges) -> None: - if self.region and not self.cidr: - cidr = ranges.by_region(self.region) - self.cidr = cidr.value if cidr else None diff --git a/aws_network_firewall/rule.py b/aws_network_firewall/rule.py index 1f7e1e2..4f3f3a3 100644 --- a/aws_network_firewall/rule.py +++ b/aws_network_firewall/rule.py @@ -17,6 +17,7 @@ class Rule: workload: str name: str type: str + region: str description: str sources: List[Source] destinations: List[Destination] diff --git a/aws_network_firewall/rule_set.py b/aws_network_firewall/rule_set.py new file mode 100644 index 0000000..e16b9ce --- /dev/null +++ b/aws_network_firewall/rule_set.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +from typing import List + +from aws_network_firewall.rule import Rule + + +class RuleSet: + __rules: List[Rule] + + def __init__(self, rules: List[Rule]) -> None: + self.__rules = rules + + def __len__(self) -> int: + return len(self.all) + + def __iter__(self): + for value in self.all: + yield value + + @property + def all(self) -> List[Rule]: + return self.__rules + + @property + def inspection_rules(self) -> List[Rule]: + return list(filter(lambda rule: rule.is_inspection_rule, self.all)) + + @property + def egress_rules(self) -> List[Rule]: + return list(filter(lambda rule: rule.is_egress_rule, self.all)) diff --git a/aws_network_firewall/schemas/__init__.py b/aws_network_firewall/schemas/__init__.py index 08d6bd6..f13acf6 100644 --- a/aws_network_firewall/schemas/__init__.py +++ b/aws_network_firewall/schemas/__init__.py @@ -16,7 +16,6 @@ def source_resolver(entry: dict) -> Source: return Source( description=entry["Description"], cidr=entry.get("Cidr"), - region=entry.get("Region"), ) @@ -26,7 +25,6 @@ def destination_resolver(entry: dict) -> Destination: protocol=entry["Protocol"], port=entry.get("Port"), endpoint=entry.get("Endpoint"), - region=entry.get("Region"), cidr=entry.get("Cidr"), message=entry.get("Message"), ) @@ -36,6 +34,7 @@ def rule_resolver(workload: str, entry: dict) -> Rule: return Rule( workload=workload, type=entry["Type"], + region=entry["Region"], name=entry["Name"], description=entry["Description"], sources=list(map(source_resolver, entry["Sources"])), diff --git a/aws_network_firewall/schemas/environment.yaml b/aws_network_firewall/schemas/environment.yaml index 0c92ab1..56065b7 100644 --- a/aws_network_firewall/schemas/environment.yaml +++ b/aws_network_firewall/schemas/environment.yaml @@ -49,6 +49,7 @@ definitions: required: - Name - Type + - Region - Description - Sources - Destinations @@ -59,6 +60,8 @@ definitions: enum: [ "Egress", "Inspection" ] Description: type: string + Region: + type: string Sources: type: array items: @@ -78,8 +81,6 @@ definitions: type: string Cidr: type: string - Region: - type: string examples: - Description: Allow access from `10.0.0.0/8` to the defined destinations. Cidr: 10.0.0.0/8 @@ -93,12 +94,11 @@ definitions: - Description - Protocol anyOf: + - required: ["Endpoint", "Cidr"] - required: ["Endpoint"] - not: { required: ["Region", "Cidr"] } + not: { required: ["Cidr"] } - required: ["Cidr"] - not: { required: ["Endpoint", "Region"] } - - required: ["Region"] - not: { required: ["Endpoint", "Cidr"] } + not: { required: ["Endpoint"] } # Port is not required when Protocol is ICMP properties: Description: diff --git a/aws_network_firewall/source.py b/aws_network_firewall/source.py index bb9746f..2e14db8 100644 --- a/aws_network_firewall/source.py +++ b/aws_network_firewall/source.py @@ -2,8 +2,6 @@ from dataclasses import dataclass from typing import Optional -from aws_network_firewall.cidr_ranges import CidrRanges - @dataclass class Source: @@ -13,9 +11,3 @@ class Source: description: str cidr: Optional[str] - region: Optional[str] - - def resolve_region_cidr_ranges(self, ranges: CidrRanges) -> None: - if self.region and not self.cidr: - cidr = ranges.by_region(self.region) - self.cidr = cidr.value if cidr else None diff --git a/tests/test_account.py b/tests/test_account.py index 60c5000..505f910 100644 --- a/tests/test_account.py +++ b/tests/test_account.py @@ -1,3 +1,5 @@ +from typing import List + from aws_network_firewall.account import Account from aws_network_firewall.cidr_range import CidrRange from aws_network_firewall.cidr_ranges import CidrRanges @@ -6,13 +8,25 @@ from aws_network_firewall.source import Source -def generate_rule(type: str) -> Rule: +def generate_account(rules: List[Rule]) -> Account: + return Account( + name="my-account", + account_id="123412341234", + cidr_ranges=CidrRanges( + cidr_ranges=[CidrRange(region="eu-west-1", value="10.0.0.0/8")] + ), + rules=rules, + ) + + +def generate_rule(type: str, region: str) -> Rule: return Rule( workload="my-workload", name="my-rule", + region=region, type=type, description="My description", - sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)], + sources=[Source(description="my source", cidr="10.0.0.0/24")], destinations=[ Destination( description="my destination", @@ -20,30 +34,32 @@ def generate_rule(type: str) -> Rule: port=443, cidr=None, endpoint=None, - region=None, message=None, ) ], ) +outbound_xebia = Destination( + description="Allow outbound connectivity to xebia.com", + protocol="TCP", + port=443, + cidr=None, + endpoint="xebia.com", + message=None, +) + + def test_no_rules() -> None: rules = [] - account = Account( - name="my-account", - account_id="123412341234", - cidr_ranges=CidrRanges( - cidr_ranges=[CidrRange(region="eu-west-1", value="10.0.0.0/24")] - ), - rules=rules, - ) + account = generate_account(rules=rules) assert len(account.rules) == 0 assert len(account.egress_rules) == 0 assert len(account.inspection_rules) == 0 def test_inspection_rules() -> None: - rules = [generate_rule(Rule.INSPECTION)] + rules = [generate_rule(Rule.INSPECTION, region="eu-west-1")] account = Account( name="my-account", account_id="123412341234", @@ -58,15 +74,82 @@ def test_inspection_rules() -> None: def test_egress_rules() -> None: - rules = [generate_rule(Rule.EGRESS)] - account = Account( - name="my-account", - account_id="123412341234", - cidr_ranges=CidrRanges( - cidr_ranges=[CidrRange(region="eu-west-1", value="10.0.0.0/8")] - ), - rules=rules, - ) + rules = [generate_rule(Rule.EGRESS, region="eu-west-1")] + account = generate_account(rules=rules) + assert len(account.rules) == 1 + assert len(account.egress_rules) == 1 + assert len(account.inspection_rules) == 0 + + +def test_rules_resolve_single_region_egress() -> None: + rules = [generate_rule(Rule.EGRESS, region="eu-west-1")] + account = generate_account(rules=rules) assert len(account.rules) == 1 assert len(account.egress_rules) == 1 assert len(account.inspection_rules) == 0 + assert "eu-west-1" in account.regions + + +def test_rules_resolve_2_regions_egress() -> None: + rules = [ + generate_rule(Rule.EGRESS, region="eu-west-1"), + generate_rule(Rule.EGRESS, region="eu-central-1"), + ] + account = generate_account(rules=rules) + assert len(account.rules) == 2 + assert len(account.egress_rules) == 2 + assert len(account.inspection_rules) == 0 + assert "eu-west-1" in account.regions + assert "eu-central-1" in account.regions + + rules = account.rules_by_region("eu-west-1") + assert len(rules) == 1 + assert len(rules.egress_rules) == 1 + assert len(rules.inspection_rules) == 0 + + rules = account.rules_by_region("eu-central-1") + assert len(rules) == 1 + assert len(rules.egress_rules) == 1 + assert len(rules.inspection_rules) == 0 + + +def test_rules_resolve_single_source_region_inspection() -> None: + rules = [generate_rule(Rule.INSPECTION, region="eu-west-1")] + account = generate_account(rules=rules) + assert len(account.rules) == 1 + assert len(account.egress_rules) == 0 + assert len(account.inspection_rules) == 1 + assert "eu-west-1" in account.regions + + rules = account.rules_by_region("eu-west-1") + assert len(rules) == 1 + assert len(rules.egress_rules) == 0 + assert len(rules.inspection_rules) == 1 + + rules = account.rules_by_region("eu-central-1") + assert len(rules) == 0 + assert len(rules.egress_rules) == 0 + assert len(rules.inspection_rules) == 0 + + +def test_rules_resolve_2_source_regions_inspection() -> None: + rules = [ + generate_rule(Rule.INSPECTION, region="eu-west-1"), + generate_rule(Rule.INSPECTION, region="eu-central-1"), + ] + account = generate_account(rules=rules) + assert len(account.rules) == 2 + assert len(account.egress_rules) == 0 + assert len(account.inspection_rules) == 2 + assert "eu-west-1" in account.regions + assert "eu-central-1" in account.regions + + rules = account.rules_by_region("eu-west-1") + assert len(rules) == 1 + assert len(rules.egress_rules) == 0 + assert len(rules.inspection_rules) == 1 + + rules = account.rules_by_region("eu-central-1") + assert len(rules) == 1 + assert len(rules.egress_rules) == 0 + assert len(rules.inspection_rules) == 1 diff --git a/tests/test_destination.py b/tests/test_destination.py index 145eead..b12d455 100644 --- a/tests/test_destination.py +++ b/tests/test_destination.py @@ -1,41 +1,11 @@ -from aws_network_firewall.cidr_ranges import CidrRanges from aws_network_firewall.destination import Destination -def test_destination_region_to_cidr(cidr_ranges: CidrRanges) -> None: - destination = Destination( - description="", - protocol="TLS", - port=443, - region="eu-west-1", - endpoint=None, - cidr=None, - message=None, - ) - destination.resolve_region_cidr_ranges(cidr_ranges) - assert destination.cidr == "10.0.0.0/24" - - -def test_destination_cidr(cidr_ranges: CidrRanges) -> None: - destination = Destination( - description="", - protocol="TLS", - port=443, - region=None, - endpoint=None, - cidr="10.0.0.0/24", - message=None, - ) - destination.resolve_region_cidr_ranges(cidr_ranges) - assert destination.cidr == "10.0.0.0/24" - - def test_destination_properties() -> None: destination = Destination( description="My Description", protocol="TLS", port=443, - region="eu-west-1", endpoint="xebia.com", cidr="10.0.0.0/24", message="Important Message", @@ -43,7 +13,6 @@ def test_destination_properties() -> None: assert destination.description == "My Description" assert destination.protocol == "TLS" assert destination.port == 443 - assert destination.region == "eu-west-1" assert destination.endpoint == "xebia.com" assert destination.cidr == "10.0.0.0/24" assert destination.message == "Important Message" diff --git a/tests/test_rule.py b/tests/test_rule.py index f233ea1..cbb0b59 100644 --- a/tests/test_rule.py +++ b/tests/test_rule.py @@ -7,9 +7,10 @@ def test_rule_with_tls_endpoint() -> None: rule = Rule( workload="my-workload", name="my-rule", + region="eu-west-1", type=Rule.INSPECTION, description="My description", - sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)], + sources=[Source(description="my source", cidr="10.0.0.0/24")], destinations=[ Destination( description="my destination", @@ -17,7 +18,6 @@ def test_rule_with_tls_endpoint() -> None: port=443, endpoint="xebia.com", cidr="10.0.1.0/24", - region=None, message=None, ) ], @@ -33,9 +33,10 @@ def test_rule_with_tls_wildcard_endpoint() -> None: rule = Rule( workload="my-workload", name="my-rule", + region="eu-west-1", type=Rule.INSPECTION, description="My description", - sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)], + sources=[Source(description="my source", cidr="10.0.0.0/24")], destinations=[ Destination( description="my destination", @@ -43,7 +44,6 @@ def test_rule_with_tls_wildcard_endpoint() -> None: port=443, endpoint="*.xebia.com", cidr="10.0.1.0/24", - region=None, message=None, ) ], @@ -59,9 +59,10 @@ def test_rule_with_tls_endpoint_non_standard_port() -> None: rule = Rule( workload="my-workload", name="my-rule", + region="eu-west-1", type=Rule.INSPECTION, description="My description", - sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)], + sources=[Source(description="my source", cidr="10.0.0.0/24")], destinations=[ Destination( description="my destination", @@ -69,7 +70,6 @@ def test_rule_with_tls_endpoint_non_standard_port() -> None: port=444, endpoint="xebia.com", cidr="10.0.1.0/24", - region=None, message=None, ) ], @@ -86,9 +86,10 @@ def test_rule_with_tcp_cidr() -> None: rule = Rule( workload="my-workload", name="my-rule", + region="eu-west-1", type=Rule.INSPECTION, description="My description", - sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)], + sources=[Source(description="my source", cidr="10.0.0.0/24")], destinations=[ Destination( description="my destination", @@ -96,7 +97,6 @@ def test_rule_with_tcp_cidr() -> None: port=443, cidr="10.0.1.0/24", endpoint=None, - region=None, message=None, ) ], @@ -112,9 +112,10 @@ def test_icmp_rule() -> None: rule = Rule( workload="my-workload", name="my-rule", + region="eu-west-1", type=Rule.INSPECTION, description="My description", - sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)], + sources=[Source(description="my source", cidr="10.0.0.0/24")], destinations=[ Destination( description="my destination", @@ -122,7 +123,6 @@ def test_icmp_rule() -> None: port=None, cidr="10.0.1.0/24", endpoint=None, - region=None, message=None, ) ], @@ -138,9 +138,10 @@ def test_egress_tls_rule() -> None: rule = Rule( workload="my-workload", name="my-rule", + region="eu-west-1", type=Rule.EGRESS, description="My description", - sources=[Source(description="my source", cidr=None, region="eu-west-1")], + sources=[Source(description="my source", cidr=None)], destinations=[ Destination( description="my destination", @@ -148,7 +149,6 @@ def test_egress_tls_rule() -> None: port=443, cidr=None, endpoint="xebia.com", - region=None, message=None, ) ], @@ -164,9 +164,10 @@ def test_egress_tls_rule_with_message() -> None: rule = Rule( workload="my-workload", name="my-rule", + region="eu-west-1", type=Rule.EGRESS, description="My description", - sources=[Source(description="my source", cidr=None, region="eu-west-1")], + sources=[Source(description="my source", cidr=None)], destinations=[ Destination( description="my destination", @@ -174,7 +175,6 @@ def test_egress_tls_rule_with_message() -> None: port=443, cidr=None, endpoint="xebia.com", - region=None, message="IMPORTANT BECAUSE ...", ) ], diff --git a/tests/test_source.py b/tests/test_source.py index 23aefc1..8bc2ccb 100644 --- a/tests/test_source.py +++ b/tests/test_source.py @@ -1,22 +1,10 @@ -from aws_network_firewall.cidr_ranges import CidrRanges from aws_network_firewall.source import Source -def test_source_region_to_cidr(cidr_ranges: CidrRanges) -> None: +def test_source_properties() -> None: source = Source( - description="", - region="eu-west-1", - cidr=None, - ) - source.resolve_region_cidr_ranges(cidr_ranges) - assert source.cidr == "10.0.0.0/24" - - -def test_source_cidr(cidr_ranges: CidrRanges) -> None: - source = Source( - description="", - region=None, + description="My Description", cidr="10.0.0.0/24", ) - source.resolve_region_cidr_ranges(cidr_ranges) + assert source.description == "My Description" assert source.cidr == "10.0.0.0/24" diff --git a/tests/workload-firewall-rules.jinja b/tests/workload-firewall-rules.jinja index 7522fae..12644bb 100644 --- a/tests/workload-firewall-rules.jinja +++ b/tests/workload-firewall-rules.jinja @@ -32,16 +32,16 @@ The {{ account.name }} has the following suppressions registered: #### Sources -**CIDR** | **Region** | **Description** ----------|------------|---------------- -{% for source in rule.sources %}{{ source.cidr }} | {{ source.region }} | {{ source.description }} +**CIDR** | **Description** +---------|---------------- +{% for source in rule.sources %}{{ source.cidr }} | {{ source.description }} {% endfor %} #### Destinations -**Endpoint** | **CIDR** | **Region** | **Protocol** | **Port** | **Description** --------------|----------|------------|--------------|----------|----------------- -{% for destination in rule.destinations %}{{ destination.endpoint }} | {{ destination.cidr }} | {{ destination.region }} | {{ destination.protocol }} | {{ destination.port }} | {% autoescape false %}{{ destination.description | replace("\n", "
") }}{% endautoescape %} +**Endpoint** | **CIDR** | **Protocol** | **Port** | **Description** +-------------|----------|--------------|----------|----------------- +{% for destination in rule.destinations %}{{ destination.endpoint }} | {{ destination.cidr }} | {{ destination.protocol }} | {{ destination.port }} | {% autoescape false %}{{ destination.description | replace("\n", "
") }}{% endautoescape %} {% endfor %} #### Rules diff --git a/tests/workloads/example-workload/README.md b/tests/workloads/example-workload/README.md index 284a4ee..f5aa076 100644 --- a/tests/workloads/example-workload/README.md +++ b/tests/workloads/example-workload/README.md @@ -33,16 +33,16 @@ My rule destination #### Sources -**CIDR** | **Region** | **Description** ----------|------------|---------------- -192.168.0.0/21 | eu-west-1 | My Source +**CIDR** | **Description** +---------|---------------- +192.168.0.0/21 | My Source #### Destinations -**Endpoint** | **CIDR** | **Region** | **Protocol** | **Port** | **Description** --------------|----------|------------|--------------|----------|----------------- -xebia.com | 192.168.8.0/21 | eu-central-1 | TLS | 443 | My destination +**Endpoint** | **CIDR** | **Protocol** | **Port** | **Description** +-------------|----------|--------------|----------|----------------- +xebia.com | None | TLS | 443 | My destination #### Rules @@ -50,7 +50,7 @@ xebia.com | 192.168.8.0/21 | eu-central-1 | TLS | 443 | My destination Based on the above defined sources and destination the following firewall rules are required: ``` -pass tls 192.168.0.0/21 any -> 192.168.8.0/21 443 (tls.sni; tls.version:1.2; content:"xebia.com"; nocase; startswith; endswith; msg:"binxio-example-workload-development | My Rule name"; rev:1; sid:XXX;) +pass tls 192.168.0.0/21 any -> any 443 (tls.sni; tls.version:1.2; content:"xebia.com"; nocase; startswith; endswith; msg:"binxio-example-workload-development | My Rule name"; rev:1; sid:XXX;) ``` diff --git a/tests/workloads/example-workload/development.yaml b/tests/workloads/example-workload/development.yaml index c66bcb7..49e1c5d 100644 --- a/tests/workloads/example-workload/development.yaml +++ b/tests/workloads/example-workload/development.yaml @@ -6,13 +6,12 @@ CidrRanges: Rules: - Name: My Rule name Type: Egress + Region: eu-west-1 Description: My rule destination Sources: - Description: My Source - Region: eu-west-1 Destinations: - Description: My destination Endpoint: xebia.com - Region: eu-central-1 Protocol: TLS Port: 443 \ No newline at end of file