Skip to content

Commit

Permalink
feat: support rule types (#3)
Browse files Browse the repository at this point in the history
**Issue #, if available:**

## Description of changes:

When you need to split egress and inspection traffic you also need to
split the rules. By introducing the rules per type we enable this
behaviour.

**Checklist**

<!--- Leave unchecked if your change doesn't seem to apply -->

* [x] Update tests
* [ ] Update docs
* [x] PR title follows [conventional commit
semantics](https://www.conventionalcommits.org/en/v1.0.0-beta.2/#commit-message-for-a-fix-using-an-optional-issue-number)

By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.
  • Loading branch information
Nr18 authored Jul 24, 2023
1 parent 4338248 commit af013d5
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 1 deletion.
8 changes: 8 additions & 0 deletions aws_network_firewall/account.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,11 @@ def __enrich_rule(self, rule: Rule) -> Rule:
@property
def rules(self) -> List[Rule]:
return self.__rules

@property
def inspection_rules(self) -> List[Rule]:
return list(filter(lambda rule: rule.is_inspection_rule, self.rules))

@property
def egress_rules(self) -> List[Rule]:
return list(filter(lambda rule: rule.is_egress_rule, self.rules))
14 changes: 13 additions & 1 deletion aws_network_firewall/rule.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import List, Optional
from typing import List, Optional, ClassVar

from aws_network_firewall.source import Source
from aws_network_firewall.destination import Destination
Expand All @@ -16,10 +16,22 @@ class Rule:

workload: str
name: str
type: str
description: str
sources: List[Source]
destinations: List[Destination]

INSPECTION: ClassVar[str] = "Inspection"
EGRESS: ClassVar[str] = "Egress"

@property
def is_inspection_rule(self) -> bool:
return self.type == self.INSPECTION

@property
def is_egress_rule(self) -> bool:
return self.type == self.EGRESS

@property
def __suricata_source(self) -> List[SuricataHost]:
def convert_source(source: Source) -> Optional[SuricataHost]:
Expand Down
1 change: 1 addition & 0 deletions aws_network_firewall/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def destination_resolver(entry: dict) -> Destination:
def rule_resolver(workload: str, entry: dict) -> Rule:
return Rule(
workload=workload,
type=entry["Type"],
name=entry["Name"],
description=entry["Description"],
sources=list(map(source_resolver, entry["Sources"])),
Expand Down
3 changes: 3 additions & 0 deletions aws_network_firewall/schemas/environment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ $defs:
properties:
Name:
type: string
Type:
enum: [ "Egress", "Inspection" ]
Description:
type: string
Sources:
Expand All @@ -53,6 +55,7 @@ $defs:
additionalProperties: False
required:
- Name
- Type
- Description
- Sources
- Destinations
Expand Down
71 changes: 71 additions & 0 deletions tests/test_account.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from aws_network_firewall.account import Account
from aws_network_firewall.cidr_range import CidrRange
from aws_network_firewall.cidr_ranges import CidrRanges
from aws_network_firewall.destination import Destination
from aws_network_firewall.rule import Rule
from aws_network_firewall.source import Source


def generate_rule(type: str) -> Rule:
return Rule(
workload="my-workload",
name="my-rule",
type=type,
description="My description",
sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)],
destinations=[
Destination(
description="my destination",
protocol="TCP",
port=443,
cidr=None,
endpoint=None,
region=None,
)
],
)


def test_no_rules() -> None:
rules = []
account = Account(
name="my-account",
account_id="123412341234",
cidr_ranges=CidrRanges(
cidr_ranges=[CidrRange(region="eu-west-1", value="10.0.0.0/24")]
),
rules=rules,
)
assert len(account.rules) == 0
assert len(account.egress_rules) == 0
assert len(account.inspection_rules) == 0


def test_inspection_rules() -> None:
rules = [generate_rule(Rule.INSPECTION)]
account = Account(
name="my-account",
account_id="123412341234",
cidr_ranges=CidrRanges(
cidr_ranges=[CidrRange(region="eu-west-1", value="10.0.0.0/8")]
),
rules=rules,
)
assert len(account.rules) == 1
assert len(account.egress_rules) == 0
assert len(account.inspection_rules) == 1


def test_egress_rules() -> None:
rules = [generate_rule(Rule.EGRESS)]
account = Account(
name="my-account",
account_id="123412341234",
cidr_ranges=CidrRanges(
cidr_ranges=[CidrRange(region="eu-west-1", value="10.0.0.0/8")]
),
rules=rules,
)
assert len(account.rules) == 1
assert len(account.egress_rules) == 1
assert len(account.inspection_rules) == 0
6 changes: 6 additions & 0 deletions tests/test_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ def test_rule_with_tls_endpoint() -> None:
rule = Rule(
workload="my-workload",
name="my-rule",
type=Rule.INSPECTION,
description="My description",
sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)],
destinations=[
Expand All @@ -31,6 +32,7 @@ def test_rule_with_tls_wildcard_endpoint() -> None:
rule = Rule(
workload="my-workload",
name="my-rule",
type=Rule.INSPECTION,
description="My description",
sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)],
destinations=[
Expand All @@ -55,6 +57,7 @@ def test_rule_with_tls_endpoint_non_standard_port() -> None:
rule = Rule(
workload="my-workload",
name="my-rule",
type=Rule.INSPECTION,
description="My description",
sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)],
destinations=[
Expand All @@ -80,6 +83,7 @@ def test_rule_with_tcp_cidr() -> None:
rule = Rule(
workload="my-workload",
name="my-rule",
type=Rule.INSPECTION,
description="My description",
sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)],
destinations=[
Expand All @@ -104,6 +108,7 @@ def test_rule_no_cidr() -> None:
rule = Rule(
workload="my-workload",
name="my-rule",
type=Rule.INSPECTION,
description="My description",
sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)],
destinations=[
Expand All @@ -125,6 +130,7 @@ def test_icmp_rule() -> None:
rule = Rule(
workload="my-workload",
name="my-rule",
type=Rule.INSPECTION,
description="My description",
sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)],
destinations=[
Expand Down
1 change: 1 addition & 0 deletions tests/workloads/example-workload/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ CidrRanges:
eu-central-1: 192.168.8.0/21
Rules:
- Name: My Rule name
Type: Egress
Description: My rule destination
Sources:
- Description: My Source
Expand Down

0 comments on commit af013d5

Please sign in to comment.