diff --git a/README.md b/README.md index 1f140137f7..44b36d427f 100644 --- a/README.md +++ b/README.md @@ -238,6 +238,7 @@ IBAN Number Extractor|Identify International Bank Account Numbers (IBANs) in any [IntelligenceX](https://intelx.io/)|Obtain information from IntelligenceX about identified IP addresses, domains, e-mail addresses and phone numbers.|Tiered API Interesting File Finder|Identifies potential files of interest, e.g. office documents, zip files.|Internal [Internet Storm Center](https://isc.sans.edu)|Check if an IP address is malicious according to SANS ISC.|Free API +[ip2location.io](https://www.ip2location.io/)|Queries ip2location.io to identify geolocation of IP Addresses using ip2location.io API|Tiered API [ipapi.co](https://ipapi.co/)|Queries ipapi.co to identify geolocation of IP Addresses using ipapi.co API|Tiered API [ipapi.com](https://ipapi.com/)|Queries ipapi.com to identify geolocation of IP Addresses using ipapi.com API|Tiered API [IPInfo.io](https://ipinfo.io)|Identifies the physical location of IP addresses identified using ipinfo.io.|Tiered API diff --git a/modules/sfp_ip2locationio.py b/modules/sfp_ip2locationio.py new file mode 100644 index 0000000000..429b8f3de1 --- /dev/null +++ b/modules/sfp_ip2locationio.py @@ -0,0 +1,137 @@ +# -*- coding: utf-8 -*- +# ------------------------------------------------------------------------------- +# Name: sfp_ip2locationio +# Purpose: SpiderFoot plug-in to identify the Geo-location of IP addresses +# identified by other modules using ip2location.io +# +# Author: IP2Location +# +# Created: 25/10/2023 +# Copyright: (c) Steve Micallef +# Licence: GPL +# ------------------------------------------------------------------------------- + +import json +import time + +from spiderfoot import SpiderFootEvent, SpiderFootPlugin + + +class sfp_ip2locationio(SpiderFootPlugin): + + meta = { + 'name': "ip2location.io", + 'summary': "Queries ip2location.io to identify geolocation of IP Addresses using ip2location.io API", + 'flags': ["apikey"], + 'useCases': ["Footprint", "Investigate", "Passive"], + 'categories': ["Real World"], + 'dataSource': { + 'website': "https://www.ip2location.io/", + 'model': "FREE_AUTH_LIMITED", + 'references': [ + "https://www.ip2location.io/ip2location-documentation" + ], + 'apiKeyInstructions': [ + "Visit https://www.ip2location.io/", + "Register a free account", + "Login from https://www.ip2location.io/log-in and go to your dashboard", + "Your API Key will be listed under API Key section.", + ], + 'favIcon': "https://www.ip2location.io/favicon.ico", + 'logo': "https://cdn.ip2location.io/assets/img/icons/apple-touch-icon.png", + 'description': "IP2Location.io provides a fast and accurate IP Geolocation API tool " + "to determine a user's location and use the geolocation information in different use cases. " + } + } + + # Default options + opts = { + 'api_key': '', + } + + # Option descriptions + optdescs = { + 'api_key': "ip2location.io API Key.", + } + + results = None + + def setup(self, sfc, userOpts=dict()): + self.sf = sfc + self.results = self.tempStorage() + + for opt in list(userOpts.keys()): + self.opts[opt] = userOpts[opt] + + # What events is this module interested in for input + def watchedEvents(self): + return [ + "IP_ADDRESS", + "IPV6_ADDRESS" + ] + + # What events this module produces + # This is to support the end user in selecting modules based on events + # produced. + def producedEvents(self): + return [ + "GEOINFO", + "RAW_RIR_DATA" + ] + + def query(self, qry): + queryString = f"https://api.ip2location.io/?key={self.opts['api_key']}&ip={qry}" + + res = self.sf.fetchUrl(queryString, + timeout=self.opts['_fetchtimeout'], + useragent=self.opts['_useragent']) + time.sleep(1.5) + + if ('error' in res): + self.info(f"No ip2locationio data found for {qry}") + return None + try: + return json.loads(res['content']) + except Exception as e: + self.debug(f"Error processing JSON response: {e}") + return None + + # Handle events sent to this module + def handleEvent(self, event): + eventName = event.eventType + srcModuleName = event.module + eventData = event.data + + self.debug(f"Received event, {eventName}, from {srcModuleName}") + + if self.errorState: + return + + if self.opts['api_key'] == "": + self.error("You enabled sfp_ip2locationio but did not set an API key!") + self.errorState = True + return + + if eventData in self.results: + self.debug(f"Skipping {eventData}, already checked.") + return + + self.results[eventData] = True + + data = self.query(eventData) + if not data: + return + + if data.get('country_name'): + location = ', '.join(filter(None, [data.get('city_name'), data.get('region_name'), data.get('country_name'), data.get('country_code')])) + evt = SpiderFootEvent('GEOINFO', location, self.__name__, event) + self.notifyListeners(evt) + + if data.get('latitude') and data.get('longitude'): + evt = SpiderFootEvent("PHYSICAL_COORDINATES", f"{data.get('latitude')}, {data.get('longitude')}", self.__name__, event) + self.notifyListeners(evt) + + evt = SpiderFootEvent('RAW_RIR_DATA', str(data), self.__name__, event) + self.notifyListeners(evt) + +# End of sfp_ip2locationio class diff --git a/test/unit/modules/test_sfp_ip2locationio.py b/test/unit/modules/test_sfp_ip2locationio.py new file mode 100644 index 0000000000..7ac74cefcc --- /dev/null +++ b/test/unit/modules/test_sfp_ip2locationio.py @@ -0,0 +1,60 @@ +# test_sfp_template.py +import pytest +import unittest + +from modules.sfp_ip2locationio import sfp_ip2locationio +from sflib import SpiderFoot +from spiderfoot import SpiderFootEvent, SpiderFootTarget + + +@pytest.mark.usefixtures +class TestModuletemplate(unittest.TestCase): + """ + Test modules.sfp_ip2locationio + """ + + def test_opts(self): + module = sfp_ip2locationio() + self.assertEqual(len(module.opts), len(module.optdescs)) + + def test_setup(self): + """ + Test setup(self, sfc, userOpts=dict()) + """ + sf = SpiderFoot(self.default_options) + + module = sfp_ip2locationio() + module.setup(sf, dict()) + + def test_watchedEvents_should_return_list(self): + module = sfp_ip2locationio() + self.assertIsInstance(module.watchedEvents(), list) + + def test_producedEvents_should_return_list(self): + module = sfp_ip2locationio() + self.assertIsInstance(module.producedEvents(), list) + + def test_handleEvent_no_api_key_should_set_errorState(self): + """ + Test handleEvent(self, event) + """ + sf = SpiderFoot(self.default_options) + + module = sfp_ip2locationio() + module.setup(sf, dict()) + + target_value = 'example target value' + target_type = 'IP_ADDRESS' + target = SpiderFootTarget(target_value, target_type) + module.setTarget(target) + + event_type = 'ROOT' + event_data = 'example data' + event_module = '' + source_event = '' + evt = SpiderFootEvent(event_type, event_data, event_module, source_event) + + result = module.handleEvent(evt) + + self.assertIsNone(result) + self.assertTrue(module.errorState)