Fix/sg known ip sources #97
base: dev
Changes from 5 commits
b27b41a
e970bb1
1377437
cfad136
585e52a
b828715
9fc5fbf
35dc6fa
|
@@ -8,12 +8,14 @@ | |
from library.utility import jsonDumps | ||
from library.aws.s3 import S3Operations | ||
from library.aws.utility import convert_tags | ||
from library.config import Config | ||
|
||
|
||
class RestrictionStatus(Enum): | ||
Restricted = "restricted" | ||
OpenCompletely = "open_completely" | ||
OpenPartly = "open_partly" | ||
SafeIP = "safe_ips" | ||
|
||
|
||
class SecurityGroupOperations: | ||
|
@@ -372,6 +374,29 @@ def __str__(self): | |
perms = ", ".join([str(perm) for perm in self.permissions]) | ||
return f"{self.__class__.__name__}(Name={self.name}, Id={self.id}, Permissions=[{perms}])" | ||
|
||
def validate_known_ip_soource(self, source_ip): | ||
""" | ||
|
||
:param source_ip: ip address | ||
:return: boolean | ||
""" | ||
config = Config() | ||
known_ip_sources = config.sg.known_ip_sources | ||
source_cidr = ipaddress.ip_network(source_ip) | ||
|
||
for known_ip in known_ip_sources: | ||
known_ip_cidr = ipaddress.ip_network(known_ip) | ||
if known_ip_cidr == source_cidr: | ||
return True | ||
elif source_ip.endswith("/32"): | ||
for ip in known_ip_cidr: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated and tested. |
||
if str(source_cidr) == str(ipaddress.ip_network(ip)): | ||
return True | ||
# ipaddress.subnet_of() function new to Python 3.7. Not available in 3.6 | ||
"""elif source_cidr.subnet_of(known_ip_cidr): | ||
return True""" | ||
return False | ||
|
||
def restriction_status(self, cidr): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm looking on this code https://github.com/dowjones/hammer/blob/dev/hammer/identification/lambdas/sg-issues-identification/describe_sec_grps_unrestricted_access.py#L57, it checks if the group is restricted and doesn't push it to db if it is. Should we do the same for safe groups? Any reason to save them to DB? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As per testing it is not storing Safe groups in DDB. For Safe groups it is returning with Restricted status. |
||
""" | ||
Check restriction status of cidr | ||
|
@@ -380,11 +405,17 @@ def restriction_status(self, cidr): | |
|
||
:return: RestrictionStatus with check result | ||
""" | ||
is_known_ip = self.validate_known_ip_soource(cidr) | ||
|
||
status = RestrictionStatus.Restricted | ||
if cidr.endswith("/0"): | ||
status = RestrictionStatus.OpenCompletely | ||
elif ipaddress.ip_network(cidr).is_global: | ||
status = RestrictionStatus.OpenPartly | ||
|
||
if is_known_ip: | ||
status = RestrictionStatus.SafeIP | ||
|
||
logging.debug(f"Checked '{cidr}' - '{status.value}'") | ||
return status | ||
|
||
|
@@ -406,10 +437,11 @@ def check(self, restricted_ports): | |
logging.debug(f"Checking '{perm.protocol}' '{perm.from_port}-{perm.to_port}' ports for {ip_range}") | ||
# first condition - CIDR is Global/Public | ||
status = self.restriction_status(ip_range.cidr) | ||
if status == RestrictionStatus.Restricted: | ||
logging.debug(f"Skipping restricted '{ip_range}'") | ||
if status in (RestrictionStatus.Restricted, RestrictionStatus.SafeIP): | ||
logging.debug(f"Skipping restricted/safe IP address '{ip_range}'") | ||
continue | ||
# second - check if ports from `restricted_ports` list has intersection with ports from FromPort..ToPort range | ||
# second - check if ports from `restricted_ports` list has intersection with ports from FromPort.. | ||
# ToPort range | ||
if perm.from_port is None or perm.to_port is None: | ||
logging.debug(f"Marking world-wide open all ports from '{ip_range}'") | ||
ip_range.status = status | ||
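Review bot: The `/32` branch of validate_known_ip_soource walks every host of each known range (`for ip in known_ip_cidr`) looking for one whose string equals the source, so a broad entry such as a corporate /16 or a provider /8 costs up to millions of iterations per rule, per security group, inside the identification Lambda; `source_cidr.network_address in known_ip_cidr` answers the same question in constant time, and pairing it with `broadcast_address` is the Python 3.6 equivalent of the commented-out `subnet_of()` check. Because restriction_status assigns `SafeIP` after the `/0` and `is_global` tests, an entry that exactly matches the rule's CIDR always wins — including a misconfigured `0.0.0.0/0` in `known_ip_sources`, which would silently suppress every world-open finding — so non-global or `/0` entries should presumably be rejected or at least logged where the config is loaded. Each `ipaddress.ip_network()` call is strict and raises `ValueError` on host bits (`10.0.0.1/24`), so one bad config entry aborts the whole scan rather than being skipped. The method name carries a typo (`soource`) that its only visible caller at `is_known_ip = self.validate_known_ip_soource(cidr)` repeats; renaming both now avoids freezing it into the API.
```python
        source_cidr = ipaddress.ip_network(source_ip)
        for known_ip in known_ip_sources:
            known_ip_cidr = ipaddress.ip_network(known_ip)
            # Python 3.6 has no subnet_of(); a range is contained when both of its ends are.
            if (source_cidr.version == known_ip_cidr.version
                    and source_cidr.network_address in known_ip_cidr
                    and source_cidr.broadcast_address in known_ip_cidr):
                return True
        return False
```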
|
Maybe just
safe
to be consistent in naming?
Updated and tested.