From 345498750de94ec08f35d21ee8d13893e4463dc3 Mon Sep 17 00:00:00 2001 From: whikernel Date: Thu, 4 May 2023 09:28:44 +0200 Subject: [PATCH 01/12] [ADD] Getter of an alert --- dfir_iris_client/alert.py | 47 ++++++++++++ dfir_iris_client/tests/test_alert.py | 108 +++++++++++++++++++++++++++ 2 files changed, 155 insertions(+) create mode 100644 dfir_iris_client/alert.py create mode 100644 dfir_iris_client/tests/test_alert.py diff --git a/dfir_iris_client/alert.py b/dfir_iris_client/alert.py new file mode 100644 index 0000000..f7c171b --- /dev/null +++ b/dfir_iris_client/alert.py @@ -0,0 +1,47 @@ +# IRIS Client API Source Code +# contact@dfir-iris.org +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 3 of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +import json +import warnings + +from requests import Response + +from dfir_iris_client.helper.utils import ApiResponse +from dfir_iris_client.session import ClientSession + + +class Alert(object): + """Handles alert operations""" + + def __init__(self, session: ClientSession): + """Init + + Args: + session (ClientSession): Client session + """ + self._s = session + + def get_alert(self, alert_id: int) -> ApiResponse: + """Get an alert + + Args: + alert_id (int): Alert id + + Returns: + Response: Response object + """ + return self._s.pi_get(f"/alerts/{alert_id}") + diff --git a/dfir_iris_client/tests/test_alert.py b/dfir_iris_client/tests/test_alert.py new file mode 100644 index 0000000..75acdd8 --- /dev/null +++ b/dfir_iris_client/tests/test_alert.py @@ -0,0 +1,108 @@ +# IRIS Client API Source Code +# contact@dfir-iris.org +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 3 of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +import datetime +import json +from pathlib import Path + +import pytest + +from dfir_iris_client.admin import AdminHelper +from dfir_iris_client.alert import Alert +from dfir_iris_client.case import Case +from dfir_iris_client.customer import Customer +from dfir_iris_client.helper.colors import EventWhite +from dfir_iris_client.helper.report_template_types import ReportTemplateType, ReportTemplateLanguage +from dfir_iris_client.helper.utils import assert_api_resp, get_data_from_resp, parse_api_data +from dfir_iris_client.tests.tests_helper import InitIrisClientTest + + +class AlertTest(InitIrisClientTest): + + def setUp(self) -> None: + """ """ + self.alert = Alert(self.session) + + def test_get_alert(self): + """ """ + resp = self.alert.get_alert(1) + + assert bool(assert_api_resp(resp)) is True + data = get_data_from_resp(resp) + + assert parse_api_data(data, 'alert_id') == 1 + assets = parse_api_data(data, 'assets') + + assert isinstance(assets, list) + + assert isinstance(parse_api_data(data, 'alert_severity_id'), int) + assert isinstance(parse_api_data(data, 'alert_owner_id'), int) or parse_api_data(data, 'alert_owner_id') is None + + alert_source_content = parse_api_data(data, 'alert_source_content') + assert isinstance(alert_source_content, dict) + + comments = parse_api_data(data, 'comments') + assert isinstance(comments, list) + + modification_history = parse_api_data(data, 'modification_history') + assert isinstance(modification_history, dict) + + customer = parse_api_data(data, 'customer') + assert isinstance(customer, dict) + + owner = parse_api_data(data, 'owner') + assert isinstance(owner, dict) or owner is None + + assert isinstance(parse_api_data(data, 'alert_source_link'), str) + + cases = parse_api_data(data, 'cases') + assert isinstance(cases, list) + + assert isinstance(parse_api_data(data, 'alert_classification_id'), int) + assert isinstance(parse_api_data(data, 'alert_source'), str) + assert isinstance(parse_api_data(data, 'alert_tags'), str) + assert isinstance(parse_api_data(data, 'alert_context'), dict) + assert isinstance(parse_api_data(data, 'alert_id'), int) + + severity = parse_api_data(data, 'severity') + assert isinstance(severity, dict) + + alert_creation_time = parse_api_data(data, 'alert_creation_time') + assert isinstance(alert_creation_time, str) + + classification = parse_api_data(data, 'classification') + assert isinstance(classification, dict) + + assert isinstance(parse_api_data(data, 'alert_title'), str) + assert isinstance(parse_api_data(data, 'alert_uuid'), str) + assert isinstance(parse_api_data(data, 'alert_source_ref'), str) + assert isinstance(parse_api_data(data, 'alert_note'), str) + assert isinstance(parse_api_data(data, 'alert_customer_id'), int) + + iocs = parse_api_data(data, 'iocs') + assert isinstance(iocs, list) + + assert isinstance(parse_api_data(data, 'alert_description'), str) + assert isinstance(parse_api_data(data, 'alert_status_id'), int) + + alert_source_event_time = parse_api_data(data, 'alert_source_event_time') + assert isinstance(alert_source_event_time, str) + + status = parse_api_data(data, 'status') + assert isinstance(status, dict) + + related_alerts = parse_api_data(data, 'related_alerts') + assert isinstance(related_alerts, dict) From b3488432a0ebcfb81cec16423b410125279ada67 Mon Sep 17 00:00:00 2001 From: whikernel Date: Thu, 4 May 2023 09:41:00 +0200 Subject: [PATCH 02/12] [ADD] Getter of multiple alerts --- dfir_iris_client/alert.py | 19 +++- dfir_iris_client/tests/test_alert.py | 125 +++++++++++++++------------ 2 files changed, 90 insertions(+), 54 deletions(-) diff --git a/dfir_iris_client/alert.py b/dfir_iris_client/alert.py index f7c171b..7f070d0 100644 --- a/dfir_iris_client/alert.py +++ b/dfir_iris_client/alert.py @@ -16,10 +16,11 @@ # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. import json import warnings +from typing import List from requests import Response -from dfir_iris_client.helper.utils import ApiResponse +from dfir_iris_client.helper.utils import ApiResponse, ClientApiError from dfir_iris_client.session import ClientSession @@ -45,3 +46,19 @@ def get_alert(self, alert_id: int) -> ApiResponse: """ return self._s.pi_get(f"/alerts/{alert_id}") + def get_alerts(self, alert_ids: List[int]) -> ApiResponse: + """Get alerts from their ids + + Args: + alert_ids (list): Alert ids + + Returns: + Response: Response object + """ + + if not all(isinstance(element, int) for element in alert_ids): + return ClientApiError('Expected a list of integers for alert_ids') + + return self._s.pi_get(f"/alerts/filter?alert_ids={','.join(str(element) for element in alert_ids)}") + + diff --git a/dfir_iris_client/tests/test_alert.py b/dfir_iris_client/tests/test_alert.py index 75acdd8..5e1e06d 100644 --- a/dfir_iris_client/tests/test_alert.py +++ b/dfir_iris_client/tests/test_alert.py @@ -30,79 +30,98 @@ from dfir_iris_client.tests.tests_helper import InitIrisClientTest -class AlertTest(InitIrisClientTest): +def assert_alert_isvalid(data, alert_id, has_related_alerts=False): + """Assert that the alert data is valid""" + assert parse_api_data(data, 'alert_id') == alert_id + assets = parse_api_data(data, 'assets') - def setUp(self) -> None: - """ """ - self.alert = Alert(self.session) + assert isinstance(assets, list) - def test_get_alert(self): - """ """ - resp = self.alert.get_alert(1) + assert isinstance(parse_api_data(data, 'alert_severity_id'), int) + assert isinstance(parse_api_data(data, 'alert_owner_id'), int) or parse_api_data(data, 'alert_owner_id') is None - assert bool(assert_api_resp(resp)) is True - data = get_data_from_resp(resp) + alert_source_content = parse_api_data(data, 'alert_source_content') + assert isinstance(alert_source_content, dict) - assert parse_api_data(data, 'alert_id') == 1 - assets = parse_api_data(data, 'assets') + comments = parse_api_data(data, 'comments') + assert isinstance(comments, list) - assert isinstance(assets, list) + modification_history = parse_api_data(data, 'modification_history') + assert isinstance(modification_history, dict) - assert isinstance(parse_api_data(data, 'alert_severity_id'), int) - assert isinstance(parse_api_data(data, 'alert_owner_id'), int) or parse_api_data(data, 'alert_owner_id') is None + customer = parse_api_data(data, 'customer') + assert isinstance(customer, dict) - alert_source_content = parse_api_data(data, 'alert_source_content') - assert isinstance(alert_source_content, dict) + owner = parse_api_data(data, 'owner') + assert isinstance(owner, dict) or owner is None - comments = parse_api_data(data, 'comments') - assert isinstance(comments, list) + assert isinstance(parse_api_data(data, 'alert_source_link'), str) - modification_history = parse_api_data(data, 'modification_history') - assert isinstance(modification_history, dict) + cases = parse_api_data(data, 'cases') + assert isinstance(cases, list) - customer = parse_api_data(data, 'customer') - assert isinstance(customer, dict) + assert isinstance(parse_api_data(data, 'alert_classification_id'), int) + assert isinstance(parse_api_data(data, 'alert_source'), str) + assert isinstance(parse_api_data(data, 'alert_tags'), str) + assert isinstance(parse_api_data(data, 'alert_context'), dict) + assert isinstance(parse_api_data(data, 'alert_id'), int) - owner = parse_api_data(data, 'owner') - assert isinstance(owner, dict) or owner is None + severity = parse_api_data(data, 'severity') + assert isinstance(severity, dict) - assert isinstance(parse_api_data(data, 'alert_source_link'), str) + alert_creation_time = parse_api_data(data, 'alert_creation_time') + assert isinstance(alert_creation_time, str) - cases = parse_api_data(data, 'cases') - assert isinstance(cases, list) + classification = parse_api_data(data, 'classification') + assert isinstance(classification, dict) - assert isinstance(parse_api_data(data, 'alert_classification_id'), int) - assert isinstance(parse_api_data(data, 'alert_source'), str) - assert isinstance(parse_api_data(data, 'alert_tags'), str) - assert isinstance(parse_api_data(data, 'alert_context'), dict) - assert isinstance(parse_api_data(data, 'alert_id'), int) + assert isinstance(parse_api_data(data, 'alert_title'), str) + assert isinstance(parse_api_data(data, 'alert_uuid'), str) + assert isinstance(parse_api_data(data, 'alert_source_ref'), str) + assert isinstance(parse_api_data(data, 'alert_note'), str) + assert isinstance(parse_api_data(data, 'alert_customer_id'), int) - severity = parse_api_data(data, 'severity') - assert isinstance(severity, dict) + iocs = parse_api_data(data, 'iocs') + assert isinstance(iocs, list) - alert_creation_time = parse_api_data(data, 'alert_creation_time') - assert isinstance(alert_creation_time, str) + assert isinstance(parse_api_data(data, 'alert_description'), str) + assert isinstance(parse_api_data(data, 'alert_status_id'), int) - classification = parse_api_data(data, 'classification') - assert isinstance(classification, dict) + alert_source_event_time = parse_api_data(data, 'alert_source_event_time') + assert isinstance(alert_source_event_time, str) - assert isinstance(parse_api_data(data, 'alert_title'), str) - assert isinstance(parse_api_data(data, 'alert_uuid'), str) - assert isinstance(parse_api_data(data, 'alert_source_ref'), str) - assert isinstance(parse_api_data(data, 'alert_note'), str) - assert isinstance(parse_api_data(data, 'alert_customer_id'), int) + status = parse_api_data(data, 'status') + assert isinstance(status, dict) - iocs = parse_api_data(data, 'iocs') - assert isinstance(iocs, list) + if has_related_alerts: + related_alerts = parse_api_data(data, 'related_alerts') + assert isinstance(related_alerts, dict) - assert isinstance(parse_api_data(data, 'alert_description'), str) - assert isinstance(parse_api_data(data, 'alert_status_id'), int) - alert_source_event_time = parse_api_data(data, 'alert_source_event_time') - assert isinstance(alert_source_event_time, str) +class AlertTest(InitIrisClientTest): - status = parse_api_data(data, 'status') - assert isinstance(status, dict) + def setUp(self) -> None: + """ """ + self.alert = Alert(self.session) + + def test_get_alert(self): + """ """ + resp = self.alert.get_alert(1) + + assert bool(assert_api_resp(resp)) is True + data = get_data_from_resp(resp) + + assert_alert_isvalid(data,1, has_related_alerts=True) + + def test_get_alerts(self): + """ """ + resp = self.alert.get_alerts([1, 2, 3, 4, 5, 6]) + + assert bool(assert_api_resp(resp)) is True + data = get_data_from_resp(resp) + + assert isinstance(data, dict) + + for alert in parse_api_data(data, 'alerts'): + assert_alert_isvalid(alert, parse_api_data(alert, 'alert_id')) - related_alerts = parse_api_data(data, 'related_alerts') - assert isinstance(related_alerts, dict) From fc031e7fb6c5d69ee0e361006e2beca2352e7c0f Mon Sep 17 00:00:00 2001 From: whikernel Date: Thu, 4 May 2023 09:58:15 +0200 Subject: [PATCH 03/12] [ADD] Endpoint to add an alert and delete it. Added tests --- dfir_iris_client/alert.py | 36 +++++- dfir_iris_client/tests/resources/alert.json | 119 ++++++++++++++++++++ dfir_iris_client/tests/test_alert.py | 24 ++++ 3 files changed, 177 insertions(+), 2 deletions(-) create mode 100644 dfir_iris_client/tests/resources/alert.json diff --git a/dfir_iris_client/alert.py b/dfir_iris_client/alert.py index 7f070d0..25f5caf 100644 --- a/dfir_iris_client/alert.py +++ b/dfir_iris_client/alert.py @@ -42,7 +42,7 @@ def get_alert(self, alert_id: int) -> ApiResponse: alert_id (int): Alert id Returns: - Response: Response object + ApiResponse: Response object """ return self._s.pi_get(f"/alerts/{alert_id}") @@ -53,7 +53,7 @@ def get_alerts(self, alert_ids: List[int]) -> ApiResponse: alert_ids (list): Alert ids Returns: - Response: Response object + ApiResponse: Response object """ if not all(isinstance(element, int) for element in alert_ids): @@ -61,4 +61,36 @@ def get_alerts(self, alert_ids: List[int]) -> ApiResponse: return self._s.pi_get(f"/alerts/filter?alert_ids={','.join(str(element) for element in alert_ids)}") + def add_alert(self, alert_data: dict) -> ApiResponse: + """Add an alert + Args: + alert_data (dict): Alert data - The data is defined in the API documentation + + Returns: + ApiResponse: Response object + """ + return self._s.pi_post("/alerts/add", alert_data) + + def update_alert(self, alert_id: int, alert_data: dict) -> ApiResponse: + """Update an alert + + Args: + alert_id (int): Alert id + alert_data (dict): Alert data - The data is defined in the API documentation + + Returns: + ApiResponse: Response object + """ + return self._s.pi_post(f"/alerts/update/{alert_id}", alert_data) + + def delete_alert(self, alert_id: int) -> ApiResponse: + """Delete an alert + + Args: + alert_id (int): Alert id + + Returns: + ApiResponse: Response object + """ + return self._s.pi_post(f"/alerts/delete/{alert_id}") \ No newline at end of file diff --git a/dfir_iris_client/tests/resources/alert.json b/dfir_iris_client/tests/resources/alert.json new file mode 100644 index 0000000..db79d77 --- /dev/null +++ b/dfir_iris_client/tests/resources/alert.json @@ -0,0 +1,119 @@ +{ + "alert_title": "Super alert 5", + "alert_description": "This is a test alert", + "alert_source": "Test Source", + "alert_source_ref": "Test-123", + "alert_source_link": "https://source_link.com", + "alert_source_content": { + "_id": "603f704aaf7417985bbf3b22", + "contextId": "206e2965-6533-48a6-ba9e-794364a84bf9", + "description": "Contoso user performed 11 suspicious activities MITRE Technique used Account Discovery (T1087) and subtechnique used Domain Account (T1087.002)", + "entities": [ + { + "entityRole": "Source", + "entityType": 2, + "id": "6204bdaf-ad46-4e99-a25d-374a0532c666", + "inst": 0, + "label": "user1", + "pa": "user1@contoso.com", + "saas": 11161, + "type": "account" + }, + { + "entityRole": "Related", + "id": "55017817-27af-49a7-93d6-8af6c5030fdb", + "label": "DC3", + "type": "device" + }, + { + "id": 20940, + "label": "Active Directory", + "type": "service" + }, + { + "entityRole": "Related", + "id": "95c59b48-98c1-40ff-a444-d9040f1f68f2", + "label": "DC4", + "type": "device" + }, + { + "id": "5bfd18bfab73c36ba10d38ca", + "label": "Honeytoken activity", + "policyType": "ANOMALY_DETECTION", + "type": "policyRule" + }, + { + "entityRole": "Source", + "id": "34f3ecc9-6903-4df7-af79-14fe2d0d4553", + "label": "Client1", + "type": "device" + }, + { + "entityRole": "Related", + "id": "d68772fe-1171-4124-9f73-0f410340bd54", + "label": "DC1", + "type": "device" + }, + { + "type": "groupTag", + "id": "5f759b4d106abbe4a504ea5d", + "label": "All Users" + } + ], + "idValue": 15795464, + "isSystemAlert": false, + "resolutionStatusValue": 0, + "severityValue": 5, + "statusValue": 1, + "stories": [ + 0 + ], + "threatScore": 34, + "timestamp": 1621941916475, + "title": "Honeytoken activity", + "comment": "", + "handledByUser": "administrator@contoso.com", + "resolveTime": "2021-05-13T14:02:34.904Z", + "URL": "https://contoso.portal.cloudappsecurity.com/#/alerts/603f704aaf7417985bbf3b22" + }, + "alert_severity_id": 4, + "alert_status_id": 3, + "alert_context": { + "context_key": "context_value" + }, + "alert_source_event_time": "2023-03-26T03:00:30", + "alert_note": "Test note", + "alert_tags": "defender", + "alert_iocs": [ + { + "ioc_value": "tarzan5", + "ioc_description": "description kwekwe", + "ioc_tlp_id": 1, + "ioc_type_id": 2, + "ioc_tags": "tag1,tag2", + "ioc_enrichment": { "provider_1": { "data": 2, "new_data": 3 }, "provider_3": { "enric": "true" } + } + + }, + { + "ioc_value": "tarzan2", + "ioc_description": "description_hey", + "ioc_tlp_id": 2, + "ioc_type_id": 4, + "ioc_tags": "tag1,tag2", + "ioc_enrichment": { "provider_1": { "data": "a very long\nblablablabdjsjofiasofiasjdxaisjhfaiosxhd bla\nddijwedoijwedw\ndhasdhaifuhafiassfsakjfhaskljfhaslkfjhaslkfdjhdqwleiuhxioauwedhoqwiuhzndoqwuehxdnzoiuwehfoqwiufhxnwoquhoiwefhxnqwoiuhwqomifuhqzwofuhqwofeuzhqwofeiuqhwe fifuhqwiofuh qwofuqh fuq hwfoiqwhfoiquhfe quhfqiouwhf qoufhq hufou qufhqowiufhowufih qwfuhqwioufh wqoufh wifhufdhas", "new_data": 3 }, "provider_3": { "enric": "true" }} + + } + ], + "alert_assets": [{ + "asset_name": "My super nop", + "asset_description": "Asset description", + "asset_type_id": 1, + "asset_ip": "1.1.1.1", + "asset_domain": "", + "asset_tags": "tag1,tag2", + "asset_enrichment": {"enrich": {"enrich2": "super_enrich"}} + }], + "alert_customer_id": 1, + "alert_classification_id": 1 +} \ No newline at end of file diff --git a/dfir_iris_client/tests/test_alert.py b/dfir_iris_client/tests/test_alert.py index 5e1e06d..65a3f61 100644 --- a/dfir_iris_client/tests/test_alert.py +++ b/dfir_iris_client/tests/test_alert.py @@ -125,3 +125,27 @@ def test_get_alerts(self): for alert in parse_api_data(data, 'alerts'): assert_alert_isvalid(alert, parse_api_data(alert, 'alert_id')) + def test_add_alert(self): + """ """ + with open(Path(__file__).parent / 'resources' / 'alert.json') as f: + alert_data = json.load(f) + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + data = get_data_from_resp(resp) + assert_alert_isvalid(data, parse_api_data(data, 'alert_id')) + + def test_delete_alert(self): + """ """ + with open(Path(__file__).parent / 'resources' / 'alert.json') as f: + alert_data = json.load(f) + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + data = get_data_from_resp(resp) + alert_id = parse_api_data(data, 'alert_id') + + resp = self.alert.delete_alert(alert_id) + assert bool(assert_api_resp(resp)) is True From a79a7e1278660f1f4ac4cdad64cfab51868ddbeb Mon Sep 17 00:00:00 2001 From: whikernel Date: Thu, 4 May 2023 10:03:34 +0200 Subject: [PATCH 04/12] [ADD] Load alert data for tests --- dfir_iris_client/tests/test_alert.py | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/dfir_iris_client/tests/test_alert.py b/dfir_iris_client/tests/test_alert.py index 65a3f61..6f40c4b 100644 --- a/dfir_iris_client/tests/test_alert.py +++ b/dfir_iris_client/tests/test_alert.py @@ -30,6 +30,12 @@ from dfir_iris_client.tests.tests_helper import InitIrisClientTest +def load_alert_data(): + """Load alert data from a file""" + with open(Path(__file__).parent / 'resources' / 'alert.json') as f: + return json.load(f) + + def assert_alert_isvalid(data, alert_id, has_related_alerts=False): """Assert that the alert data is valid""" assert parse_api_data(data, 'alert_id') == alert_id @@ -127,8 +133,7 @@ def test_get_alerts(self): def test_add_alert(self): """ """ - with open(Path(__file__).parent / 'resources' / 'alert.json') as f: - alert_data = json.load(f) + alert_data = load_alert_data() resp = self.alert.add_alert(alert_data) assert bool(assert_api_resp(resp)) is True @@ -138,8 +143,7 @@ def test_add_alert(self): def test_delete_alert(self): """ """ - with open(Path(__file__).parent / 'resources' / 'alert.json') as f: - alert_data = json.load(f) + alert_data = load_alert_data() resp = self.alert.add_alert(alert_data) assert bool(assert_api_resp(resp)) is True @@ -149,3 +153,19 @@ def test_delete_alert(self): resp = self.alert.delete_alert(alert_id) assert bool(assert_api_resp(resp)) is True + + def test_update_alert(self): + """ """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + data = get_data_from_resp(resp) + alert_id = parse_api_data(data, 'alert_id') + + resp = self.alert.update_alert(alert_id, {'alert_title': 'test'}) + assert bool(assert_api_resp(resp)) is True + + data = get_data_from_resp(resp) + assert parse_api_data(data, 'alert_title') == 'test' From a30efa4d1d1d5cce38a53019e7457b271b3e4805 Mon Sep 17 00:00:00 2001 From: whikernel Date: Thu, 4 May 2023 10:12:19 +0200 Subject: [PATCH 05/12] [ADD] Invalid tests for alert --- dfir_iris_client/tests/test_alert.py | 39 ++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/dfir_iris_client/tests/test_alert.py b/dfir_iris_client/tests/test_alert.py index 6f40c4b..3f8c71c 100644 --- a/dfir_iris_client/tests/test_alert.py +++ b/dfir_iris_client/tests/test_alert.py @@ -36,6 +36,17 @@ def load_alert_data(): return json.load(f) +def load_invalid_alert_data(): + """Load invalid alert data from a file""" + with open(Path(__file__).parent / 'resources' / 'alert.json') as f: + data = json.load(f) + + data['alert_title'] = None + del data['alert_description'] + + return data + + def assert_alert_isvalid(data, alert_id, has_related_alerts=False): """Assert that the alert data is valid""" assert parse_api_data(data, 'alert_id') == alert_id @@ -169,3 +180,31 @@ def test_update_alert(self): data = get_data_from_resp(resp) assert parse_api_data(data, 'alert_title') == 'test' + + def test_add_alert_failure(self): + """Test adding an alert with invalid data should fail.""" + alert_data = load_invalid_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is False + + def test_delete_alert_failure(self): + """Test deleting a non-existent alert should fail.""" + non_existent_alert_id = -1 + + resp = self.alert.delete_alert(non_existent_alert_id) + assert bool(assert_api_resp(resp)) is False + + def test_update_alert_failure(self): + """Test updating an alert with invalid data should fail.""" + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + data = get_data_from_resp(resp) + alert_id = parse_api_data(data, 'alert_id') + + invalid_update_data = {'alert_title': None} + resp = self.alert.update_alert(alert_id, invalid_update_data) + assert bool(assert_api_resp(resp)) is False From ad91fad94bc0c94a59c1abc6b37440b27c988765 Mon Sep 17 00:00:00 2001 From: whikernel Date: Thu, 4 May 2023 14:08:21 +0200 Subject: [PATCH 06/12] [ADD] Alert escalation code --- dfir_iris_client/alert.py | 32 +++++++++++++++++++++++- dfir_iris_client/helper/utils.py | 31 ++++++++++++++++++++++- dfir_iris_client/tests/test_alert.py | 37 +++++++++++++++++++++++++--- 3 files changed, 95 insertions(+), 5 deletions(-) diff --git a/dfir_iris_client/alert.py b/dfir_iris_client/alert.py index 25f5caf..962a07d 100644 --- a/dfir_iris_client/alert.py +++ b/dfir_iris_client/alert.py @@ -93,4 +93,34 @@ def delete_alert(self, alert_id: int) -> ApiResponse: Returns: ApiResponse: Response object """ - return self._s.pi_post(f"/alerts/delete/{alert_id}") \ No newline at end of file + return self._s.pi_post(f"/alerts/delete/{alert_id}") + + def escalate_alert(self, alert_id: int, iocs_import_list: List[str], assets_import_list: List[str], + escalation_note: str, case_title:str, case_tags: str, case_template_id: int = None, + import_as_event: bool = False) -> ApiResponse: + """Escalate an alert + + Args: + alert_id (int): Alert id + iocs_import_list (list): List of IOCs UUID from the alert to import + assets_import_list (list): List of assets UUIDs from the alert to import + escalation_note (str): Escalation note + case_title (str): Case title + case_tags (str): Case tags, a string of comma separated tags + case_template_id (int): Case template id + import_as_event (bool): Import as event + + Returns: + ApiResponse: Response object + """ + payload = { + "iocs_import_list": iocs_import_list, + "assets_import_list": assets_import_list, + "note": escalation_note, + "case_title": case_title, + "case_tags": case_tags, + "case_template_id": case_template_id, + "import_as_event": import_as_event + } + + return self._s.pi_post(f"/alerts/escalate/{alert_id}", data=payload) \ No newline at end of file diff --git a/dfir_iris_client/helper/utils.py b/dfir_iris_client/helper/utils.py index e161501..ef32f2d 100644 --- a/dfir_iris_client/helper/utils.py +++ b/dfir_iris_client/helper/utils.py @@ -14,7 +14,7 @@ # You should have received a copy of the GNU Lesser General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -from typing import Union +from typing import Union, List import logging as log import json @@ -142,6 +142,19 @@ def get_data(self): return self._response.get('data') + def get_data_field(self, field: Union[List[str], str]): + """ """ + if not hasattr(self, "_response"): + return None + + if isinstance(field, str): + return self._response.get('data').get(field) + + if isinstance(field, list): + return reduce(lambda d, key: d.get(key) if d else None, field, self._response.get('data')) + + return None + def get_msg(self): """ """ if not hasattr(self, "_response"): @@ -241,6 +254,22 @@ def parse_api_data(data: dict, path: Union[list, str], strict=True) -> any: return fdata +def get_data(api_response: ApiResponse, path: Union[list, str], strict=True) -> any: + """Parses the data field of an API response. Path describes a path to fetch a specific value in data. + If strict is set, an exception is raised, otherwise None is returned. + + Args: + api_response: ApiResponse: + path: Value to get from within data + strict: Set to true to fails if path is not found in data (default) + + Returns: + ApiResponse + + """ + return parse_api_data(get_data_from_resp(api_response), path, strict) + + def ClientApiError(error=None, msg=None): """ diff --git a/dfir_iris_client/tests/test_alert.py b/dfir_iris_client/tests/test_alert.py index 3f8c71c..c6f90fb 100644 --- a/dfir_iris_client/tests/test_alert.py +++ b/dfir_iris_client/tests/test_alert.py @@ -26,7 +26,7 @@ from dfir_iris_client.customer import Customer from dfir_iris_client.helper.colors import EventWhite from dfir_iris_client.helper.report_template_types import ReportTemplateType, ReportTemplateLanguage -from dfir_iris_client.helper.utils import assert_api_resp, get_data_from_resp, parse_api_data +from dfir_iris_client.helper.utils import assert_api_resp, get_data_from_resp, parse_api_data, get_data from dfir_iris_client.tests.tests_helper import InitIrisClientTest @@ -202,9 +202,40 @@ def test_update_alert_failure(self): resp = self.alert.add_alert(alert_data) assert bool(assert_api_resp(resp)) is True - data = get_data_from_resp(resp) - alert_id = parse_api_data(data, 'alert_id') + alert_id = get_data(resp, 'alert_id') invalid_update_data = {'alert_title': None} resp = self.alert.update_alert(alert_id, invalid_update_data) assert bool(assert_api_resp(resp)) is False + + def test_escalate_alert(self): + """ Test escalating an alert """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + alert_id = resp.get_data_field('alert_id') + + iocs = resp.get_data_field('iocs') + ioc_uid = iocs[0].get('ioc_uid') + + asset = resp.get_data_field('assets') + asset_uid = asset[0].get('asset_uid') + + resp = self.alert.escalate_alert(alert_id, iocs_import_list=[ioc_uid], + assets_import_list=[asset_uid], escalation_note='test', + case_title='test', case_tags='defender,test', import_as_event=True) + + assert bool(assert_api_resp(resp)) is True + + data = get_data_from_resp(resp) + assert 'classification_id' in data + assert 'case_uuid' in data + assert 'case_name' in data and 'test' in data['case_name'] + assert 'case_id' in data + assert 'case_customer' in data + assert 'modification_history' in data + assert 'case_description' in data + assert 'case_soc_id' in data + assert 'status_id' in data From e04cb06219f66befa57b87f4eeebf9e01dc27d3f Mon Sep 17 00:00:00 2001 From: whikernel Date: Thu, 4 May 2023 14:22:17 +0200 Subject: [PATCH 07/12] [ADD] Alerts merging --- dfir_iris_client/alert.py | 38 +++++++++++++++++++++++----- dfir_iris_client/tests/test_alert.py | 32 +++++++++++++++++++++++ 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/dfir_iris_client/alert.py b/dfir_iris_client/alert.py index 962a07d..9b53e3f 100644 --- a/dfir_iris_client/alert.py +++ b/dfir_iris_client/alert.py @@ -44,7 +44,7 @@ def get_alert(self, alert_id: int) -> ApiResponse: Returns: ApiResponse: Response object """ - return self._s.pi_get(f"/alerts/{alert_id}") + return self._s.pi_get(f"alerts/{alert_id}") def get_alerts(self, alert_ids: List[int]) -> ApiResponse: """Get alerts from their ids @@ -59,7 +59,7 @@ def get_alerts(self, alert_ids: List[int]) -> ApiResponse: if not all(isinstance(element, int) for element in alert_ids): return ClientApiError('Expected a list of integers for alert_ids') - return self._s.pi_get(f"/alerts/filter?alert_ids={','.join(str(element) for element in alert_ids)}") + return self._s.pi_get(f"alerts/filter?alert_ids={','.join(str(element) for element in alert_ids)}") def add_alert(self, alert_data: dict) -> ApiResponse: """Add an alert @@ -70,7 +70,7 @@ def add_alert(self, alert_data: dict) -> ApiResponse: Returns: ApiResponse: Response object """ - return self._s.pi_post("/alerts/add", alert_data) + return self._s.pi_post("alerts/add", alert_data) def update_alert(self, alert_id: int, alert_data: dict) -> ApiResponse: """Update an alert @@ -82,7 +82,7 @@ def update_alert(self, alert_id: int, alert_data: dict) -> ApiResponse: Returns: ApiResponse: Response object """ - return self._s.pi_post(f"/alerts/update/{alert_id}", alert_data) + return self._s.pi_post(f"alerts/update/{alert_id}", alert_data) def delete_alert(self, alert_id: int) -> ApiResponse: """Delete an alert @@ -93,7 +93,7 @@ def delete_alert(self, alert_id: int) -> ApiResponse: Returns: ApiResponse: Response object """ - return self._s.pi_post(f"/alerts/delete/{alert_id}") + return self._s.pi_post(f"alerts/delete/{alert_id}") def escalate_alert(self, alert_id: int, iocs_import_list: List[str], assets_import_list: List[str], escalation_note: str, case_title:str, case_tags: str, case_template_id: int = None, @@ -123,4 +123,30 @@ def escalate_alert(self, alert_id: int, iocs_import_list: List[str], assets_impo "import_as_event": import_as_event } - return self._s.pi_post(f"/alerts/escalate/{alert_id}", data=payload) \ No newline at end of file + return self._s.pi_post(f"alerts/escalate/{alert_id}", data=payload) + + def merge_alert(self, alert_id: int, target_case_id: int, iocs_import_list: List[str], + assets_import_list: List[str], merge_note: str, import_as_event: bool = False) -> ApiResponse: + """Merge an alert + + Args: + alert_id (int): Alert id + target_case_id (int): Target case id + iocs_import_list (list): List of IOCs UUID from the alert to import + assets_import_list (list): List of assets UUIDs from the alert to import + merge_note (str): Merge note + import_as_event (bool): Import as event + + Returns: + ApiResponse: Response object + """ + payload = { + "target_case_id": target_case_id, + "iocs_import_list": iocs_import_list, + "assets_import_list": assets_import_list, + "note": merge_note, + "import_as_event": import_as_event + } + + return self._s.pi_post(f"alerts/merge/{alert_id}", data=payload) + diff --git a/dfir_iris_client/tests/test_alert.py b/dfir_iris_client/tests/test_alert.py index c6f90fb..8df17c5 100644 --- a/dfir_iris_client/tests/test_alert.py +++ b/dfir_iris_client/tests/test_alert.py @@ -239,3 +239,35 @@ def test_escalate_alert(self): assert 'case_description' in data assert 'case_soc_id' in data assert 'status_id' in data + + def test_merge_alert(self): + """ Test merging an alert """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + alert_id = resp.get_data_field('alert_id') + + iocs = resp.get_data_field('iocs') + ioc_uid = iocs[0].get('ioc_uid') + + asset = resp.get_data_field('assets') + asset_uid = asset[0].get('asset_uid') + + resp = self.alert.merge_alert(alert_id, iocs_import_list=[ioc_uid], + assets_import_list=[asset_uid], merge_note='test', + import_as_event=True, target_case_id=1) + + assert bool(assert_api_resp(resp)) is True + + data = get_data_from_resp(resp) + assert 'classification_id' in data + assert 'case_uuid' in data + assert 'case_name' in data and 'test' in data['case_name'] + assert 'case_id' in data + assert 'case_customer' in data + assert 'modification_history' in data + assert 'case_description' in data + assert 'case_soc_id' in data + assert 'status_id' in data \ No newline at end of file From ec0bcd96890bd9d2c62bf313dbb1092a4c76fa05 Mon Sep 17 00:00:00 2001 From: whikernel Date: Thu, 4 May 2023 14:26:53 +0200 Subject: [PATCH 08/12] [ADD] Invalid test cases for alerts merging --- dfir_iris_client/tests/test_alert.py | 50 +++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/dfir_iris_client/tests/test_alert.py b/dfir_iris_client/tests/test_alert.py index 8df17c5..3cc8428 100644 --- a/dfir_iris_client/tests/test_alert.py +++ b/dfir_iris_client/tests/test_alert.py @@ -270,4 +270,52 @@ def test_merge_alert(self): assert 'modification_history' in data assert 'case_description' in data assert 'case_soc_id' in data - assert 'status_id' in data \ No newline at end of file + assert 'status_id' in data + + def test_merge_alert_invalid_target_case_id(self): + """ Test merging an alert with an invalid target case ID """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + alert_id = resp.get_data_field('alert_id') + + iocs = resp.get_data_field('iocs') + ioc_uid = iocs[0].get('ioc_uid') + + asset = resp.get_data_field('assets') + asset_uid = asset[0].get('asset_uid') + + invalid_target_case_id = -1 + resp = self.alert.merge_alert(alert_id, iocs_import_list=[ioc_uid], + assets_import_list=[asset_uid], merge_note='test', + import_as_event=True, target_case_id=invalid_target_case_id) + + assert bool(assert_api_resp(resp)) is False + + def test_merge_alert_without_iocs_assets(self): + """ Test merging an alert with invalid IOCs or assets """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + alert_id = resp.get_data_field('alert_id') + + resp = self.alert.merge_alert(alert_id, iocs_import_list=['INVALID'], + assets_import_list=['INVALID'], merge_note='test', + import_as_event=True, target_case_id=1) + + assert bool(assert_api_resp(resp)) is True + + def test_merge_alert_non_existent_alert(self): + """ Test merging a non-existent alert """ + non_existent_alert_id = -1 + + resp = self.alert.merge_alert(non_existent_alert_id, iocs_import_list=['dummy_ioc_uid'], + assets_import_list=['dummy_asset_uid'], merge_note='test', + import_as_event=True, target_case_id=1) + + assert bool(assert_api_resp(resp)) is False + From 2f39b71a7e04f79e0fd2c6e3c7f11ad05575a9ce Mon Sep 17 00:00:00 2001 From: whikernel Date: Thu, 4 May 2023 14:31:21 +0200 Subject: [PATCH 09/12] [ADD] Unmerge of an alert --- dfir_iris_client/alert.py | 16 ++++++++++++++++ dfir_iris_client/tests/test_alert.py | 21 +++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/dfir_iris_client/alert.py b/dfir_iris_client/alert.py index 9b53e3f..ba5c8f7 100644 --- a/dfir_iris_client/alert.py +++ b/dfir_iris_client/alert.py @@ -150,3 +150,19 @@ def merge_alert(self, alert_id: int, target_case_id: int, iocs_import_list: List return self._s.pi_post(f"alerts/merge/{alert_id}", data=payload) + def unmerge_alert(self, alert_id: int, target_case_id: int) -> ApiResponse: + """ Unmerge an alert + + Args: + alert_id (int): Alert id + target_case_id (int): Target case id + + Returns: + ApiResponse: Response object + """ + payload = { + "target_case_id": target_case_id + } + + return self._s.pi_post(f"alerts/unmerge/{alert_id}", data=payload) + diff --git a/dfir_iris_client/tests/test_alert.py b/dfir_iris_client/tests/test_alert.py index 3cc8428..222ca42 100644 --- a/dfir_iris_client/tests/test_alert.py +++ b/dfir_iris_client/tests/test_alert.py @@ -319,3 +319,24 @@ def test_merge_alert_non_existent_alert(self): assert bool(assert_api_resp(resp)) is False + def test_unmerge_alert(self): + """ Test unmerging an alert """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + alert_id = resp.get_data_field('alert_id') + + iocs = resp.get_data_field('iocs') + + asset = resp.get_data_field('assets') + + resp = self.alert.merge_alert(alert_id, iocs_import_list=[iocs[0].get('ioc_uid')], + assets_import_list=[asset[0].get('asset_uid')], merge_note='test', + import_as_event=True, target_case_id=1) + + assert bool(assert_api_resp(resp)) is True + + resp = self.alert.unmerge_alert(alert_id, 1) + assert bool(assert_api_resp(resp)) is True From de90ee0ad360e35d1225fcbc2a8d2e381b0ff317 Mon Sep 17 00:00:00 2001 From: whikernel Date: Thu, 4 May 2023 15:08:06 +0200 Subject: [PATCH 10/12] [ADD] Filtering of alerts --- dfir_iris_client/alert.py | 79 ++++++++ dfir_iris_client/tests/resources/alert.json | 73 +------- dfir_iris_client/tests/test_alert.py | 190 +++++++++++++++++++- 3 files changed, 269 insertions(+), 73 deletions(-) diff --git a/dfir_iris_client/alert.py b/dfir_iris_client/alert.py index ba5c8f7..ce2d4ff 100644 --- a/dfir_iris_client/alert.py +++ b/dfir_iris_client/alert.py @@ -166,3 +166,82 @@ def unmerge_alert(self, alert_id: int, target_case_id: int) -> ApiResponse: return self._s.pi_post(f"alerts/unmerge/{alert_id}", data=payload) + def filter_alerts(self, alert_title: str = None, alert_description: str = None, alert_source: str = None, + alert_tags: str = None, alert_status_id: int = None, alert_severity_id: int = None, + alert_classification_id: int = None, alert_customer_id: int = None, alert_start_date: str = None, + alert_end_date: str = None, alert_assets: str = None, alert_iocs: str = None, alert_ids: str = None, + case_id: int = None, alert_owner_id: int = None, + page: int = 1, per_page: int = 20, sort: str = 'desc') -> ApiResponse: + """ Filter alerts + + Args: + alert_title (str): Alert title + alert_description (str): Alert description + alert_source (str): Alert source + alert_tags (str): Alert tags + alert_status_id (int): Alert status id + alert_severity_id (int): Alert severity id + alert_classification_id (int): Alert classification id + alert_customer_id (int): Alert customer id + alert_start_date (str): Alert start date + alert_end_date (str): Alert end date + alert_assets (str): Alert assets + alert_iocs (str): Alert IOCs + alert_ids (str): Alert ids + case_id (int): Case id + alert_owner_id (int): Alert owner id + page (int): Page number + per_page (int): Number of alerts per page + sort (str): Sort order + + + Returns: + ApiResponse: Response object + """ + uri = f"alerts/filter?page={page}&per_page={per_page}&sort={sort}" + if alert_title: + uri += f"&alert_title={alert_title}" + + if alert_description: + uri += f"&alert_description={alert_description}" + + if alert_source: + uri += f"&alert_source={alert_source}" + + if alert_tags: + uri += f"&alert_tags={alert_tags}" + + if alert_status_id: + uri += f"&alert_status_id={alert_status_id}" + + if alert_severity_id: + uri += f"&alert_severity_id={alert_severity_id}" + + if alert_classification_id: + uri += f"&alert_classification_id={alert_classification_id}" + + if alert_customer_id: + uri += f"&alert_customer_id={alert_customer_id}" + + if alert_start_date: + uri += f"&alert_start_date={alert_start_date}" + + if alert_end_date: + uri += f"&alert_end_date={alert_end_date}" + + if alert_assets: + uri += f"&alert_assets={alert_assets}" + + if alert_iocs: + uri += f"&alert_iocs={alert_iocs}" + + if alert_ids: + uri += f"&alert_ids={alert_ids}" + + if case_id: + uri += f"&case_id={case_id}" + + if alert_owner_id: + uri += f"&alert_owner_id={alert_owner_id}" + + return self._s.pi_get(uri) diff --git a/dfir_iris_client/tests/resources/alert.json b/dfir_iris_client/tests/resources/alert.json index db79d77..a006f7c 100644 --- a/dfir_iris_client/tests/resources/alert.json +++ b/dfir_iris_client/tests/resources/alert.json @@ -4,78 +4,7 @@ "alert_source": "Test Source", "alert_source_ref": "Test-123", "alert_source_link": "https://source_link.com", - "alert_source_content": { - "_id": "603f704aaf7417985bbf3b22", - "contextId": "206e2965-6533-48a6-ba9e-794364a84bf9", - "description": "Contoso user performed 11 suspicious activities MITRE Technique used Account Discovery (T1087) and subtechnique used Domain Account (T1087.002)", - "entities": [ - { - "entityRole": "Source", - "entityType": 2, - "id": "6204bdaf-ad46-4e99-a25d-374a0532c666", - "inst": 0, - "label": "user1", - "pa": "user1@contoso.com", - "saas": 11161, - "type": "account" - }, - { - "entityRole": "Related", - "id": "55017817-27af-49a7-93d6-8af6c5030fdb", - "label": "DC3", - "type": "device" - }, - { - "id": 20940, - "label": "Active Directory", - "type": "service" - }, - { - "entityRole": "Related", - "id": "95c59b48-98c1-40ff-a444-d9040f1f68f2", - "label": "DC4", - "type": "device" - }, - { - "id": "5bfd18bfab73c36ba10d38ca", - "label": "Honeytoken activity", - "policyType": "ANOMALY_DETECTION", - "type": "policyRule" - }, - { - "entityRole": "Source", - "id": "34f3ecc9-6903-4df7-af79-14fe2d0d4553", - "label": "Client1", - "type": "device" - }, - { - "entityRole": "Related", - "id": "d68772fe-1171-4124-9f73-0f410340bd54", - "label": "DC1", - "type": "device" - }, - { - "type": "groupTag", - "id": "5f759b4d106abbe4a504ea5d", - "label": "All Users" - } - ], - "idValue": 15795464, - "isSystemAlert": false, - "resolutionStatusValue": 0, - "severityValue": 5, - "statusValue": 1, - "stories": [ - 0 - ], - "threatScore": 34, - "timestamp": 1621941916475, - "title": "Honeytoken activity", - "comment": "", - "handledByUser": "administrator@contoso.com", - "resolveTime": "2021-05-13T14:02:34.904Z", - "URL": "https://contoso.portal.cloudappsecurity.com/#/alerts/603f704aaf7417985bbf3b22" - }, + "alert_source_content": {}, "alert_severity_id": 4, "alert_status_id": 3, "alert_context": { diff --git a/dfir_iris_client/tests/test_alert.py b/dfir_iris_client/tests/test_alert.py index 222ca42..a0cf557 100644 --- a/dfir_iris_client/tests/test_alert.py +++ b/dfir_iris_client/tests/test_alert.py @@ -264,7 +264,6 @@ def test_merge_alert(self): data = get_data_from_resp(resp) assert 'classification_id' in data assert 'case_uuid' in data - assert 'case_name' in data and 'test' in data['case_name'] assert 'case_id' in data assert 'case_customer' in data assert 'modification_history' in data @@ -340,3 +339,192 @@ def test_unmerge_alert(self): resp = self.alert.unmerge_alert(alert_id, 1) assert bool(assert_api_resp(resp)) is True + + def test_unmerge_alert_non_existent_alert(self): + """ Test unmerging a non-existent alert """ + non_existent_alert_id = -1 + + resp = self.alert.unmerge_alert(non_existent_alert_id, 1) + assert bool(assert_api_resp(resp)) is False + + def test_unmerge_alert_non_existent_case(self): + """ Test unmerging a non-existent case """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + alert_id = resp.get_data_field('alert_id') + + resp = self.alert.unmerge_alert(alert_id, -1) + assert bool(assert_api_resp(resp)) is False + + def test_filter_alerts(self): + """ Test filtering alerts """ + resp = self.alert.filter_alerts() + assert bool(assert_api_resp(resp)) is True + + def test_filter_alerts_with_alert_title(self): + """ Test filtering alerts with filter """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + resp = self.alert.filter_alerts(alert_title='test') + + assert bool(assert_api_resp(resp)) is True + + def test_filter_alerts_with_invalid_alert_title(self): + """ Test filtering alerts with invalid filter """ + resp = self.alert.filter_alerts(alert_title='INVALID') + + assert bool(assert_api_resp(resp)) is True + + def test_filter_alerts_with_alert_description(self): + """ Test filtering alerts with alert_description filter """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + resp = self.alert.filter_alerts(alert_description='This is a test alert') + + assert bool(assert_api_resp(resp)) is True + + def test_filter_alerts_with_alert_source(self): + """ Test filtering alerts with alert_source filter """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + resp = self.alert.filter_alerts(alert_source='Test Source') + + assert bool(assert_api_resp(resp)) is True + + def test_filter_alerts_with_alert_tags(self): + """ Test filtering alerts with alert_tags filter """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + resp = self.alert.filter_alerts(alert_tags='defender') + + assert bool(assert_api_resp(resp)) is True + + def test_filter_alerts_with_alert_status_id(self): + """ Test filtering alerts with alert_status_id filter """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + resp = self.alert.filter_alerts(alert_status_id=3) + + assert bool(assert_api_resp(resp)) is True + + def test_filter_alerts_with_alert_severity_id(self): + """ Test filtering alerts with alert_severity_id filter """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + resp = self.alert.filter_alerts(alert_severity_id=4) + + assert bool(assert_api_resp(resp)) is True + + def test_filter_alerts_with_alert_classification_id(self): + """ Test filtering alerts with alert_classification_id filter """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + resp = self.alert.filter_alerts(alert_classification_id=1) + + assert bool(assert_api_resp(resp)) is True + + def test_filter_alerts_with_alert_customer_id(self): + """ Test filtering alerts with alert_customer_id filter """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + resp = self.alert.filter_alerts(alert_customer_id=1) + + assert bool(assert_api_resp(resp)) is True + + def test_filter_alerts_with_alert_start_date_end_date(self): + """ Test filtering alerts with alert_start_date and alert_end_date filter """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + alert_start_date = "2023-03-25" + alert_end_date = "2023-03-27" + resp = self.alert.filter_alerts(alert_start_date=alert_start_date, alert_end_date=alert_end_date) + + assert bool(assert_api_resp(resp)) is True + + def test_filter_alerts_with_alert_assets(self): + """ Test filtering alerts with alert_assets filter """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + resp = self.alert.filter_alerts(alert_assets="My super nop") + + assert bool(assert_api_resp(resp)) is True + + def test_filter_alerts_with_alert_iocs(self): + """ Test filtering alerts with alert_iocs filter """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + resp = self.alert.filter_alerts(alert_iocs="tarzan5") + + assert bool(assert_api_resp(resp)) is True + + def test_filter_alerts_with_alert_ids(self): + """ Test filtering alerts with alert_ids filter """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + alert_id = resp.get_data_field('alert_id') + resp = self.alert.filter_alerts(alert_ids=str(alert_id)) + + assert bool(assert_api_resp(resp)) is True + + def test_filter_alerts_with_case_id(self): + """ Test filtering alerts with case_id filter """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + case_id = 1 + resp = self.alert.filter_alerts(case_id=case_id) + + assert bool(assert_api_resp(resp)) is True + + def test_filter_alerts_with_alert_owner_id(self): + """ Test filtering alerts with alert_owner_id filter """ + alert_data = load_alert_data() + + resp = self.alert.add_alert(alert_data) + assert bool(assert_api_resp(resp)) is True + + alert_owner_id = 1 + resp = self.alert.filter_alerts(alert_owner_id=alert_owner_id) + + assert bool(assert_api_resp(resp)) is True From 940cf9aa7b153e09363fd50d13b57844b80892e5 Mon Sep 17 00:00:00 2001 From: whikernel Date: Thu, 4 May 2023 15:28:54 +0200 Subject: [PATCH 11/12] [ADD] Alerts status --- dfir_iris_client/helper/alert_status.py | 68 +++++++++++++++++++++ dfir_iris_client/helper/utils.py | 20 +++++- dfir_iris_client/tests/test_alert_status.py | 60 ++++++++++++++++++ 3 files changed, 146 insertions(+), 2 deletions(-) create mode 100644 dfir_iris_client/helper/alert_status.py create mode 100644 dfir_iris_client/tests/test_alert_status.py diff --git a/dfir_iris_client/helper/alert_status.py b/dfir_iris_client/helper/alert_status.py new file mode 100644 index 0000000..3bd46bf --- /dev/null +++ b/dfir_iris_client/helper/alert_status.py @@ -0,0 +1,68 @@ +# IRIS Client API Source Code +# contact@dfir-iris.org +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 3 of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +from typing import Union + +from dfir_iris_client.helper.utils import ApiResponse + + +class AlertStatusHelper(object): + """Handles the alert status methods""" + + def __init__(self, session): + self._s = session + + def list_alert_status_types(self) -> ApiResponse: + """ + Returns a list of all alert statuses + + Args: + + Returns: + APIResponse object + """ + return self._s.pi_get('manage/alert-status/list') + + def lookup_alert_status_name(self, alert_status_name: str) -> Union[int, None]: + """ + Returns an alert status ID from its name otherwise None + + Args: + alert_status_name: str: + + Returns: + Union[int, None] - alert status ID matching provided alert status name or None if not found + + """ + ast_list = self.list_alert_status_types() + for ast in ast_list.get_data(): + if ast.get('status_name').lower() == alert_status_name.lower(): + return ast.get('status_id') + + return None + + def get_alert_status(self, alert_status_id: int) -> ApiResponse: + """ + Returns an alert status from its ID + + Args: + alert_status_id: int: + + Returns: + APIResponse object + + """ + return self._s.pi_get('manage/alert-status/{}'.format(alert_status_id)) \ No newline at end of file diff --git a/dfir_iris_client/helper/utils.py b/dfir_iris_client/helper/utils.py index ef32f2d..06dc8db 100644 --- a/dfir_iris_client/helper/utils.py +++ b/dfir_iris_client/helper/utils.py @@ -142,11 +142,27 @@ def get_data(self): return self._response.get('data') - def get_data_field(self, field: Union[List[str], str]): - """ """ + def get_data_field(self, field: Union[List[str], str], index: int = None): + """ + Return the value of a field in the data section of the response + + Args: + field: Field to return + index: Index of the data section to return (Default value = None). Allows to iterate over a list of data + + Returns: + Value of the field + """ if not hasattr(self, "_response"): return None + if index is not None: + if isinstance(field, str): + return self._response.get('data')[index].get(field) + + if isinstance(field, list): + return reduce(lambda d, key: d.get(key) if d else None, field, self._response.get('data')[index]) + if isinstance(field, str): return self._response.get('data').get(field) diff --git a/dfir_iris_client/tests/test_alert_status.py b/dfir_iris_client/tests/test_alert_status.py new file mode 100644 index 0000000..91e1446 --- /dev/null +++ b/dfir_iris_client/tests/test_alert_status.py @@ -0,0 +1,60 @@ +# IRIS Client API Source Code +# contact@dfir-iris.org +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 3 of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + +from dfir_iris_client.helper.alert_status import AlertStatusHelper +from dfir_iris_client.helper.utils import assert_api_resp, get_data_from_resp, parse_api_data +from dfir_iris_client.tests.tests_helper import InitIrisClientTest + + +class AlertStatusTest(InitIrisClientTest): + """ """ + + def setUp(self) -> None: + """ """ + self.asset_type = AlertStatusHelper(self.session) + + def test_list_alert_status(self): + """ """ + ret = self.asset_type.list_alert_status_types() + + assert assert_api_resp(ret) + + assert ret.get_data_field('status_name', index=0) is not None + assert ret.get_data_field('status_description', index=0) is not None + assert ret.get_data_field('status_id', index=0) is not None + + def test_get_alert_status_by_id(self): + """ """ + ret = self.asset_type.list_alert_status_types() + + assert assert_api_resp(ret, soft_fail=False) + + ret = self.asset_type.get_alert_status(ret.get_data_field('status_id', index=0)) + assert assert_api_resp(ret, soft_fail=False) + + assert ret.get_data_field('status_name') is not None + assert ret.get_data_field('status_description') is not None + assert ret.get_data_field('status_id') is not None + + def test_get_alert_status_by_name(self): + """ """ + ret = self.asset_type.list_alert_status_types() + + assert assert_api_resp(ret, soft_fail=False) + + ret = self.asset_type.lookup_alert_status_name(ret.get_data_field('status_name', index=0)) + assert ret is not None From b3896eda9813c8581c0c08e3b59ab118aae17e03 Mon Sep 17 00:00:00 2001 From: whikernel Date: Mon, 8 May 2023 09:38:16 +0200 Subject: [PATCH 12/12] [UPD] Updated for API v2.0.1 --- dfir_iris_client/admin.py | 18 ++++++++++++++---- dfir_iris_client/session.py | 2 +- dfir_iris_client/tests/test_admin.py | 2 +- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/dfir_iris_client/admin.py b/dfir_iris_client/admin.py index 17b318b..d94525b 100644 --- a/dfir_iris_client/admin.py +++ b/dfir_iris_client/admin.py @@ -98,7 +98,8 @@ def get_user(self, user: Union[int, str], **kwargs) -> ApiResponse: return self._s.pi_get(f'manage/users/{user}') - def add_user(self, login: str, name: str, password: str, email: str, **kwargs) -> ApiResponse: + def add_user(self, login: str, name: str, password: str, email: str, is_service_account: bool = False, + **kwargs) -> ApiResponse: """ Adds a new user. A new user can be successfully added if @@ -113,6 +114,7 @@ def add_user(self, login: str, name: str, password: str, email: str, **kwargs) - name: Full name of the user password: Password of the user email: Email of the user + is_service_account: True if the user is a service account Returns: ApiResponse @@ -123,6 +125,7 @@ def add_user(self, login: str, name: str, password: str, email: str, **kwargs) - "user_name": name, "user_password": password, "user_email": email, + "user_is_service_account": is_service_account, "cid": 1 } @@ -562,21 +565,28 @@ def update_asset_type(self, asset_type_id: int, name: str = None, } return self._s.pi_post(f'manage/asset-type/update/{asset_type_id}', data=body) - def add_customer(self, customer_name: str): + def add_customer(self, customer_name: str, customer_description: str = None, + customer_sla: str = None, custom_attributes: dict = {}) -> ApiResponse: """ Creates a new customer. A new customer can be added if: - customer_name is unique Args: - customer_name: Name of the customer to add. + customer_name: Name of the customer to add. + customer_description: Description of the customer + customer_sla: SLA of the customer + custom_attributes: Custom attributes of the customer Returns: ApiResponse object """ body = { - "customer_name": customer_name.lower() + "customer_name": customer_name.lower(), + "customer_description": customer_description, + "customer_sla": customer_sla, + "custom_attributes": custom_attributes } resp = self._s.pi_post('manage/customers/add', data=body) diff --git a/dfir_iris_client/session.py b/dfir_iris_client/session.py index f85a487..e523b60 100644 --- a/dfir_iris_client/session.py +++ b/dfir_iris_client/session.py @@ -34,7 +34,7 @@ Server has an endpoint /api/versions which should returns the API compatible versions it can handles. """ -API_VERSION = "2.0.0" +API_VERSION = "2.0.1" """client_session Defines a global session, accessible by all classes. client_session is of type ClientSession. diff --git a/dfir_iris_client/tests/test_admin.py b/dfir_iris_client/tests/test_admin.py index fcd1a70..cc01ada 100644 --- a/dfir_iris_client/tests/test_admin.py +++ b/dfir_iris_client/tests/test_admin.py @@ -206,7 +206,7 @@ def test_update_case_classification_invalid(self): def test_add_customer_valid(self): """ """ - ret = self.adm.add_customer('dummy customer') + ret = self.adm.add_customer('dummy customer', 'dummy description', 'dummy sla') if parse_api_data(ret.get_data(), 'customer_name') == ['Customer already exists']: ret = self.adm.delete_customer('dummy customer')