Skip to content

Commit

Permalink
feat: support rule types
Browse files Browse the repository at this point in the history
When you need to split egress and inspection traffic you also need to split the rules. By introducing the rules per type we enable this behaviour.
  • Loading branch information
Nr18 committed Jul 24, 2023
1 parent 4338248 commit a40d9ad
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 1 deletion.
8 changes: 8 additions & 0 deletions aws_network_firewall/account.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,11 @@ def __enrich_rule(self, rule: Rule) -> Rule:
@property
def rules(self) -> List[Rule]:
return self.__rules

@property
def inspection_rules(self) -> List[Rule]:
return list(filter(lambda rule: rule.is_inspection_rule, self.rules))

@property
def egress_rules(self) -> List[Rule]:
return list(filter(lambda rule: rule.is_egress_rule, self.rules))
14 changes: 13 additions & 1 deletion aws_network_firewall/rule.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import List, Optional
from typing import List, Optional, ClassVar

from aws_network_firewall.source import Source
from aws_network_firewall.destination import Destination
Expand All @@ -16,10 +16,22 @@ class Rule:

workload: str
name: str
type: str
description: str
sources: List[Source]
destinations: List[Destination]

INSPECTION: ClassVar[str] = "Inspection"
EGRESS: ClassVar[str] = "Egress"

@property
def is_inspection_rule(self) -> bool:
return self.type == self.INSPECTION

@property
def is_egress_rule(self) -> bool:
return self.type == self.EGRESS

@property
def __suricata_source(self) -> List[SuricataHost]:
def convert_source(source: Source) -> Optional[SuricataHost]:
Expand Down
1 change: 1 addition & 0 deletions aws_network_firewall/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def destination_resolver(entry: dict) -> Destination:
def rule_resolver(workload: str, entry: dict) -> Rule:
return Rule(
workload=workload,
type=entry["Type"],
name=entry["Name"],
description=entry["Description"],
sources=list(map(source_resolver, entry["Sources"])),
Expand Down
3 changes: 3 additions & 0 deletions aws_network_firewall/schemas/environment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ $defs:
properties:
Name:
type: string
Type:
enum: [ "Egress", "Inspection" ]
Description:
type: string
Sources:
Expand All @@ -53,6 +55,7 @@ $defs:
additionalProperties: False
required:
- Name
- Type
- Description
- Sources
- Destinations
Expand Down
71 changes: 71 additions & 0 deletions tests/test_account.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from aws_network_firewall.account import Account
from aws_network_firewall.cidr_range import CidrRange
from aws_network_firewall.cidr_ranges import CidrRanges
from aws_network_firewall.destination import Destination
from aws_network_firewall.rule import Rule
from aws_network_firewall.source import Source


def generate_rule(type: str) -> Rule:
return Rule(
workload="my-workload",
name="my-rule",
type=type,
description="My description",
sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)],
destinations=[
Destination(
description="my destination",
protocol="TCP",
port=443,
cidr=None,
endpoint=None,
region=None,
)
],
)


def test_no_rules() -> None:
rules = []
account = Account(
name="my-account",
account_id="123412341234",
cidr_ranges=CidrRanges(
cidr_ranges=[CidrRange(region="eu-west-1", value="10.0.0.0/24")]
),
rules=rules,
)
assert len(account.rules) == 0
assert len(account.egress_rules) == 0
assert len(account.inspection_rules) == 0


def test_inspection_rules() -> None:
rules = [generate_rule(Rule.INSPECTION)]
account = Account(
name="my-account",
account_id="123412341234",
cidr_ranges=CidrRanges(
cidr_ranges=[CidrRange(region="eu-west-1", value="10.0.0.0/8")]
),
rules=rules,
)
assert len(account.rules) == 1
assert len(account.egress_rules) == 0
assert len(account.inspection_rules) == 1


def test_egress_rules() -> None:
rules = [generate_rule(Rule.EGRESS)]
account = Account(
name="my-account",
account_id="123412341234",
cidr_ranges=CidrRanges(
cidr_ranges=[CidrRange(region="eu-west-1", value="10.0.0.0/8")]
),
rules=rules,
)
assert len(account.rules) == 1
assert len(account.egress_rules) == 1
assert len(account.inspection_rules) == 0
6 changes: 6 additions & 0 deletions tests/test_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ def test_rule_with_tls_endpoint() -> None:
rule = Rule(
workload="my-workload",
name="my-rule",
type=Rule.INSPECTION,
description="My description",
sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)],
destinations=[
Expand All @@ -31,6 +32,7 @@ def test_rule_with_tls_wildcard_endpoint() -> None:
rule = Rule(
workload="my-workload",
name="my-rule",
type=Rule.INSPECTION,
description="My description",
sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)],
destinations=[
Expand All @@ -55,6 +57,7 @@ def test_rule_with_tls_endpoint_non_standard_port() -> None:
rule = Rule(
workload="my-workload",
name="my-rule",
type=Rule.INSPECTION,
description="My description",
sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)],
destinations=[
Expand All @@ -80,6 +83,7 @@ def test_rule_with_tcp_cidr() -> None:
rule = Rule(
workload="my-workload",
name="my-rule",
type=Rule.INSPECTION,
description="My description",
sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)],
destinations=[
Expand All @@ -104,6 +108,7 @@ def test_rule_no_cidr() -> None:
rule = Rule(
workload="my-workload",
name="my-rule",
type=Rule.INSPECTION,
description="My description",
sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)],
destinations=[
Expand All @@ -125,6 +130,7 @@ def test_icmp_rule() -> None:
rule = Rule(
workload="my-workload",
name="my-rule",
type=Rule.INSPECTION,
description="My description",
sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)],
destinations=[
Expand Down
1 change: 1 addition & 0 deletions tests/workloads/example-workload/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ CidrRanges:
eu-central-1: 192.168.8.0/21
Rules:
- Name: My Rule name
Type: Egress
Description: My rule destination
Sources:
- Description: My Source
Expand Down

0 comments on commit a40d9ad

Please sign in to comment.