Merge pull request #1068 from SEKOIA-IO/fix/SentinelOneCursor
SentinelOne: Fix the usage of the cursor in the connector
squioc authored Aug 7, 2024
2 parents 6800730 + 887bd1d commit 11c55e9
Showing 6 changed files with 141 additions and 64 deletions.
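
In short, each consumer's `pull_events` now receives the persisted cursor (`most_recent_date_seen`) and applies it as a `createdAt` greater-than filter, so only events newer than the most recent date seen are requested. The following is a hypothetical, dependency-free sketch of that query construction; `ActivitiesFilterStub` only mimics the `apply(key, val, op)` interface used in the diff below, and its `key__op` encoding is an assumption of the stub, not the real SentinelOne SDK.

```python
from datetime import UTC, datetime


class ActivitiesFilterStub:
    """Mimics the SDK filter's apply(key, val, op=None) interface; the
    key/op encoding below is an assumption of this stub, not the real SDK."""

    def __init__(self) -> None:
        self.params: dict[str, object] = {}

    def apply(self, key: str, val: object, op: str | None = None) -> None:
        self.params[f"{key}__{op}" if op else key] = val


def build_query_filter(last_timestamp: datetime | None) -> ActivitiesFilterStub:
    """Mirror the parameters the fixed connector applies: ascending sort on
    createdAt, plus a strict greater-than filter on the persisted cursor."""
    query_filter = ActivitiesFilterStub()
    query_filter.apply(key="limit", val=1000)
    query_filter.apply(key="sortBy", val="createdAt")
    query_filter.apply(key="sortOrder", val="asc")
    if last_timestamp:
        query_filter.apply(key="createdAt", val=last_timestamp.isoformat(), op="gt")
    return query_filter


if __name__ == "__main__":
    cursor = datetime(2024, 8, 7, 9, 0, 0, tzinfo=UTC)
    print(build_query_filter(cursor).params)
    # {'limit': 1000, 'sortBy': 'createdAt', 'sortOrder': 'asc',
    #  'createdAt__gt': '2024-08-07T09:00:00+00:00'}
```
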
11 changes: 11 additions & 0 deletions SentinelOne/CHANGELOG.md
@@ -7,6 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## 2024-08-07 - 1.17.2

### Changed

- externalize the method to get the most recent date seen from the events

### Fixed

- fix the interactions with the context to be thread-safe
- use the most recent date seen when querying new events

## 2024-08-06 - 1.17.1

### Changed
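
The changelog entries above describe the two halves of the fix: the cursor is now kept per consumer type behind a lock-guarded context, and it is fed back into each query. Below is a minimal, dependency-free sketch of that persistence pattern; `SimpleContext` is a hypothetical stand-in for the connector's `PersistentJSON` context plus its new `context_lock`, not the actual class.

```python
# Hypothetical stand-in for the lock-guarded, per-consumer-type cursor storage
# introduced in connector.py (PersistentJSON + Lock in the real connector).
import json
from datetime import UTC, datetime, timedelta
from pathlib import Path
from threading import Lock


class SimpleContext:
    def __init__(self, path: Path) -> None:
        self.path = path
        self.lock = Lock()

    def _load(self) -> dict:
        return json.loads(self.path.read_text()) if self.path.exists() else {}

    def read_cursor(self, consumer_type: str) -> datetime:
        """Return the persisted cursor, defaulting to one hour ago when unset,
        as the connector's getter does."""
        with self.lock:
            raw = self._load().get(consumer_type, {}).get("most_recent_date_seen")
        if raw is None:
            return datetime.now(UTC) - timedelta(hours=1)
        return datetime.fromisoformat(raw)

    def write_cursor(self, consumer_type: str, dt: datetime) -> None:
        """Persist the cursor for one consumer type without clobbering the others."""
        with self.lock:
            data = self._load()
            data.setdefault(consumer_type, {})["most_recent_date_seen"] = dt.isoformat()
            self.path.write_text(json.dumps(data))


if __name__ == "__main__":
    ctx = SimpleContext(Path("context.json"))
    ctx.write_cursor("activity", datetime.now(UTC))
    ctx.write_cursor("threat", datetime.now(UTC) - timedelta(minutes=5))
    print(ctx.read_cursor("activity"), ctx.read_cursor("threat"))
```
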
2 changes: 1 addition & 1 deletion SentinelOne/manifest.json
@@ -20,5 +20,5 @@
"name": "SentinelOne",
"uuid": "ff675e74-e5c1-47c8-a571-d207fc297464",
"slug": "sentinelone",
"version": "1.17.1"
"version": "1.17.2"
}
107 changes: 50 additions & 57 deletions SentinelOne/sentinelone_module/logs/connector.py
@@ -1,7 +1,7 @@
import json
from datetime import UTC, datetime, timedelta
from functools import cached_property
from threading import Event, Thread
from threading import Event, Thread, Lock
from time import sleep, time

from dateutil.parser import isoparse
@@ -18,6 +18,7 @@
from sentinelone_module.logging import get_logger
from sentinelone_module.logs.configuration import SentinelOneLogsConnectorConfiguration
from sentinelone_module.logs.metrics import EVENTS_LAG, FORWARD_EVENTS_DURATION, OUTCOMING_EVENTS, INCOMING_MESSAGES
from sentinelone_module.logs.helpers import get_latest_event_timestamp

logger = get_logger()

@@ -27,15 +28,15 @@ class SentinelOneLogsConsumer(Thread):
Each endpoint of SentinelOne logs API is consumed in its own separate thread.
"""

def __init__(self, connector: "SentinelOneLogsConnector"):
def __init__(self, connector: "SentinelOneLogsConnector", consumer_type: str):
super().__init__()

self.context = PersistentJSON("context.json", connector._data_path)
self.log = connector.log
self.log_exception = connector.log_exception
self.configuration = connector.configuration
self.connector = connector
self.module = connector.module
self.consumer_type = consumer_type

self._stop_event = Event()

@@ -62,7 +63,7 @@ def management_client(self):
return Management(hostname=self.module.configuration.hostname, api_token=self.module.configuration.api_token)

@property
def _cache_last_event_date(self) -> datetime:
def most_recent_date_seen(self) -> datetime:
"""
Get last event date.
@@ -72,8 +73,8 @@ def _cache_last_event_date(self) -> datetime:
now = datetime.now(UTC)
one_day_ago = (now - timedelta(days=1)).replace(microsecond=0)

with self.context as cache:
last_event_date_str = cache.get("last_event_date")
with self.connector.context_lock, self.connector.context as cache:
last_event_date_str = cache.get(self.consumer_type, {}).get("most_recent_date_seen")

# If undefined, retrieve events from the last 1 hour
if last_event_date_str is None:
@@ -88,6 +89,14 @@

return last_event_date

@most_recent_date_seen.setter
def most_recent_date_seen(self, dt: datetime) -> None:
with self.connector.context_lock, self.connector.context as cache:
if self.consumer_type not in cache:
cache[self.consumer_type] = {}

cache[self.consumer_type]["most_recent_date_seen"] = dt.isoformat()

@staticmethod
def _serialize_events(events: list[Activity] | list[Threat] | list[dict]) -> list:
"""Serializes a list of events by generating a dict and converting it to JSON
@@ -106,32 +115,7 @@ def _serialize_events(events: list[Activity] | list[Threat] | list[dict]) -> list:
serialized_events.append(non_empty_json_str)
return serialized_events

def _get_latest_event_timestamp(self, events: list[Activity] | list[Threat] | list[dict]) -> datetime:
"""Searches for the most recent timestamp from a list of events
Args:
events (list[Activity] | list[Threat] | list[dict]): List of events to
Returns:
datetime: Timestamp of the most recent event of the list
"""
latest_event_datetime: datetime | None = None
for event in events:
event_dict = event if isinstance(event, dict) else event.__dict__
if event_dict.get("createdAt") is not None:
if latest_event_datetime is None:
latest_event_datetime = datetime.fromisoformat(event_dict["createdAt"])
else:
event_created_at = datetime.fromisoformat(event_dict["createdAt"])
if event_created_at > latest_event_datetime:
latest_event_datetime = event_created_at

if latest_event_datetime is None:
return self._cache_last_event_date
else:
return latest_event_datetime

def pull_events(self) -> list:
def pull_events(self, last_timestamp: datetime | None) -> list:
raise NotImplementedError

def next_batch(self):
@@ -140,7 +124,7 @@ def next_batch(self):

try:
# get the batch
events_id = self.pull_events()
events_id = self.pull_events(self.most_recent_date_seen)

# get the ending time and compute the duration to fetch the events
batch_end_time = time()
@@ -184,14 +168,20 @@ def run(self):


class SentinelOneActivityLogsConsumer(SentinelOneLogsConsumer):
def pull_events(self) -> list:
def __init__(self, connector: "SentinelOneLogsConnector"):
super().__init__(connector, "activity")

def pull_events(self, last_timestamp: datetime | None) -> list:
"""Fetches activities from SentinelOne"""
# Set filters
query_filter = ActivitiesFilter()
query_filter.apply(key="limit", val=1000)
query_filter.apply(key="sortBy", val="createdAt")
query_filter.apply(key="sortOrder", val="asc")

if last_timestamp:
query_filter.apply(key="createdAt", val=last_timestamp.isoformat(), op="gt")

events_id = []
while self.running:
# Fetch activities
@@ -206,23 +196,19 @@ def pull_events(self) -> list:
# Push events
events_id.extend(self.connector.push_events_to_intakes(self._serialize_events(activities.data)))

# Update context with latest event date
latest_event_timestamp = self._get_latest_event_timestamp(activities.data)
with self.context as cache:
cache["last_event_date"] = latest_event_timestamp.isoformat()

# Send Prometheus metrics
OUTCOMING_EVENTS.labels(intake_key=self.configuration.intake_key, datasource="sentinelone").inc(
nb_activities
)

if nb_activities > 0:
EVENTS_LAG.labels(intake_key=self.configuration.intake_key, type="activities").set(
(datetime.now(UTC) - latest_event_timestamp).total_seconds()
)
# Update context with latest event date
current_lag: int = 0
latest_event_timestamp = get_latest_event_timestamp(activities.data)
if latest_event_timestamp is not None:
self.most_recent_date_seen = latest_event_timestamp
current_lag = int((datetime.now(UTC) - latest_event_timestamp).total_seconds())

else:
EVENTS_LAG.labels(intake_key=self.configuration.intake_key, type="activities").set(0)
EVENTS_LAG.labels(intake_key=self.configuration.intake_key, type="activities").set(current_lag)

if activities.pagination["nextCursor"] is None:
break
@@ -233,13 +219,19 @@ def pull_events(self):


class SentinelOneThreatLogsConsumer(SentinelOneLogsConsumer):
def pull_events(self):
def __init__(self, connector: "SentinelOneLogsConnector"):
super().__init__(connector, "threat")

def pull_events(self, last_timestamp: datetime | None):
"""Fetches threats from SentinelOne"""
query_filter = ThreatQueryFilter()
query_filter.apply(key="limit", val=1000)
query_filter.apply(key="sortBy", val="createdAt")
query_filter.apply(key="sortOrder", val="asc")

if last_timestamp:
query_filter.apply(key="createdAt", val=last_timestamp.isoformat(), op="gt")

events_id = []
while self.running:
# Fetch threats
@@ -250,21 +242,17 @@ def pull_events(self):
# Push events
events_id.extend(self.connector.push_events_to_intakes(self._serialize_events(threats.data)))

# Update context with the latest event date
latest_event_timestamp = self._get_latest_event_timestamp(threats.data)
with self.context as cache:
cache["last_event_date"] = latest_event_timestamp.isoformat()

# Send Prometheus metrics
OUTCOMING_EVENTS.labels(intake_key=self.configuration.intake_key, datasource="sentinelone").inc(nb_threats)

if nb_threats > 0:
EVENTS_LAG.labels(intake_key=self.configuration.intake_key, type="threats").set(
(datetime.now(UTC) - latest_event_timestamp).total_seconds()
)
# Update context with the latest event date
current_lag: int = 0
latest_event_timestamp = get_latest_event_timestamp(threats.data)
if latest_event_timestamp is not None:
self.most_recent_date_seen = latest_event_timestamp
current_lag = int((datetime.now(UTC) - latest_event_timestamp).total_seconds())

else:
EVENTS_LAG.labels(intake_key=self.configuration.intake_key, type="threats").set(0)
EVENTS_LAG.labels(intake_key=self.configuration.intake_key, type="threats").set(current_lag)

if threats.pagination["nextCursor"] is None:
break
@@ -281,6 +269,11 @@ class SentinelOneLogsConnector(Connector):
module: SentinelOneModule
configuration: SentinelOneLogsConnectorConfiguration

def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.context = PersistentJSON("context.json", self._data_path)
self.context_lock = Lock()

def start_consumers(self) -> dict:
"""Starts children threads for each supported type
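
For reference, the getter/setter pair above also changes how the cursor is laid out in `context.json`: the old code stored a single flat `last_event_date` string, while the new code keeps one `most_recent_date_seen` entry per consumer type. A small illustrative snippet of the two shapes (the timestamps are invented):

```python
import json

# Illustrative only (timestamps are made up): how the persisted context changes
# shape with this commit. The flat "last_event_date" key is replaced by one
# "most_recent_date_seen" entry per consumer type.
before = {"last_event_date": "2024-08-07T09:15:02+00:00"}
after = {
    "activity": {"most_recent_date_seen": "2024-08-07T09:15:02+00:00"},
    "threat": {"most_recent_date_seen": "2024-08-07T09:14:41+00:00"},
}
print(json.dumps(before, indent=2))
print(json.dumps(after, indent=2))
```
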
27 changes: 27 additions & 0 deletions SentinelOne/sentinelone_module/logs/helpers.py
@@ -0,0 +1,27 @@
from datetime import datetime

from management.mgmtsdk_v2.entities.activity import Activity
from management.mgmtsdk_v2.entities.threat import Threat


def get_latest_event_timestamp(events: list[Activity | Threat | dict]) -> datetime | None:
"""Searches for the most recent timestamp from a list of events
Args:
events (list[Activity | Threat | dict]): List of events to
Returns:
datetime: Timestamp of the most recent event of the list
"""
latest_event_datetime: datetime | None = None
for event in events:
event_dict = event if isinstance(event, dict) else event.__dict__
if event_dict.get("createdAt") is not None:
if latest_event_datetime is None:
latest_event_datetime = datetime.fromisoformat(event_dict["createdAt"])
else:
event_created_at = datetime.fromisoformat(event_dict["createdAt"])
if event_created_at > latest_event_datetime:
latest_event_datetime = event_created_at

return latest_event_datetime
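
A quick usage example of the extracted helper, mirroring the parametrized test added below; it assumes Python 3.11+ (the module already relies on `datetime.UTC`), where `fromisoformat` accepts the trailing `Z`, and must run in an environment where `sentinelone_module` is importable.

```python
from datetime import datetime, timezone

from sentinelone_module.logs.helpers import get_latest_event_timestamp

events = [
    {"createdAt": "2024-07-21T11:23:48Z"},
    {"createdAt": "2024-07-25T02:30:11Z"},
    {"createdAt": "2024-07-22T14:56:11Z"},
]

# The most recent createdAt wins; lists without usable timestamps yield None.
assert get_latest_event_timestamp(events) == datetime(2024, 7, 25, 2, 30, 11, tzinfo=timezone.utc)
assert get_latest_event_timestamp([]) is None
```
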
16 changes: 10 additions & 6 deletions SentinelOne/tests/logs/test_connector.py
@@ -22,14 +22,15 @@ class MockResponse:
def test_pull_activities(activity_consumer, activity_1, activity_2):
OUTCOMING_EVENTS.labels = MagicMock()
EVENTS_LAG.labels = MagicMock()
most_recent_datetime_seen = datetime.datetime(2024, 1, 23, 11, 6, 34)

# Test timestamp caching
activity_1.createdAt = None

response_1 = MockResponse(pagination={"nextCursor": "foo"}, data=[activity_1])
response_2 = MockResponse(pagination={"nextCursor": None}, data=[activity_2])
activity_consumer.management_client.activities.get.side_effect = [response_1, response_2]
activity_consumer.pull_events()
activity_consumer.pull_events(most_recent_datetime_seen)

assert activity_consumer.management_client.activities.get.call_count == 2
assert activity_consumer.connector.push_events_to_intakes.call_args_list == [
@@ -43,20 +44,23 @@ def test_pull_activities(activity_consumer, activity_1, activity_2):
assert EVENTS_LAG.labels(
intake_key=activity_consumer.configuration.intake_key, type="activities"
).set.call_args_list == [
call(86400), # nb of seconds in a day
call((datetime.datetime.now(UTC) - datetime.datetime.fromisoformat(activity_2.createdAt)).total_seconds()),
call(0),
call(
int((datetime.datetime.now(UTC) - datetime.datetime.fromisoformat(activity_2.createdAt)).total_seconds())
),
]


@freeze_time("1970-01-01 00:00:00")
def test_pull_threats(threat_consumer, threat_1, threat_2):
OUTCOMING_EVENTS.labels = MagicMock()
EVENTS_LAG.labels = MagicMock()
most_recent_datetime_seen = datetime.datetime(2024, 1, 23, 11, 6, 34)

response_1 = MockResponse(pagination={"nextCursor": "foo"}, data=[threat_1])
response_2 = MockResponse(pagination={"nextCursor": None}, data=[threat_2])
threat_consumer.management_client.client.get.side_effect = [response_1, response_2]
threat_consumer.pull_events()
threat_consumer.pull_events(most_recent_datetime_seen)

assert threat_consumer.management_client.client.get.call_count == 2
assert threat_consumer.connector.push_events_to_intakes.call_args_list == [
@@ -70,8 +74,8 @@ def test_pull_threats(threat_consumer, threat_1, threat_2):
assert EVENTS_LAG.labels(
intake_key=threat_consumer.configuration.intake_key, type="threats"
).set.call_args_list == [
call((datetime.datetime.now(UTC) - datetime.datetime.fromisoformat(threat_1.createdAt)).total_seconds()),
call((datetime.datetime.now(UTC) - datetime.datetime.fromisoformat(threat_2.createdAt)).total_seconds()),
call(int((datetime.datetime.now(UTC) - datetime.datetime.fromisoformat(threat_1.createdAt)).total_seconds())),
call(int((datetime.datetime.now(UTC) - datetime.datetime.fromisoformat(threat_2.createdAt)).total_seconds())),
]


42 changes: 42 additions & 0 deletions SentinelOne/tests/logs/test_helpers.py
@@ -0,0 +1,42 @@
from datetime import datetime, timezone

from management.mgmtsdk_v2.entities.activity import Activity
from management.mgmtsdk_v2.entities.threat import Threat
import pytest

from sentinelone_module.logs.helpers import get_latest_event_timestamp


@pytest.mark.parametrize(
"events,expected_datetime",
[
(
[
Activity(createdAt="2024-07-21T11:23:48Z"),
Activity(createdAt="2024-07-25T02:30:11Z"),
Activity(createdAt="2024-07-22T14:56:11Z"),
],
datetime(2024, 7, 25, 2, 30, 11, tzinfo=timezone.utc),
),
(
[
Threat(createdAt="2024-07-21T11:23:48Z"),
Threat(createdAt="2024-07-25T02:30:11Z"),
Threat(createdAt="2024-07-22T14:56:11Z"),
],
datetime(2024, 7, 25, 2, 30, 11, tzinfo=timezone.utc),
),
(
[
dict(createdAt="2024-07-21T11:23:48Z"),
dict(createdAt="2024-07-25T02:30:11Z"),
dict(createdAt="2024-07-22T14:56:11Z"),
],
datetime(2024, 7, 25, 2, 30, 11, tzinfo=timezone.utc),
),
([{}, {}, {}], None),
([], None),
],
)
def test_get_lastest_event_timestamp(events, expected_datetime):
assert get_latest_event_timestamp(events) == expected_datetime
