Skip to content

Commit

Permalink
Add IPInfo Privacy enrichment providers (#680)
Browse files Browse the repository at this point in the history
* ipinfo privacy
  • Loading branch information
debugmiller authored Feb 15, 2023
1 parent 1a0ed48 commit d1d2f72
Show file tree
Hide file tree
Showing 7 changed files with 695 additions and 3 deletions.
58 changes: 58 additions & 0 deletions global_helpers/global_helpers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,64 @@ def test_precedence(self):
self.assertEqual(response, "found")


class TestIpInfoHelpersPrivacy(unittest.TestCase):
def setUp(self):
self.match_field = "clientIp"
self.event = {
"p_enrichment": {
p_i_h.IPINFO_PRIVACY_LUT_NAME: {
self.match_field: {
"hosting": False,
"proxy": False,
"tor": False,
"vpn": True,
"relay": False,
"service": "VPN Gate",
}
}
}
}
self.ip_info = p_i_h.get_ipinfo_privacy(self.event)

def test_hosting(self):
hosting = self.ip_info.hosting(self.match_field)
self.assertEqual(hosting, False)

def test_proxy(self):
proxy = self.ip_info.proxy(self.match_field)
self.assertEqual(proxy, False)

def test_tor(self):
tor = self.ip_info.tor(self.match_field)
self.assertEqual(tor, False)

def test_vpn(self):
vpn = self.ip_info.vpn(self.match_field)
self.assertEqual(vpn, True)

def test_relay(self):
relay = self.ip_info.relay(self.match_field)
self.assertEqual(relay, False)

def test_service(self):
service = self.ip_info.service(self.match_field)
self.assertEqual(service, "VPN Gate")

def test_not_found(self):
self.assertEqual(self.ip_info.service("not_found"), None)

def test_context(self):
expected = {
"Hosting": False,
"Proxy": False,
"Tor": False,
"VPN": True,
"Relay": False,
"Service": "VPN Gate",
}
self.assertEqual(expected, self.ip_info.context(self.match_field))


class TestGeoInfoFromIP(unittest.TestCase):
def setUp(self):
self.match_field = "clientIp"
Expand Down
46 changes: 45 additions & 1 deletion global_helpers/panther_ipinfo_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

IPINFO_LOCATION_LUT_NAME = "ipinfo_location"
IPINFO_ASN_LUT_NAME = "ipinfo_asn"
IPINFO_PRIVACY_LUT_NAME = "ipinfo_privacy"


class PantherIPInfoException(Exception):
Expand Down Expand Up @@ -84,6 +85,42 @@ def context(self, match_field) -> object:
}


class IPInfoPrivacy:
"""Helper to get IPInfo Privacy information for enriched fields"""

def __init__(self, event):
self.ipinfo_privacy = deep_get(event, "p_enrichment", IPINFO_PRIVACY_LUT_NAME)
self.event = event

def hosting(self, match_field) -> bool:
return deep_get(self.ipinfo_privacy, match_field, "hosting")

def proxy(self, match_field) -> bool:
return deep_get(self.ipinfo_privacy, match_field, "proxy")

def tor(self, match_field) -> bool:
return deep_get(self.ipinfo_privacy, match_field, "tor")

def vpn(self, match_field) -> bool:
return deep_get(self.ipinfo_privacy, match_field, "vpn")

def relay(self, match_field) -> bool:
return deep_get(self.ipinfo_privacy, match_field, "relay")

def service(self, match_field) -> str:
return deep_get(self.ipinfo_privacy, match_field, "service")

def context(self, match_field) -> object:
return {
"Hosting": self.hosting(match_field),
"Proxy": self.proxy(match_field),
"Tor": self.tor(match_field),
"VPN": self.vpn(match_field),
"Relay": self.relay(match_field),
"Service": self.service(match_field),
}


def get_ipinfo_location(event):
"""Returns an IPInfoLocation object for the event or None if it is not available"""
if deep_get(event, "p_enrichment", IPINFO_LOCATION_LUT_NAME):
Expand All @@ -98,11 +135,18 @@ def get_ipinfo_asn(event):
return None


def get_ipinfo_privacy(event):
"""Returns an IPInfoPrivacy object for the event or None if it is not available"""
if deep_get(event, "p_enrichment", IPINFO_PRIVACY_LUT_NAME):
return IPInfoPrivacy(event)
return None


def geoinfo_from_ip(event, match_field):
"""Returns a dictionary with geolocation information that is the same format as
panther_oss_helper.geoinfo_from_ip() with the following differences:
- instead of poviding the ip, you must provide the event and the match_field
- instead of providing the ip, you must provide the event and the match_field
- the fields "hostname" and "anycast" are not included in the return object
"""
location = get_ipinfo_location(event)
Expand Down
2 changes: 1 addition & 1 deletion lookup_tables/ipinfo/ipinfo_asn_datalake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -291,4 +291,4 @@ LogTypeMap:
- 'ip_address'
- LogType: Zoom.Activity
Selectors:
- 'ip_address'
- 'ip_address'
2 changes: 1 addition & 1 deletion lookup_tables/ipinfo/ipinfo_location_datalake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -291,4 +291,4 @@ LogTypeMap:
- 'ip_address'
- LogType: Zoom.Activity
Selectors:
- 'ip_address'
- 'ip_address'
Loading

0 comments on commit d1d2f72

Please sign in to comment.