diff --git a/.gitignore b/.gitignore index 67ed3f5..edd157e 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ __pycache__/ *.py[cod] *$py.class +.DS_Store # C extensions *.so @@ -89,6 +90,8 @@ venv/ ENV/ env.bak/ venv.bak/ +bin/ +include/ # Spyder project settings .spyderproject diff --git a/cortexutils/extractor.py b/cortexutils/extractor.py index fed739b..7bc5b1d 100644 --- a/cortexutils/extractor.py +++ b/cortexutils/extractor.py @@ -1,5 +1,7 @@ #!/usr/bin/env python from builtins import str as unicode +import ipaddress +import tld import re @@ -25,8 +27,24 @@ def __init__(self, ignore=None): self.ignore = ignore self.regex = self.__init_regex() - @staticmethod - def __init_regex(): + def __valid_ip(self, value): + try: + if not ipaddress.ip_address(unicode(value)).is_global: + return None + except: + return None + return value + + def __valid_domain(self, value): + return tld.get_fld(value, fix_protocol=True, fail_silently=True) + + def __valid_fqdn(self, value): + parts = tld.get_tld(value, fix_protocol=True, fail_silently=True, as_object=True) + if parts and len(parts.subdomain) > 1: + return parts.parsed_url.netloc + return None + + def __init_regex(self): """ Returns compiled regex list. @@ -35,9 +53,15 @@ def __init_regex(): """ # IPv4 + ft_r = '(?:' + \ + '(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.)' + \ + '{3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?' + \ + ')' regex = [{ 'type': 'ip', - 'regex': re.compile(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}') + 'regex': re.compile(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}'), + 'ft_regex': re.compile(r'{}'.format(ft_r)), + 'validator': self.__valid_ip }] # IPv6 @@ -66,21 +90,30 @@ def __init_regex(): }) # URL + ft_r = '(' + \ + '(?:(?:meows?|h[Xxt]{2}ps?)://)?(?:(?:(?:[a-zA-Z0-9\-]+\[?\.\]?)+[a-z]{2,8})' + \ + '|(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\[?\.\]?){3}(?:25[0-5]|2[0-4][0-9]' + \ + '|[01]?[0-9][0-9]?))/[^\s\<"]+' + \ + ')' regex.append({ 'type': 'url', - 'regex': re.compile(r'^(http://|https://)') + 'regex': re.compile(r'^(http://|https://)'), + 'ft_regex': re.compile(r'{}'.format(ft_r)) }) # domain regex.append({ 'type': 'domain', - 'regex': re.compile(r'^(?!http://|https://)^[\w\-]+\.[a-zA-Z]+$') + 'regex': re.compile(r'^(?!http://|https://)^[\w\-]+\.[a-zA-Z]+$'), + 'ft_regex': re.compile(r'[\s\>\\ 0: + else: + #no hits of string matching do we'll parse with full text + types = self.__check_extraction(iterable) + for dt in types: + for val in types[dt]: results.append({ 'dataType': dt, - 'data': item + 'data': val }) + + elif isinstance(iterable, list): + for item in iterable: + results.extend(self.check_iterable(item)) elif isinstance(iterable, dict): for _, item in iterable.items(): - if isinstance(item, list) or isinstance(item, dict): - results.extend(self.check_iterable(item)) - else: - dt = self.__checktype(item) - if len(dt) > 0: - results.append({ - 'dataType': dt, - 'data': item - }) + results.extend(self.check_iterable(item)) else: raise TypeError('Not supported type.') diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..54a1da2 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +ipaddress==1.0.22 +tld==0.9.3 diff --git a/tests/test_suite_extractor.py b/tests/test_suite_extractor.py index 782e38e..7d2eec0 100644 --- a/tests/test_suite_extractor.py +++ b/tests/test_suite_extractor.py @@ -3,7 +3,6 @@ This contains the unit tests for the extractor. """ import unittest - from cortexutils.extractor import Extractor @@ -43,7 +42,7 @@ def test_single_url(self): def test_single_ipv4(self): self.assertEqual( - self.extractor.check_string(value='10.0.0.1'), + self.extractor.check_string(value='8.8.8.8'), 'ip', 'ipv4 single string: wrong data type.' ) @@ -98,11 +97,65 @@ def test_single_regkey(self): 'registry single string: wrong data type.' ) + def test_text_ip(self): + text = 'This is a string with an IP 8.8.8.8 embedded' + self.assertEqual( + self.extractor.extract_matches(value=text), + { + 'ip': ['8.8.8.8'] + }, + 'ip in text: failed.' + ) + + def test_text_url(self): + text = 'This is a string with a url http://www.somebaddomain.com/badness/bad embedded' + self.assertEqual( + self.extractor.extract_matches(value=text), + { + 'url': ['http://www.somebaddomain.com/badness/bad'], + 'domain': [u'somebaddomain.com'], + 'fqdn': [u'www.somebaddomain.com'] + }, + 'url in text: failed.' + ) + + def test_text_hash(self): + text = '''b373bd6b144e7846f45a1e47eed380b7 This is a string with an hashes b373bd6b144e7846f45a1e47ced380b8 and + 7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4 embedded + ''' + self.assertEqual( + self.extractor.extract_matches(value=text), + { + 'hash': [ + 'b373bd6b144e7846f45a1e47eed380b7', + 'b373bd6b144e7846f45a1e47ced380b8', + '7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4' + ] + }, + 'hash in text: failed.' + ) + + def test_text_email(self): + text = 'This is a string with a url myemail@gmail.com and joe.smith@somecorp.org embedded' + self.assertEqual( + self.extractor.extract_matches(value=text), + { + 'mail': [ + 'myemail@gmail.com', + 'joe.smith@somecorp.org' + ] + }, + 'email in text: failed.' + ) + def test_iterable(self): l_real = self.extractor.check_iterable({ 'results': [ { - 'This is an totally unimportant key': '127.0.0.1' + 'This is an totally unimportant key': '8.8.8.8' + }, + { + 'This is an IP in text': 'This is a really bad IP 8.8.8.9 serving malware' }, { 'Totally nested!': ['https://nestedurl.verynested.com'] @@ -113,34 +166,38 @@ def test_iterable(self): }) l_expected = [ { - 'type': 'hash', - 'value': '7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4' + 'dataType': 'hash', + 'data': '7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4' + }, + { + 'dataType': 'ip', + 'data': '8.8.8.8' }, { - 'type': 'ip', - 'value': '127.0.0.1' + 'dataType': 'ip', + 'data': '8.8.8.9' }, { - 'type': 'url', - 'value': 'https://nestedurl.verynested.com' + 'dataType': 'url', + 'data': 'https://nestedurl.verynested.com' }, { - 'type': 'domain', - 'value': 'google.de' + 'dataType': 'domain', + 'data': 'google.de' }, { - 'type': 'domain', - 'value': 'bing.com' + 'dataType': 'domain', + 'data': 'bing.com' }, { - 'type': 'fqdn', - 'value': 'www.fqdn.de' + 'dataType': 'fqdn', + 'data': 'www.fqdn.de' } ] # Sorting the lists - l_real = sorted(l_real, key=lambda k: k['value']) - l_expected = sorted(l_expected, key=lambda k: k['value']) + l_real = sorted(l_real, key=lambda k: k['data']) + l_expected = sorted(l_expected, key=lambda k: k['data']) self.assertEqual( l_real,