Skip to content

Commit

Permalink
Implement Threat Intelligence lists for Hierarchical Firewall Policie…
Browse files Browse the repository at this point in the history
…s, as source_prefix/destination_prefix.

PiperOrigin-RevId: 557933856
  • Loading branch information
abhindes authored and Capirca Team committed Aug 17, 2023
1 parent a2adab8 commit 0393eb2
Show file tree
Hide file tree
Showing 2 changed files with 315 additions and 2 deletions.
62 changes: 60 additions & 2 deletions capirca/lib/gcp_hf.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ class Term(gcp.Term):

_TERM_DESTINATION_PORTS_LIMIT = 256

# http://cloud/firewall/docs/firewall-policies-rule-details#threat-intelligence-fw-policy
_THREAT_INTELLIGENCE_LIST_NAME = frozenset([
'iplist-tor-exit-nodes',
'iplist-known-malicious-ips',
'iplist-search-engines-crawlers',
'iplist-public-clouds',
'iplist-public-clouds-aws',
'iplist-public-clouds-azure',
'iplist-public-clouds-gcp',
])

def __init__(self,
term,
address_family='inet',
Expand Down Expand Up @@ -117,6 +128,20 @@ def _ValidateTerm(self):
'Hierarchical firewall rule has incorrect inet_version for rule: %s' %
self.term.name)

# Validate if src/dest prefix fields are valid Threat Intelligence lists.
for item in self.term.source_prefix:
if item not in self._THREAT_INTELLIGENCE_LIST_NAME:
raise gcp.TermError(
'Hierarchical firewall rule has unrecognized threat'
'intelligence list %s as source prefix, ' % item
)
for item in self.term.destination_prefix:
if item not in self._THREAT_INTELLIGENCE_LIST_NAME:
raise gcp.TermError(
'Hierarchical firewall rule has unrecognized threat'
'intelligence list %s as destination prefix, ' % item
)

def ConvertToDict(self, priority_index):
"""Converts term to dict representation of SecurityPolicy.Rule JSON format.
Expand Down Expand Up @@ -234,6 +259,13 @@ def ConvertToDict(self, priority_index):
any_ip = [nacaddr.IPv6('::/0')]

if self.term.direction == 'EGRESS':
# Threat Intelligence lists as destination_prefix.
# These can be combined with IPs in a single rule, but discouraged.
if self.term.destination_prefix:
term_dict['match'][
'destThreatIntelligences'
] = self.term.destination_prefix

daddrs = self.term.GetAddressOfVersion('destination_address', ip_version)
daddrs = gcp.FilterIPv4InIPv6FormatAddrs(daddrs)

Expand All @@ -246,6 +278,14 @@ def ConvertToDict(self, priority_index):
'because there are no addresses of that family.', self.term.name,
self.address_family)
return []

# Return early if we only have dest_prefix set, but no dest addresses.
if not daddrs and self.term.destination_prefix:
# We only return this for the IPv4 rule
if mixed_policy_inet6_term:
return []
return [term_dict]

# This should only happen if there were no addresses set originally.
if not daddrs:
daddrs = any_ip
Expand All @@ -269,6 +309,11 @@ def ConvertToDict(self, priority_index):
rules.append(rule)
priority_index += 1
else:
# Threat Intelligence lists as source_prefix.
# These can be combined with IPs in a single rule, but discouraged.
if self.term.source_prefix:
term_dict['match']['srcThreatIntelligences'] = self.term.source_prefix

saddrs = gcp.FilterIPv4InIPv6FormatAddrs(
self.term.GetAddressOfVersion('source_address', ip_version))

Expand All @@ -281,6 +326,14 @@ def ConvertToDict(self, priority_index):
'because there are no addresses of that family.', self.term.name,
self.address_family)
return []

# Return early if we only have source_prefix set, but no source addresses.
if not saddrs and self.term.source_prefix:
# We only return this for the IPv4 rule
if mixed_policy_inet6_term:
return []
return [term_dict]

# This should only happen if there were no addresses set originally.
if not saddrs:
saddrs = any_ip
Expand Down Expand Up @@ -333,7 +386,7 @@ def _BuildTokens(self):

supported_tokens |= {
'destination_tag', 'expiration', 'source_tag', 'translated',
'target_resources', 'logging'
'target_resources', 'logging', 'source_prefix', 'destination_prefix',
}

supported_tokens -= {
Expand Down Expand Up @@ -563,10 +616,15 @@ def GetRuleTupleCount(dict_term: Dict[str, Any], api_version):
addresses_count = len(
config.get(dest_ip_range, []) + config.get(src_ip_range, []))

prefixes_count = len(
config.get('srcThreatIntelligences', [])
+ config.get('destThreatIntelligences', [])
)

for l4config in config.get(layer_4_config, []):
for _ in l4config.get('ports', []):
layer4_count += 1
if l4config.get('ipProtocol'):
layer4_count += +1

return addresses_count + layer4_count + targets_count
return addresses_count + layer4_count + targets_count + prefixes_count
Loading

0 comments on commit 0393eb2

Please sign in to comment.