Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WatchGuard - add connector #1166

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .complianceignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

Sekoia.io/main.py

# In VadeSecure trigger and connector are different
# In these modules trigger and connector are different
VadeSecure/connector_m365_events.json
PandaSecurity/connector_security_events.json
6 changes: 6 additions & 0 deletions PandaSecurity/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

## 2024-11-13 - 1.24.0

### Added

- Added connector

## 2024-10-08 - 1.23.1

### Changed
Expand Down
39 changes: 39 additions & 0 deletions PandaSecurity/aether_endpoint_security_api/client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import requests
from requests_ratelimiter import LimiterAdapter
from urllib3 import Retry

from .auth import ApiKeyAuthentication


class ApiClient(requests.Session):
def __init__(
self,
base_url: str,
api_key: str,
access_id: str,
access_secret: str,
audience: str | None = None,
nb_retries: int = 5,
# https://www.watchguard.com/help/docs/API/Content/en-US/api_get_started/api_limits.html
rate_limit_per_second: int = 500,
):
super().__init__()
self.auth = ApiKeyAuthentication(
base_url=base_url,
api_key=api_key,
access_id=access_id,
access_secret=access_secret,
audience=audience,
ratelimit_per_second=rate_limit_per_second,
nb_retries=nb_retries,
)
self.mount(
"https://",
LimiterAdapter(
per_second=rate_limit_per_second,
max_retries=Retry(
total=nb_retries,
backoff_factor=1,
),
),
)
103 changes: 103 additions & 0 deletions PandaSecurity/aether_endpoint_security_api/client/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import base64
from datetime import datetime, timedelta
from urllib.parse import urljoin
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest using from posixpath import join as urljoin instead (see SEKOIA-IO/sekoia-automation-sdk#139)


import requests
from requests.auth import AuthBase
from requests_ratelimiter import LimiterAdapter
from urllib3 import Retry


class ApiCredentials:
token_type: str
access_token: str
expires_at: datetime

@property
def authorization(self) -> str:
return f"{self.token_type.title()} {self.access_token}"


class ApiKeyAuthentication(AuthBase):
def __init__(
self,
base_url: str,
api_key: str,
access_id: str,
access_secret: str,
audience: str | None = None,
nb_retries: int = 5,
ratelimit_per_second: int = 20,
):
self.__base_url = base_url
self.__audience = audience

# Used in requests as is
self.__api_key = api_key

# Used to get auth token
self.__access_id = access_id
self.__access_secret = access_secret

self.__api_credentials: ApiCredentials | None = None
self.__http_session = requests.Session()
self.__authorization_url = urljoin(self.__base_url, "oauth/token")

self.__http_session.mount(
"https://",
LimiterAdapter(
per_second=ratelimit_per_second,
max_retries=Retry(
total=nb_retries,
backoff_factor=1,
),
),
)

def __get_authorization_headers(self) -> dict[str, str]:
digest = base64.b64encode(f"{self.__access_id}:{self.__access_secret}".encode()).decode("utf-8")

return {
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": f"Basic {digest}",
}

def __get_authorization_payload(self) -> dict[str, str]:
data = {"grant_type": "client_credentials", "scope": "api-access"}

if self.__audience:
data["audience"] = self.__audience

Check warning on line 70 in PandaSecurity/aether_endpoint_security_api/client/auth.py

View check run for this annotation

Codecov / codecov/patch

PandaSecurity/aether_endpoint_security_api/client/auth.py#L70

Added line #L70 was not covered by tests

return data

def get_credentials(self) -> ApiCredentials:
"""
Return CrowdStrike Credentials for the API
"""
current_dt = datetime.utcnow()

if self.__api_credentials is None or current_dt + timedelta(seconds=300) >= self.__api_credentials.expires_at:
response = self.__http_session.post(
url=self.__authorization_url,
headers=self.__get_authorization_headers(),
data=self.__get_authorization_payload(),
)

response.raise_for_status()

credentials = ApiCredentials()

api_credentials: dict = response.json()
credentials.token_type = api_credentials["token_type"]
credentials.access_token = api_credentials["access_token"]
credentials.expires_at = current_dt + timedelta(seconds=api_credentials["expires_in"])

self.__api_credentials = credentials

return self.__api_credentials

def __call__(self, request):
request.headers["Authorization"] = self.get_credentials().authorization
request.headers["WatchGuard-API-Key"] = self.__api_key
return request

Check warning on line 103 in PandaSecurity/aether_endpoint_security_api/client/auth.py

View check run for this annotation

Codecov / codecov/patch

PandaSecurity/aether_endpoint_security_api/client/auth.py#L101-L103

Added lines #L101 - L103 were not covered by tests
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import collections
import time
from datetime import datetime
from typing import Deque

import orjson
from sekoia_automation.connector import Connector, DefaultConnectorConfiguration

from .metrics import EVENTS_LAG, FORWARD_EVENTS_DURATION, INCOMING_MESSAGES, OUTCOMING_EVENTS
from .security_events_mixin import EVENT_TYPES, SecurityEventsMixin


class AetherSecurityEventsConnectorConfiguration(DefaultConnectorConfiguration):
frequency: int = 43200


class AetherSecurityEventsConnector(SecurityEventsMixin, Connector):
configuration: AetherSecurityEventsConnectorConfiguration
seconds_without_events = 0 # disable check

def _fetch_events(self) -> None:
"""
Successively queries the watchguard aether events pages while more are available
and the current batch is not too big.
"""
for event_type, event_type_name in EVENT_TYPES.items():
# save the starting time
batch_start_time = time.time()

message_batch: Deque[dict] = collections.deque()
has_more_message = True

last_message_date = self.last_message_date[event_type]

self.log(
message=f"Fetching recent Aether '{event_type_name}' messages since {last_message_date}",
level="info",
)

while has_more_message:
has_more_message = False
next_events = self._fetch_next_events(last_message_date=last_message_date, event_type=event_type)
if next_events:
last_message_date = self._get_event_date(next_events[-1])
message_batch.extend(next_events)
has_more_message = True

Check warning on line 46 in PandaSecurity/aether_endpoint_security_api/connector_security_events.py

View check run for this annotation

Codecov / codecov/patch

PandaSecurity/aether_endpoint_security_api/connector_security_events.py#L44-L46

Added lines #L44 - L46 were not covered by tests

if len(message_batch) >= self.max_batch_size:
break

Check warning on line 49 in PandaSecurity/aether_endpoint_security_api/connector_security_events.py

View check run for this annotation

Codecov / codecov/patch

PandaSecurity/aether_endpoint_security_api/connector_security_events.py#L49

Added line #L49 was not covered by tests

if message_batch:
INCOMING_MESSAGES.labels(type=event_type_name, intake_key=self.configuration.intake_key).inc(

Check warning on line 52 in PandaSecurity/aether_endpoint_security_api/connector_security_events.py

View check run for this annotation

Codecov / codecov/patch

PandaSecurity/aether_endpoint_security_api/connector_security_events.py#L52

Added line #L52 was not covered by tests
len(message_batch)
)

self.log(

Check warning on line 56 in PandaSecurity/aether_endpoint_security_api/connector_security_events.py

View check run for this annotation

Codecov / codecov/patch

PandaSecurity/aether_endpoint_security_api/connector_security_events.py#L56

Added line #L56 was not covered by tests
message=f"Send a batch of {len(message_batch)} {event_type} messages",
level="info",
)

# compute the events lag
last_message_timestamp = datetime.strptime(last_message_date, self.RFC3339_STRICT_FORMAT)
events_lag = (datetime.utcnow() - last_message_timestamp).total_seconds()
EVENTS_LAG.labels(type=event_type_name, intake_key=self.configuration.intake_key).set(events_lag)

Check warning on line 64 in PandaSecurity/aether_endpoint_security_api/connector_security_events.py

View check run for this annotation

Codecov / codecov/patch

PandaSecurity/aether_endpoint_security_api/connector_security_events.py#L62-L64

Added lines #L62 - L64 were not covered by tests

batch_of_events = [orjson.dumps(event).decode("utf-8") for event in message_batch]
self.push_events_to_intakes(batch_of_events)

Check warning on line 67 in PandaSecurity/aether_endpoint_security_api/connector_security_events.py

View check run for this annotation

Codecov / codecov/patch

PandaSecurity/aether_endpoint_security_api/connector_security_events.py#L66-L67

Added lines #L66 - L67 were not covered by tests

OUTCOMING_EVENTS.labels(type=event_type_name, intake_key=self.configuration.intake_key).inc(

Check warning on line 69 in PandaSecurity/aether_endpoint_security_api/connector_security_events.py

View check run for this annotation

Codecov / codecov/patch

PandaSecurity/aether_endpoint_security_api/connector_security_events.py#L69

Added line #L69 was not covered by tests
len(message_batch)
)

else:
self.log(
message=f"No {event_type_name} events to forward",
level="info",
)
EVENTS_LAG.labels(type=event_type_name, intake_key=self.configuration.intake_key).set(0)

# get the ending time and compute the duration to fetch the events
batch_end_time = time.time()
batch_duration = int(batch_end_time - batch_start_time)
FORWARD_EVENTS_DURATION.labels(type=event_type_name, intake_key=self.configuration.intake_key).observe(
batch_duration
)

self.log(
message=f"Set last_message_date for Aether '{event_type_name}' to {last_message_date}",
level="info",
)
self.last_message_date[event_type] = last_message_date
10 changes: 5 additions & 5 deletions PandaSecurity/aether_endpoint_security_api/metrics.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from prometheus_client import Counter, Histogram, Gauge
from prometheus_client import Counter, Gauge, Histogram

# Declare prometheus metrics
prom_namespace = "symphony_module_panda_security"
Expand All @@ -7,7 +7,7 @@
name="collected_messages",
documentation="Number of messages consumed",
namespace=prom_namespace,
labelnames=["type"],
labelnames=["type", "intake_key"],
)

# Declare common prometheus metrics
Expand All @@ -17,19 +17,19 @@
name="forwarded_events",
documentation="Number of events forwarded to Sekoia.io",
namespace=prom_namespace,
labelnames=["type"],
labelnames=["type", "intake_key"],
)

FORWARD_EVENTS_DURATION = Histogram(
name="forward_events_duration",
documentation="Duration to collect and forward events",
namespace=prom_namespace,
labelnames=["type"],
labelnames=["type", "intake_key"],
)

EVENTS_LAG = Gauge(
name="events_lags",
documentation="The delay, in seconds, from the date of the last event",
namespace=prom_namespace,
labelnames=["type"],
labelnames=["type", "intake_key"],
)
Loading
Loading