Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: missing egress rules #5

Merged
merged 1 commit into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions aws_network_firewall/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def is_egress_rule(self) -> bool:
@property
def __suricata_source(self) -> List[SuricataHost]:
def convert_source(source: Source) -> Optional[SuricataHost]:
return SuricataHost(address=source.cidr) if source.cidr else None
return SuricataHost(address=source.cidr, port=0) if source.cidr else None

return list(filter(None, map(convert_source, self.sources)))

Expand Down Expand Up @@ -76,15 +76,15 @@ def __resolve_options(self, destination: Destination) -> List[SuricataOption]:
SuricataOption(name="sid", value="XXX", quoted_value=False),
]

def __resolve_rule(self, destination: Destination) -> Optional[SuricataRule]:
if not destination.cidr:
return None

def __resolve_rule(self, destination: Destination) -> SuricataRule:
return SuricataRule(
action="pass",
protocol=destination.protocol,
sources=self.__suricata_source,
destination=SuricataHost(address=destination.cidr, port=destination.port),
destination=SuricataHost(
address=destination.cidr if destination.cidr else "",
port=destination.port if destination.port else 0,
),
options=self.__resolve_options(destination),
)

Expand Down
5 changes: 3 additions & 2 deletions aws_network_firewall/suricata/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ class Host:
Understands a source and/or destination defenition
"""

address: str = "any"
port: Optional[int] = None
address: str
port: int

def __post_init__(self):
self.port = "any" if not self.port else self.port
self.address = "any" if not self.address else self.address

def __str__(self):
return f"{self.address} {self.port}"
29 changes: 16 additions & 13 deletions tests/test_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def test_rule_with_tcp_cidr() -> None:
)


def test_rule_no_cidr() -> None:
def test_icmp_rule() -> None:
rule = Rule(
workload="my-workload",
name="my-rule",
Expand All @@ -114,38 +114,41 @@ def test_rule_no_cidr() -> None:
destinations=[
Destination(
description="my destination",
protocol="TCP",
port=443,
cidr=None,
protocol="ICMP",
port=None,
cidr="10.0.1.0/24",
endpoint=None,
region=None,
)
],
)

assert "" == str(rule)
assert (
'pass icmp 10.0.0.0/24 any <> 10.0.1.0/24 any (msg: "my-workload | my-rule"; rev: 1; sid: XXX;)'
== str(rule)
)


def test_icmp_rule() -> None:
def test_egress_tls_rule() -> None:
rule = Rule(
workload="my-workload",
name="my-rule",
type=Rule.INSPECTION,
type=Rule.EGRESS,
description="My description",
sources=[Source(description="my source", cidr="10.0.0.0/24", region=None)],
sources=[Source(description="my source", cidr=None, region="eu-west-1")],
destinations=[
Destination(
description="my destination",
protocol="ICMP",
port=None,
cidr="10.0.1.0/24",
endpoint=None,
protocol="TLS",
port=443,
cidr=None,
endpoint="xebia.com",
region=None,
)
],
)

assert (
'pass icmp 10.0.0.0/24 any <> 10.0.1.0/24 any (msg: "my-workload | my-rule"; rev: 1; sid: XXX;)'
'pass tls any -> any 443 (tls.sni; tls.version: 1.2; tls.version: 1.3; content: "xebia.com"; nocase; startswith; endswith; msg: "my-workload | my-rule"; rev: 1; sid: XXX;)'
== str(rule)
)
Loading