Skip to content

Commit

Permalink
XDR iocs integration- 'XSOAR Comment Field Exporting To XDR' paramete…
Browse files Browse the repository at this point in the history
…r is handled incorrectly (demisto#36658)

* fixed an issue with list as a key

* RN

* revert change

* first change

* pre commit

* extract to func

* fix UTS

* pre commit

* Bump pack from version CortexXDR to 6.1.84.

* add logs

* fix a bug

* logs

* Bump pack from version CortexXDR to 6.1.85.

* remove redundant line

* fix return value

* added UTs

* pre commit

* autopep8

* Bump pack from version CortexXDR to 6.1.86.

* changing the old dummy api key in a UT to a clear dummy str

* Update Packs/CortexXDR/Integrations/XDR_iocs/XDR_iocs.py

Co-authored-by: dorschw <[email protected]>

* Update Packs/CortexXDR/Integrations/XDR_iocs/XDR_iocs.py

Co-authored-by: dorschw <[email protected]>

* improve create_an_indicator_link()

* CR

* CR

* revert some pre commit changes

* Update Packs/CortexXDR/Integrations/XDR_iocs/XDR_iocs_test.py

Co-authored-by: dorschw <[email protected]>

* fix UT

* Update Packs/CortexXDR/ReleaseNotes/6_1_86.md

Co-authored-by: dorschw <[email protected]>

* Update Packs/CortexXDR/Integrations/XDR_iocs/XDR_iocs.py

Co-authored-by: dorschw <[email protected]>

* Update Packs/CortexXDR/Integrations/XDR_iocs/XDR_iocs.py

Co-authored-by: dorschw <[email protected]>

* Update Packs/CortexXDR/Integrations/XDR_iocs/XDR_iocs.py

Co-authored-by: dorschw <[email protected]>

* fix UT

* CR

* RN

* change it to an if/else for readability

* pre-commit

* pre-commit

* change UT to pytest.raises

* new line

* RN

* naming CR

* naming CR

---------

Co-authored-by: Content Bot <[email protected]>
Co-authored-by: dorschw <[email protected]>
  • Loading branch information
3 people authored Nov 5, 2024
1 parent c010561 commit d8a9e97
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 68 deletions.
92 changes: 67 additions & 25 deletions Packs/CortexXDR/Integrations/XDR_iocs/XDR_iocs.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class Client:
severity: str = '' # used when override_severity is True
xsoar_severity_field: str = 'sourceoriginalseverity' # used when override_severity is False
xsoar_comments_field: str = 'comments'
add_link_as_a_comment: bool = False
comments_as_tags: bool = False
tag = 'Cortex XDR'
tlp_color = None
Expand Down Expand Up @@ -294,38 +295,55 @@ def demisto_types_to_xdr(_type: str) -> str:
return xdr_type


def _parse_demisto_comments(ioc: dict, comment_field_name: list[str] | str, comments_as_tags: bool) -> list[Any] | None:
def create_an_indicator_link(ioc: dict) -> list[str]:
"""
Creates an indicator link into comments field.
Args:
ioc (dict): the IOC dict.
Returns:
A list which contains a string of indicator's link.
"""
base_url = f'{demisto.demistoUrls().get("server")}'
path = 'indicator' if is_xsoar_saas() else '#/indicator'
return [f'{base_url}/{path}/{ioc.get("id")}']


def _parse_demisto_comments(ioc: dict, comment_field_name: str, comments_as_tags: bool) -> list[Any] | None:
""""
Parsing xsoar fields to xdr from multiple fields value or a single value.
Args:
ioc (dict): the IOC dict.
comment_field_name (list[str] | str): the name of the comment field(s) to parse.
comment_field_name (str): the name of the comment field to parse.
comments_as_tags (bool): whether to return comments as XDR tags rather than notes.
Returns:
A list with the parsed comment(s) joined by commas if multiple comment fields were provided,
otherwise the parsed comment from the single provided field.
Returns None if no comments were found.
"""
# parse comments from multiple fields if specified as list
if isinstance(comment_field_name, list):
comments = []
for field in comment_field_name:
parsing = parse_demisto_single_comments(ioc, field, comments_as_tags)
if parsing:
comments.extend(parsing)
return [', '.join(comments)]

# else return single field
return parse_demisto_single_comments(ioc, comment_field_name, comments_as_tags)
comments = []
# a regular comment for the given field name, "comments" as a default
comment = parse_demisto_single_comment(ioc, comment_field_name, comments_as_tags)
if comment:
comments.extend(comment)

# if the flag is True, add a link as a comment
if Client.add_link_as_a_comment:
comments.extend(create_an_indicator_link(ioc))

if comments_as_tags:
return comments or ['']

return [', '.join(comments)]


def parse_demisto_single_comments(ioc: dict, comment_field_name: list[str] | str, comments_as_tags: bool) -> list[str] | None:
def parse_demisto_single_comment(ioc: dict, comment_field_name: str, comments_as_tags: bool) -> list[str] | None:
""""
Parsing xsoar fields to xdr from a single value.
Parsing xsoar field to xdr from a single value.
Args:
ioc (dict): the IOC dict.
comment_field_name (list[str] | str): the name of the comment field(s) to parse.
comment_field_name (str): the name of the comment field to parse.
comments_as_tags (bool): whether to return comments as XDR tags rather than notes.
Returns:
Expand All @@ -345,12 +363,6 @@ def parse_demisto_single_comments(ioc: dict, comment_field_name: list[str] | str
return None
return [comment]

elif comment_field_name == 'indicator_link':
# parse indicator link into comments field
if is_xsoar_saas():
return [f'{demisto.demistoUrls().get("server")}/indicator/{ioc.get("id")}']
return [f'{demisto.demistoUrls().get("server")}/#/indicator/{ioc.get("id")}']

else: # custom comments field
if not (raw_comment := ioc.get('CustomFields', {}).get(comment_field_name)):
return None
Expand Down Expand Up @@ -398,7 +410,8 @@ def demisto_ioc_to_xdr(ioc: dict) -> dict:

xdr_ioc['severity'] = validate_fix_severity_value(xdr_ioc['severity'], ioc['value'])

# demisto.debug(f'Processed outgoing IOC: {xdr_ioc}') # uncomment to debug, otherwise spams the log
extensive_log(f'Processed outgoing IOC: {xdr_ioc}')

return xdr_ioc

except KeyError as error:
Expand Down Expand Up @@ -675,7 +688,7 @@ def xdr_ioc_to_demisto(ioc: dict) -> dict:
if Client.tlp_color:
entry['fields']['trafficlightprotocol'] = Client.tlp_color

# demisto.debug(f'Processed incoming entry: {entry}') # uncomment to debug, otherwise it spams the log
extensive_log(f'Processed incoming entry: {entry}')
return entry


Expand Down Expand Up @@ -835,6 +848,33 @@ def validate_fix_severity_value(severity: str, indicator_value: str | None = Non
return severity_upper


def parse_xsoar_field_name_and_link(xsoar_comment_field: list[str]) -> tuple[str, bool]:
"""
Parsing the given list to two elements,
one is the xsoar field name and the second is the flag if we should add an indicator link as a comment (indicator_link).
Args:
xsoar_comment_field: list of fields.
Returns:
str: xsoar comment field name.
bool: whether to append an incident link to the comments.
"""
if len(xsoar_comment_field) == 1:
if xsoar_comment_field[0] == "indicator_link":
return "comments", True
return xsoar_comment_field[0], False

if len(xsoar_comment_field) == 2:
if "indicator_link" not in xsoar_comment_field:
raise DemistoException(
f"The parameter {xsoar_comment_field=} should only contain the field name, or the field name with the"
f" phrase indicator_link, separated by a comma.")

xsoar_comment_field.remove("indicator_link")
return xsoar_comment_field[0], True

raise DemistoException(f"The parameter {xsoar_comment_field=} cannot contain more than two values")


def main(): # pragma: no cover
params = demisto.params()
feed_fetch_interval = arg_to_number(params.get('feedFetchInterval'))
Expand All @@ -853,8 +893,10 @@ def main(): # pragma: no cover
Client.tag = tag
if xsoar_severity_field := params.get('xsoar_severity_field'):
Client.xsoar_severity_field = to_cli_name(xsoar_severity_field)
if xsoar_comment_field := params.get('xsoar_comments_field'):
Client.xsoar_comments_field = xsoar_comment_field

if xsoar_comment_param := argToList(params.get('xsoar_comments_field')):
# in case of xsoar_comment_param is an empty list -> the Client.xsoar_comments_field is defined to "comments" by default
Client.xsoar_comments_field, Client.add_link_as_a_comment = parse_xsoar_field_name_and_link(xsoar_comment_param)

client = Client(params)
commands = {
Expand Down
127 changes: 101 additions & 26 deletions Packs/CortexXDR/Integrations/XDR_iocs/XDR_iocs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ def test_sanity(self, mocker):
"""
params = {
"apikey_id": "7",
"apikey": "t3PkfrEhaRAD9a3r6Lq5cVPyqdMqtLd8cOJlSWUtbslkbERUgb2BTkSNRtDr3C6CWAgYqxvyzwDFJ83BLBgu1V2cxQY7rsoo2ks2u3W2aBL2BlteF8C8u75lCVUrNbv1" # noqa: E501
"apikey": "aaaaaaa"
# noqa: E501
}
headers = {
'Authorization': 'da94963b561e3c95899d843b1284cecf410606e9e809be528ec1cf03880c6e9e',
'Authorization': 'e9a63fb06148bd3a73ce93c8b44c083a147cafb0fe607e706abcac25759b3c43', # it's a dummy Authorization
'x-iocs-source': 'xsoar',
'x-xdr-auth-id': '7',
'x-xdr-nonce': '1111111111111111111111111111111111111111111111111111111111111111',
Expand Down Expand Up @@ -428,43 +429,58 @@ def test_demisto_vendors_to_xdr(self, demisto_vendor, xdr_vendor):
(
{'value': '11.11.11.11', 'indicator_type': 'IP', 'score': 2},
{'expiration_date': -1, 'indicator': '11.11.11.11', 'reputation': 'SUSPICIOUS', 'severity': 'INFO',
'type': 'IP'}
'type': 'IP', "comment": [""]}
),
(
{'value': '11.11.11.11', 'indicator_type': 100, 'score': 2},
{'expiration_date': -1, 'indicator': '11.11.11.11', 'reputation': 'SUSPICIOUS', 'severity': 'INFO', 'type': '100'}
{'expiration_date': -1, 'indicator': '11.11.11.11', 'reputation': 'SUSPICIOUS',
'severity': 'INFO', 'type': '100', "comment": [""]}
),
(
{'value': '11.11.11.11', 'indicator_type': 'IP'},
{'expiration_date': -1, 'indicator': '11.11.11.11', 'reputation': 'UNKNOWN', 'severity': 'INFO', 'type': 'IP'}
{'expiration_date': -1, 'indicator': '11.11.11.11', 'reputation': 'UNKNOWN',
'severity': 'INFO', 'type': 'IP', "comment": [""]}
),
(
{'value': '11.11.11.11', 'indicator_type': 'IP', 'expiration': '2020-06-03T00:00:00Z'},
{'expiration_date': 1591142400000, 'indicator': '11.11.11.11', 'reputation': 'UNKNOWN', 'severity': 'INFO', 'type': 'IP'} # noqa: E501
{'expiration_date': 1591142400000, 'indicator': '11.11.11.11', 'reputation': 'UNKNOWN', 'severity': 'INFO',
'type': 'IP', "comment": [""]} # noqa: E501
),
(
{'value': '11.11.11.11', 'indicator_type': 'IP', 'comments': [{'type': 'IndicatorCommentTimeLine', 'content': 'test'}]}, # noqa: E501
{'expiration_date': -1, 'indicator': '11.11.11.11', 'reputation': 'UNKNOWN', 'severity': 'INFO', 'type': 'IP'}
{'value': '11.11.11.11', 'indicator_type': 'IP',
'comments': [{'type': 'IndicatorCommentTimeLine', 'content': 'test'}]}, # noqa: E501
{'expiration_date': -1, 'indicator': '11.11.11.11', 'reputation': 'UNKNOWN',
'severity': 'INFO', 'type': 'IP', "comment": [""]}
),
(
{'value': '11.11.11.11', 'indicator_type': 'IP', 'comments': [{'type': 'IndicatorCommentRegular', 'content': 'test'}]}, # noqa: E501
{'expiration_date': -1, 'indicator': '11.11.11.11', 'reputation': 'UNKNOWN', 'severity': 'INFO', 'type': 'IP', 'comment': ['test']} # noqa: E501
{'value': '11.11.11.11', 'indicator_type': 'IP',
'comments': [{'type': 'IndicatorCommentRegular', 'content': 'test'}]}, # noqa: E501
{'expiration_date': -1, 'indicator': '11.11.11.11', 'reputation': 'UNKNOWN', 'severity': 'INFO', 'type': 'IP',
'comment': ['test']} # noqa: E501
),
(
{'value': '11.11.11.11', 'indicator_type': 'IP', 'comments': [{'type': 'IndicatorCommentRegular', 'content': 'test'}, {'type': 'IndicatorCommentRegular', 'content': 'this is the comment'}]}, # noqa: E501
{'expiration_date': -1, 'indicator': '11.11.11.11', 'reputation': 'UNKNOWN', 'severity': 'INFO', 'type': 'IP', 'comment': ['this is the comment']} # noqa: E501
{'value': '11.11.11.11', 'indicator_type': 'IP', 'comments': [{'type': 'IndicatorCommentRegular', 'content': 'test'},
{'type': 'IndicatorCommentRegular',
'content': 'this is the comment'}]}, # noqa: E501
{'expiration_date': -1, 'indicator': '11.11.11.11', 'reputation': 'UNKNOWN', 'severity': 'INFO', 'type': 'IP',
'comment': ['this is the comment']} # noqa: E501
),
(
{'value': '11.11.11.11', 'indicator_type': 'IP', 'aggregatedReliability': 'A - Completely reliable'},
{'expiration_date': -1, 'indicator': '11.11.11.11', 'reputation': 'UNKNOWN', 'severity': 'INFO', 'type': 'IP', 'reliability': 'A'} # noqa: E501
{'expiration_date': -1, 'indicator': '11.11.11.11', 'reputation': 'UNKNOWN', 'severity': 'INFO', 'type': 'IP',
'reliability': 'A', "comment": [""]} # noqa: E501
),
(
{'value': '11.11.11.11', 'indicator_type': 'IP', 'CustomFields': {'threattypes': {'threatcategory': 'Malware'}}}, # noqa: E501
{'expiration_date': -1, 'indicator': '11.11.11.11', 'reputation': 'UNKNOWN', 'severity': 'INFO', 'type': 'IP', 'class': 'Malware'} # noqa: E501
{'value': '11.11.11.11', 'indicator_type': 'IP', 'CustomFields': {'threattypes': {'threatcategory': 'Malware'}}},
# noqa: E501
{'expiration_date': -1, 'indicator': '11.11.11.11', 'reputation': 'UNKNOWN', 'severity': 'INFO', 'type': 'IP',
'class': 'Malware', "comment": [""]} # noqa: E501
),
(
{'value': '11.11.11.11', 'indicator_type': 'IP', 'moduleToFeedMap': {'module': {'sourceBrand': 'test', 'score': 2}}}, # noqa: E501
{'expiration_date': -1, 'indicator': '11.11.11.11', 'reputation': 'UNKNOWN', 'severity': 'INFO', 'type': 'IP', 'vendors': [{'vendor_name': 'test', 'reputation': 'SUSPICIOUS', 'reliability': 'F'}]} # noqa: E501
{'value': '11.11.11.11', 'indicator_type': 'IP', 'moduleToFeedMap': {'module': {'sourceBrand': 'test', 'score': 2}}},
# noqa: E501
{'expiration_date': -1, 'indicator': '11.11.11.11', 'reputation': 'UNKNOWN', 'severity': 'INFO', 'type': 'IP',
'vendors': [{'vendor_name': 'test', 'reputation': 'SUSPICIOUS', 'reliability': 'F'}], "comment": [""]} # noqa: E501
)
]

Expand Down Expand Up @@ -878,7 +894,7 @@ def test_parse_demisto_comments__default_empty():
ioc={},
comment_field_name=Client.xsoar_comments_field,
comments_as_tags=False
) is None
) == ['']


def test_parse_demisto_comments__default_as_tag():
Expand Down Expand Up @@ -912,7 +928,7 @@ def test_parse_demisto_comments__custom_field(comment_value: str, comments_as_ta
"""
from XDR_iocs import _parse_demisto_comments
comment_field = 'comment_field'

Client.xsoar_comments_field = 'comment_field'
assert _parse_demisto_comments(
ioc={'CustomFields': {comment_field: comment_value}},
comment_field_name=comment_field,
Expand All @@ -934,7 +950,7 @@ def test_parse_demisto_comments__custom_field_empty_value(comments_as_tags: bool
ioc={'CustomFields': {comment_field: ''}},
comment_field_name=comment_field,
comments_as_tags=comments_as_tags
) is None
) == ['']


@pytest.mark.parametrize('comments_as_tags', (True, False))
Expand All @@ -950,7 +966,7 @@ def test_parse_demisto_comments__custom_field_missing(comments_as_tags: bool):
ioc={'CustomFields': {}},
comment_field_name='comment_field',
comments_as_tags=comments_as_tags
) is None
) == ['']


@pytest.mark.parametrize(
Expand Down Expand Up @@ -1007,9 +1023,10 @@ def test_parse_demisto_comments_url_xsoar_6_default(mocker):
from XDR_iocs import _parse_demisto_comments
inc_id = '111111'
mocker.patch.object(demisto, 'demistoUrls', return_value={'server': 'url'})
Client.add_link_as_a_comment = True
assert _parse_demisto_comments(
ioc={'id': inc_id},
comment_field_name='indicator_link',
comment_field_name='',
comments_as_tags=False
) == [f'url/#/indicator/{inc_id}']

Expand All @@ -1028,9 +1045,10 @@ def test_parse_demisto_comments_url_xsoar_8_default(mocker):
inc_id = '111111'
mocker.patch.object(demisto, 'demistoUrls', return_value={'server': 'url'})
mocker.patch.object(XDR_iocs, 'is_xsoar_saas', return_value=True)
Client.add_link_as_a_comment = True
assert XDR_iocs._parse_demisto_comments(
ioc={'id': inc_id},
comment_field_name='indicator_link',
comment_field_name='',
comments_as_tags=False
) == [f'url/indicator/{inc_id}']

Expand All @@ -1045,12 +1063,12 @@ def test_parse_demisto_list_of_comments_default(mocker):
inc_id = '111111'
comment_value = 'here be comment'
mocker.patch.object(demisto, 'demistoUrls', return_value={'server': 'url'})
Client.xsoar_comments_field, Client.add_link_as_a_comment = "comments", True
assert _parse_demisto_comments(
ioc={Client.xsoar_comments_field: [{'type': 'IndicatorCommentRegular', 'content': comment_value}],
'id': inc_id},
comment_field_name=['indicator_link', Client.xsoar_comments_field],
comments_as_tags=False
) == [f'url/#/indicator/{inc_id}, {comment_value}']
comment_field_name=Client.xsoar_comments_field,
comments_as_tags=False) == [f'{comment_value}, url/#/indicator/{inc_id}']


@patch('XDR_iocs.demisto.params', return_value={'feed': True, 'feedFetchInterval': '14'})
Expand Down Expand Up @@ -1268,3 +1286,60 @@ def test_get_indicators(mock_IndicatorsSearcher,
mock_info.assert_called_with(expected_info)
else:
mock_info.assert_not_called()


@pytest.mark.parametrize('is_xsoar_saas, expected_link', ((True, ['url/indicator/111']), (False, ['url/#/indicator/111'])))
def test_create_an_indicator_link(mocker, is_xsoar_saas: bool, expected_link: str):
"""
Given:
- indicator id and a bool argument is_xsoar_saas which presents if xsaor is a saas version or not
When:
- creating an indicator link
Then:
- verify the link according to the XSAOR version
"""
import XDR_iocs
mocker.patch.object(demisto, 'demistoUrls', return_value={'server': 'url'})
mocker.patch.object(XDR_iocs, 'is_xsoar_saas', return_value=is_xsoar_saas)
assert XDR_iocs.create_an_indicator_link(ioc={'id': '111'}) == expected_link


@pytest.mark.parametrize('xsoar_comment_field, expected_result',
((["indicator_link"], ("comments", True)),
(["comments"], ("comments", False)),
(["comments", "indicator_link"], ("comments", True)),
))
def test_parse_xsoar_field_name_and_link(xsoar_comment_field: list[str], expected_result: tuple[str, bool]):
"""
Given:
- xsoar_comment_field
When:
- parsing xsoar_comment_field by our logic
Then:
- verify the function parses the xsoar_comment_field as expected
"""
import XDR_iocs
result = XDR_iocs.parse_xsoar_field_name_and_link(xsoar_comment_field)
assert result == expected_result


@pytest.mark.parametrize('xsoar_comment_field, informative_message',
((["comments", "not_indicator_link"],
"The parameter xsoar_comment_field=['comments', 'not_indicator_link'] "
"should only contain the field name,"
" or the field name with the phrase indicator_link, separated by a comma."),
(["a", "b", "c"], ("The parameter xsoar_comment_field=['a', 'b', 'c'] cannot contain more than "
'two values'))))
def test_parse_xsoar_field_name_and_link_exceptions(xsoar_comment_field: list[str], informative_message: str):
"""
Given:
- invalid xsoar_comment_field and the expected_result
When:
- parsing xsoar_comment_field by our logic
Then:
- verify the function throws a DemistoException with informative message
"""
import XDR_iocs
with pytest.raises(DemistoException) as de:
XDR_iocs.parse_xsoar_field_name_and_link(xsoar_comment_field)
assert de.message == informative_message
Loading

0 comments on commit d8a9e97

Please sign in to comment.