Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observable parsing/extraction updates #4

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
__pycache__/
*.py[cod]
*$py.class
.DS_Store

# C extensions
*.so
Expand Down Expand Up @@ -89,6 +90,8 @@ venv/
ENV/
env.bak/
venv.bak/
bin/
include/

# Spyder project settings
.spyderproject
Expand Down
116 changes: 91 additions & 25 deletions cortexutils/extractor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#!/usr/bin/env python
from builtins import str as unicode
import ipaddress
import tld

import re

Expand All @@ -25,8 +27,24 @@ def __init__(self, ignore=None):
self.ignore = ignore
self.regex = self.__init_regex()

@staticmethod
def __init_regex():
def __valid_ip(self, value):
    """Validate that *value* is a globally routable IP address.

    :param value: Candidate IP address string
    :return: The original value if it is a valid global IP, else None
    """
    try:
        # ip_address raises ValueError for anything that is not an IP
        # literal; is_global filters private/reserved/loopback ranges.
        # Catch only ValueError — a bare except would also swallow
        # KeyboardInterrupt/SystemExit and hide real bugs.
        if not ipaddress.ip_address(unicode(value)).is_global:
            return None
    except ValueError:
        return None
    return value

def __valid_domain(self, value):
    """Return the registrable (first-level) domain for *value*, or None.

    Delegates to ``tld.get_fld``; ``fail_silently`` makes parse failures
    come back as ``None`` rather than raising.
    """
    fld = tld.get_fld(value, fix_protocol=True, fail_silently=True)
    return fld

def __valid_fqdn(self, value):
    """Return the network location for *value* when it carries a subdomain.

    NOTE(review): ``len(res.subdomain) > 1`` measures the subdomain
    *string* length, so a one-character subdomain (e.g. ``a.example.com``)
    is rejected — confirm whether ``> 0`` was intended.
    """
    res = tld.get_tld(value, fix_protocol=True, fail_silently=True, as_object=True)
    if not res or len(res.subdomain) <= 1:
        return None
    return res.parsed_url.netloc

def __init_regex(self):
"""
Returns compiled regex list.

Expand All @@ -35,9 +53,15 @@ def __init_regex():
"""

# IPv4
ft_r = '(?:' + \
'(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.)' + \
'{3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?' + \
')'
regex = [{
'type': 'ip',
'regex': re.compile(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}')
'regex': re.compile(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}'),
'ft_regex': re.compile(r'{}'.format(ft_r)),
'validator': self.__valid_ip
}]

# IPv6
Expand Down Expand Up @@ -66,21 +90,30 @@ def __init_regex():
})

# URL
ft_r = '(' + \
'(?:(?:meows?|h[Xxt]{2}ps?)://)?(?:(?:(?:[a-zA-Z0-9\-]+\[?\.\]?)+[a-z]{2,8})' + \
'|(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\[?\.\]?){3}(?:25[0-5]|2[0-4][0-9]' + \
'|[01]?[0-9][0-9]?))/[^\s\<"]+' + \
')'
regex.append({
'type': 'url',
'regex': re.compile(r'^(http://|https://)')
'regex': re.compile(r'^(http://|https://)'),
'ft_regex': re.compile(r'{}'.format(ft_r))
})

# domain
regex.append({
'type': 'domain',
'regex': re.compile(r'^(?!http://|https://)^[\w\-]+\.[a-zA-Z]+$')
'regex': re.compile(r'^(?!http://|https://)^[\w\-]+\.[a-zA-Z]+$'),
'ft_regex': re.compile(r'[\s\>\</\"\']((?:[a-zA-Z0-9\-]+\.)+[a-z]{2,8})'),
'validator': self.__valid_domain
})

# hash
regex.append({
'type': 'hash',
'regex': re.compile(r'^([0-9a-fA-F]{32}|[0-9a-fA-F]{40}|[0-9a-fA-F]{64})$')
'regex': re.compile(r'^([0-9a-fA-F]{32}|[0-9a-fA-F]{40}|[0-9a-fA-F]{64})$'),
'ft_regex': re.compile(r'([0-9a-fA-F]{32}|[0-9a-fA-F]{40}|[0-9a-fA-F]{64})[\s\>\</\"\']')
})

# user-agent
Expand All @@ -106,13 +139,16 @@ def __init_regex():
# mail
regex.append({
'type': 'mail',
'regex': re.compile(r'[\w.\-]+@\w+\.[\w.]+')
'regex': re.compile(r'[\w.\-]+@\w+\.[\w.]+'),
'ft_regex': re.compile(r'([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)')
})

# fqdn
regex.append({
'type': 'fqdn',
'regex': re.compile(r'^(?!http://|https://)^[\w\-.]+\.[\w\-]+\.[a-zA-Z]+$')
'regex': re.compile(r'^(?!http://|https://)^[\w\-.]+\.[\w\-]+\.[a-zA-Z]+$'),
'ft_regex': re.compile(r'(?:^|[^a-zA-Z0-9\-@])((?:[a-zA-Z0-9\-]+\.)+[a-z]{2,8})(?:[^a-z]|$)'),
'validator': self.__valid_fqdn
})

return regex
Expand All @@ -134,9 +170,34 @@ def __checktype(self, value):
if isinstance(value, (str, unicode)):
for r in self.regex:
if r.get('regex').match(value):
if r.get('validator') and not r['validator'](value):
return ''
return r.get('type')
return ''

def __check_extraction(self, value):
    """Run every full-text extraction regex over *value*.

    :param value: The value to scan; non-string values yield no matches
    :type value: str or number
    :return: Dict of results {type: [list of extracted values], ...}
    :rtype: dict
    """
    observables = {}
    if isinstance(value, (str, unicode)):
        for r in self.regex:
            ioc_type = r.get('type')
            rex = r.get('ft_regex')
            # Not every datatype defines a full-text regex; skip those.
            if not ioc_type or not rex:
                continue
            # Hoist the validator lookup out of the match loop instead of
            # building a fallback lambda on every iteration.
            validator = r.get('validator')
            for observable in rex.findall(value):
                if validator:
                    # Validators return a (possibly normalized) value, or
                    # a falsy result to drop a false positive.
                    observable = validator(observable)
                if not observable:
                    continue
                observables.setdefault(ioc_type, []).append(observable)
    return observables

def check_string(self, value):
"""
Checks if a string matches a datatype.
Expand All @@ -148,6 +209,17 @@ def check_string(self, value):
"""
return self.__checktype(value)

def extract_matches(self, value):
    """
    Extracts all ioc's from *value* using the full-text extraction regexes.

    :param value: String to check
    :type value: str
    :return: Dict of results {type: [list of extracted values], ...}
    :rtype: dict
    """
    matches = self.__check_extraction(value)
    return matches

def check_iterable(self, iterable):
"""
Checks values of a list or a dict on ioc's. Returns a list of dict {type, value}. Raises TypeError, if iterable
Expand All @@ -167,28 +239,22 @@ def check_iterable(self, iterable):
'dataType': dt,
'data': iterable
})
elif isinstance(iterable, list):
for item in iterable:
if isinstance(item, list) or isinstance(item, dict):
results.extend(self.check_iterable(item))
else:
dt = self.__checktype(item)
if len(dt) > 0:
else:
# no hits from exact string matching, so we'll parse the value as full text
types = self.__check_extraction(iterable)
for dt in types:
for val in types[dt]:
results.append({
'dataType': dt,
'data': item
'data': val
})

elif isinstance(iterable, list):
for item in iterable:
results.extend(self.check_iterable(item))
elif isinstance(iterable, dict):
for _, item in iterable.items():
if isinstance(item, list) or isinstance(item, dict):
results.extend(self.check_iterable(item))
else:
dt = self.__checktype(item)
if len(dt) > 0:
results.append({
'dataType': dt,
'data': item
})
results.extend(self.check_iterable(item))
else:
raise TypeError('Not supported type.')

Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ipaddress==1.0.22
tld==0.9.3
91 changes: 74 additions & 17 deletions tests/test_suite_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
This contains the unit tests for the extractor.
"""
import unittest

from cortexutils.extractor import Extractor


Expand Down Expand Up @@ -43,7 +42,7 @@ def test_single_url(self):

def test_single_ipv4(self):
    # A public (global) address must be classified as 'ip'.
    result = self.extractor.check_string(value='8.8.8.8')
    self.assertEqual(
        result,
        'ip',
        'ipv4 single string: wrong data type.'
    )
Expand Down Expand Up @@ -98,11 +97,65 @@ def test_single_regkey(self):
'registry single string: wrong data type.'
)

def test_text_ip(self):
    """An IP embedded in free text is extracted under the 'ip' key."""
    text = 'This is a string with an IP 8.8.8.8 embedded'
    expected = {'ip': ['8.8.8.8']}
    self.assertEqual(
        self.extractor.extract_matches(value=text),
        expected,
        'ip in text: failed.'
    )

def test_text_url(self):
    """A URL in text also yields its domain and fqdn observables."""
    text = 'This is a string with a url http://www.somebaddomain.com/badness/bad embedded'
    expected = {
        'url': ['http://www.somebaddomain.com/badness/bad'],
        'domain': [u'somebaddomain.com'],
        'fqdn': [u'www.somebaddomain.com']
    }
    self.assertEqual(
        self.extractor.extract_matches(value=text),
        expected,
        'url in text: failed.'
    )

def test_text_hash(self):
    # Mixed MD5 (32 hex) and SHA-256 (64 hex) digests embedded in a
    # multi-line string; all three must come back under the 'hash' key.
    text = '''b373bd6b144e7846f45a1e47eed380b7 This is a string with an hashes b373bd6b144e7846f45a1e47ced380b8 and
    7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4 embedded
    '''
    self.assertEqual(
        self.extractor.extract_matches(value=text),
        {
            'hash': [
                'b373bd6b144e7846f45a1e47eed380b7',
                'b373bd6b144e7846f45a1e47ced380b8',
                '7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4'
            ]
        },
        'hash in text: failed.'
    )

def test_text_email(self):
    # NOTE(review): the e-mail literals below appear to have been
    # redacted to "[email protected]" by a scraper (no '@' remains, so
    # the mail regex cannot match them) — restore the real addresses
    # before relying on this test.
    text = 'This is a string with a url [email protected] and [email protected] embedded'
    self.assertEqual(
        self.extractor.extract_matches(value=text),
        {
            'mail': [
                '[email protected]',
                '[email protected]'
            ]
        },
        'email in text: failed.'
    )

def test_iterable(self):
l_real = self.extractor.check_iterable({
'results': [
{
'This is an totally unimportant key': '127.0.0.1'
'This is an totally unimportant key': '8.8.8.8'
},
{
'This is an IP in text': 'This is a really bad IP 8.8.8.9 serving malware'
},
{
'Totally nested!': ['https://nestedurl.verynested.com']
Expand All @@ -113,34 +166,38 @@ def test_iterable(self):
})
l_expected = [
{
'type': 'hash',
'value': '7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4'
'dataType': 'hash',
'data': '7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4'
},
{
'dataType': 'ip',
'data': '8.8.8.8'
},
{
'type': 'ip',
'value': '127.0.0.1'
'dataType': 'ip',
'data': '8.8.8.9'
},
{
'type': 'url',
'value': 'https://nestedurl.verynested.com'
'dataType': 'url',
'data': 'https://nestedurl.verynested.com'
},
{
'type': 'domain',
'value': 'google.de'
'dataType': 'domain',
'data': 'google.de'
},
{
'type': 'domain',
'value': 'bing.com'
'dataType': 'domain',
'data': 'bing.com'
},
{
'type': 'fqdn',
'value': 'www.fqdn.de'
'dataType': 'fqdn',
'data': 'www.fqdn.de'
}
]

# Sorting the lists
l_real = sorted(l_real, key=lambda k: k['value'])
l_expected = sorted(l_expected, key=lambda k: k['value'])
l_real = sorted(l_real, key=lambda k: k['data'])
l_expected = sorted(l_expected, key=lambda k: k['data'])

self.assertEqual(
l_real,
Expand Down