Skip to content

Commit

Permalink
increase test coverage on indirect files
Browse files Browse the repository at this point in the history
  • Loading branch information
ysindel committed Nov 27, 2024
1 parent a9ba401 commit 96def3c
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 1 deletion.
5 changes: 5 additions & 0 deletions Sekoia.io/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
## 2024-11-27 - 2.65.4

### Changed

- Add list, create and delete actions using asset management V2

## 2024-11-14 - 2.65.1

Expand Down
2 changes: 1 addition & 1 deletion Sekoia.io/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"name": "Sekoia.io",
"uuid": "92d8bb47-7c51-445d-81de-ae04edbb6f0a",
"slug": "sekoia.io",
"version": "2.65.3",
"version": "2.65.4",
"categories": [
"Generic"
]
Expand Down
167 changes: 167 additions & 0 deletions Sekoia.io/tests/ic_oc_triggers/test_alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,27 @@ def test_single_event_triggers(alert_created_trigger, sample_sicalertapi_mock, s
alert_created_trigger.send_event.assert_called_once()


def test_single_event_triggers_without_alert_uuid(alert_created_trigger, sample_sicalertapi_mock):
alert_created_trigger.send_event = MagicMock()
notification_without_alert_uuid = {
"metadata": {
"version": 2,
"uuid": "a8fb31cb-7310-4f59-afc2-d52033b5cf78",
"created_at": "2019-09-06T07:32:03.256679+00:00",
"community_uuid": "cc93fe3f-c26b-4eb1-82f7-082209cf1892",
},
"type": "alert",
"action": "created",
"attributes": {
"short_id": "ALakbd8NXp9W",
},
}
with sample_sicalertapi_mock:
# Calling the trigger with an alert created notification should create an event
alert_created_trigger.handle_event(notification_without_alert_uuid)
alert_created_trigger.send_event.assert_not_called()


def test_single_event_triggers_updated(
alert_created_trigger,
sample_sicalertapi_mock,
Expand Down Expand Up @@ -244,6 +265,85 @@ def test_single_event_triggers_comments_added(
trigger.send_event.assert_called_once()


def test_invalid_events_dont_triggers_comments_added(
module_configuration,
symphony_storage,
sample_sicalertapi,
samplenotif_alert_comment_created,
):
trigger = AlertCommentCreatedTrigger()
trigger.configuration = {}
trigger._data_path = symphony_storage
trigger.module.configuration = module_configuration
trigger.module._community_uuid = "cc93fe3f-c26b-4eb1-82f7-082209cf1892"
trigger.send_event = MagicMock()
trigger.log = Mock()

invalid_notification = {
"metadata": {
"version": 2,
"community_uuid": "6ffbe55b-d30a-4dc4-bc52-a213dce0af29",
"uuid": "94ef1f9d-ebad-42ba-98d7-2be3447c6bd0",
"created_at": "2019-09-06T07:07:54.830677+00:00",
},
"type": "alert-comment",
"action": "created",
"attributes": {
"content": "comment",
"created_by": "c110d686-0b45-4ae7-b917-f15486d0f8c7",
"created_by_type": "user",
"alert_short_id": "ALakbd8NXp9W",
},
}
# Calling the trigger with an alert comment missing alert_uuid should not create an event
trigger.handle_event(invalid_notification)
trigger.send_event.assert_not_called()
invalid_notification["attributes"]["alert_uuid"] = "5869b4d8-e3bb-4465-baad-95daf28267c7"
# Calling the trigger with alert comment uuid missing comment_uuid should not create an event
trigger.handle_event(invalid_notification)
trigger.send_event.assert_not_called()

# now calling the trigger with a valid notification but with a 404 response from the API
with requests_mock.Mocker() as mock, patch("tenacity.nap.time"):
mock.get("http://fake.url/api/v1/sic/alerts/5869b4d8-e3bb-4465-baad-95daf28267c7", json={}, status_code=404)

trigger.log.assert_not_called()
invalid_notification["attributes"]["uuid"] = "5869b4d8-e3bb-4465-baad-95daf28267c7"
trigger.handle_event(invalid_notification)
trigger.log.assert_called()
trigger.log.reset_mock()
# now making the second api call fail
mock.get("http://fake.url/api/v1/sic/alerts/5869b4d8-e3bb-4465-baad-95daf28267c7", json=sample_sicalertapi)
mock.get(
f"http://fake.url/api/v1/sic/alerts/5869b4d8-e3bb-4465-baad-95daf28267c7/comments/5869b4d8-e3bb-4465-baad-95daf28267c7",
json={},
status_code=404,
)
trigger.log.assert_not_called()
trigger.handle_event(invalid_notification)
trigger.log.assert_called()
trigger.log.reset_mock()
# now making the second api call return a non json response
mock.get("http://fake.url/api/v1/sic/alerts/5869b4d8-e3bb-4465-baad-95daf28267c7", json=sample_sicalertapi)
mock.get(
f"http://fake.url/api/v1/sic/alerts/5869b4d8-e3bb-4465-baad-95daf28267c7/comments/5869b4d8-e3bb-4465-baad-95daf28267c7",
text="not json",
status_code=404,
)
trigger.log.assert_not_called()
trigger.handle_event(invalid_notification)
trigger.log.assert_called()
trigger.log.reset_mock()
# now making the second api call return a non json response
mock.get(
f"http://fake.url/api/v1/sic/alerts/5869b4d8-e3bb-4465-baad-95daf28267c7/comments/5869b4d8-e3bb-4465-baad-95daf28267c7",
text="not json",
status_code=200,
)
trigger.handle_event(invalid_notification)
trigger.log.assert_called()


def test_alert_trigger_filter_by_rule(
alert_trigger, samplenotif_alert_created, sample_sicalertapi_mock, sample_sicalertapi
):
Expand All @@ -265,3 +365,70 @@ def test_alert_trigger_filter_by_rule(
alert_trigger.configuration = {"rule_filter": sample_sicalertapi["rule"]["uuid"]}
alert_trigger.handle_event(samplenotif_alert_created)
assert alert_trigger.send_event.called


def test_comment_trigger_filter_by_rule(
alert_created_trigger,
sample_sicalertapi,
module_configuration,
symphony_storage,
samplenotif_alert_comment_created,
sample_notifications,
):

trigger = AlertCommentCreatedTrigger()
trigger.configuration = {}
trigger._data_path = symphony_storage
trigger.module.configuration = module_configuration
trigger.module._community_uuid = "cc93fe3f-c26b-4eb1-82f7-082209cf1892"
trigger.send_event = MagicMock()

alert_uuid = samplenotif_alert_comment_created.get("attributes").get("alert_uuid")
comment_uuid = samplenotif_alert_comment_created.get("attributes").get("uuid")

with requests_mock.Mocker() as mock:
mock.get(f"http://fake.url/api/v1/sic/alerts/{alert_uuid}", json=sample_sicalertapi)

mock.get(
f"http://fake.url/api/v1/sic/alerts/{alert_uuid}/comments/{comment_uuid}",
json={
"uuid": "095be615-a8ad-4c33-8e9c-c7612fbf6c9f",
"content": "string",
"author": "string",
"date": 0,
"created_by": "string",
"created_by_type": "string",
"unseen": True,
},
)

trigger.configuration = {"rule_filter": "test"}
trigger.handle_event(samplenotif_alert_comment_created)
trigger.send_event.assert_not_called()

trigger.configuration = {"rule_filter": sample_sicalertapi["rule"]["uuid"]}
trigger.handle_event(samplenotif_alert_comment_created)
trigger.send_event.assert_called_once()


def test_comment_trigger_filter_notification_function(
alert_created_trigger,
sample_sicalertapi,
module_configuration,
symphony_storage,
samplenotif_alert_comment_created,
sample_notifications,
):

trigger = AlertCommentCreatedTrigger()
trigger.configuration = {}
trigger._data_path = symphony_storage
trigger.module.configuration = module_configuration
trigger.module._community_uuid = "cc93fe3f-c26b-4eb1-82f7-082209cf1892"
trigger.send_event = MagicMock()
# mock the filter function to return false
trigger._filter_notifications = MagicMock()
trigger._filter_notifications.return_value = False

trigger.handle_event(samplenotif_alert_comment_created)
trigger.send_event.assert_not_called()
61 changes: 61 additions & 0 deletions Sekoia.io/tests/operation_center_action/test_get_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,16 @@ def test_get_asset_by_uuid(requests_mock):
}
results: dict = action.run(arguments)
assert results == expected_result
# add test for high criticity
response["criticality"] = 70
results: dict = action.run(arguments)
assert results["criticity"]["display"] == "high"
assert results["criticity"]["value"] == 70
# add test for medium criticity
response["criticality"] = 50
results = action.run(arguments)
assert results["criticity"]["display"] == "medium"
assert results["criticity"]["value"] == 50


def test_get_asset_by_uuid_returns_none_if_http_error(requests_mock):
Expand All @@ -73,3 +79,58 @@ def test_get_asset_by_uuid_returns_none_if_http_error(requests_mock):

results: dict = action.run(arguments)
assert results == None


def test_get_asset_transforms_criticity_levels(requests_mock):
action = GetAsset()
action.module.configuration = {"base_url": module_base_url, "api_key": apikey}
asset_uuid = uuid.uuid4()
arguments = {"uuid": str(asset_uuid)}
response = {
"uuid": "00000000-0000-0000-0000-000000000123",
"entity_uuid": "00000000-0000-0000-0000-000000000000",
"community_uuid": "00000000-0000-0000-0000-000000000000",
"name": "test get asset",
"type": "network",
"criticality": 10,
"created_at": "2024-10-09T17:30:11Z",
"created_by": "abcdefab-5be7-4143-b936-7dc9eab3aeaf",
"created_by_type": "avatar",
"updated_at": "2024-10-11T17:30:11Z",
"nb_atoms": 1,
"atoms": {
"cidrv6": [],
"cidrv4": ["10.100.100.0/24"],
},
"props": {
"asn": "13336",
},
"tags": [],
"revoked": False,
"reviewed": False,
"source": "manual",
"description": "test get asset action",
"pending_recommendations": [],
}
requests_mock.get(base_url + str(asset_uuid), json=response)

expected_result = {
"uuid": "00000000-0000-0000-0000-000000000123",
"name": "test get asset",
"category": {"uuid": "1c646cb3-54c3-44cb-88d3-2db24de2fcf4", "name": "technical"},
"description": "test get asset action",
"criticity": {"value": 10, "display": "low"},
"asset_type": {"uuid": "4aac4b72-14cf-4159-a4e5-32fa8c1f3da6", "name": "network"},
"community_uuid": "00000000-0000-0000-0000-000000000000",
"owners": [],
"keys": [{"uuid": "e2440ef6-02af-4da0-ac7a-511821033d74", "name": "cidr-v4", "value": "10.100.100.0/24"}],
"attributes": [{"uuid": "7a39c0cf-e470-4a06-afba-da2009fdc7d1", "name": "as", "value": "13336"}],
"created_at": datetime.fromisoformat("2024-10-09T17:30:11Z"),
"updated_at": datetime.fromisoformat("2024-10-11T17:30:11Z"),
}
results: dict = action.run(arguments)
assert results == expected_result
response["criticality"] = 70
results: dict = action.run(arguments)
assert results["criticity"]["display"] == "high"
assert results["criticity"]["value"] == 70

0 comments on commit 96def3c

Please sign in to comment.