Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
a-leonardi committed Oct 4, 2023
1 parent 0208be6 commit b15ec9a
Show file tree
Hide file tree
Showing 9 changed files with 430 additions and 298 deletions.
541 changes: 266 additions & 275 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions sekoia_automation/aio/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
class AsyncConnector(Connector, ABC):
"""Async version of Connector."""

configuration: DefaultConnectorConfiguration
default_configuration: DefaultConnectorConfiguration

_event_loop: AbstractEventLoop

Expand Down Expand Up @@ -96,7 +96,7 @@ async def push_data_to_intakes(
list[str]:
"""
self._last_events_time = datetime.utcnow()
batch_api = urljoin(self.configuration.intake_server, "/batch")
batch_api = urljoin(self.default_configuration.intake_server, "/batch")

self.log(f"Push {len(events)} events to intakes")

Expand All @@ -114,7 +114,7 @@ async def push_data_to_intakes(
)

request_body = {
"intake_key": self.configuration.intake_key,
"intake_key": self.default_configuration.intake_key,
"jsons": chunk,
}

Expand Down
59 changes: 54 additions & 5 deletions sekoia_automation/connector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,24 @@

import orjson
import requests
import sentry_sdk
from pydantic import BaseModel
from requests import Response
from tenacity import Retrying, stop_after_delay, wait_exponential
from tenacity import (
Retrying,
stop_after_delay,
wait_exponential,
)

from sekoia_automation.constants import CHUNK_BYTES_MAX_SIZE, EVENT_BYTES_MAX_SIZE
from sekoia_automation.exceptions import (
TriggerConfigurationError,
)
from sekoia_automation.trigger import Trigger
from sekoia_automation.utils import (
get_annotation_for,
get_as_model,
)

# Connector are a kind of trigger that fetch events from remote sources.
# We should add the content of push_events_to_intakes
Expand All @@ -29,10 +41,44 @@ class DefaultConnectorConfiguration(BaseModel):


class Connector(Trigger, ABC):
configuration: DefaultConnectorConfiguration
CONNECTOR_CONFIGURATION_FILE_NAME = "connector-configuration"
default_configuration: DefaultConnectorConfiguration

seconds_without_events = 3600 * 6

@property
def configuration(self) -> dict | BaseModel | None:
if self._configuration is None:
try:
self.configuration = self.module.load_config(
self.CONNECTOR_CONFIGURATION_FILE_NAME, "json"
)
except FileNotFoundError:
return super().configuration
return self._configuration

@configuration.setter
def configuration(self, configuration: dict) -> None:
"""
Set the connector configuration.
Args:
configuration: dict
"""
try:
self._configuration = get_as_model(
get_annotation_for(self.__class__, "configuration"), configuration
)
except Exception as e:
raise TriggerConfigurationError(str(e))

if isinstance(self._configuration, BaseModel):
sentry_sdk.set_context(
"connector_configuration", self._configuration.dict()
)
elif self._configuration:
sentry_sdk.set_context("connector_configuration", self._configuration)

def __init__(self, *args, **kwargs):
executor_max_worker = kwargs.pop("executor_max_worker", 4)
super().__init__(*args, **kwargs)
Expand All @@ -54,7 +100,7 @@ def _retry(self):

@cached_property
def _connector_user_agent(self) -> str:
return f"sekoiaio-connector-{self.configuration.intake_key}"
return f"sekoiaio-connector-{self.default_configuration.intake_key}"

def _send_chunk(
self,
Expand All @@ -64,7 +110,10 @@ def _send_chunk(
collect_ids: dict[int, list[str]],
):
try:
request_body = {"intake_key": self.configuration.intake_key, "jsons": chunk}
request_body = {
"intake_key": self.default_configuration.intake_key,
"jsons": chunk,
}

for attempt in self._retry():
with attempt:
Expand Down Expand Up @@ -101,7 +150,7 @@ def push_events_to_intakes(
# Reset the consecutive error count
self._error_count = 0
self._last_events_time = datetime.utcnow()
intake_host = os.getenv("INTAKE_URL", self.configuration.intake_server)
intake_host = os.getenv("INTAKE_URL", self.default_configuration.intake_server)
batch_api = urljoin(intake_host, "/batch")

# Dict to collect event_ids for the API
Expand Down
44 changes: 41 additions & 3 deletions sekoia_automation/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
import time
from abc import ABC, abstractmethod
from functools import cached_property
from pathlib import Path
from typing import Any, cast

Expand Down Expand Up @@ -32,6 +33,7 @@ class Module:
PLAYBOOK_RUN_UUID_FILE_NAME = "playbook_run_uuid"
NODE_RUN_UUID_FILE_NAME = "node_run_uuid"
TRIGGER_CONFIGURATION_UUID_FILE_NAME = "trigger_configuration_uuid"
CONNECTOR_CONFIGURATION_UUID_FILE_NAME = "connector_configuration_uuid"

SENTRY_FILE_NAME = "sentry_dsn"
ENVIRONMENT_FILE_NAME = "environment"
Expand All @@ -46,6 +48,7 @@ def __init__(self):
self._playbook_run_uuid: str | None = None
self._node_run_uuid: str | None = None
self._trigger_configuration_uuid: str | None = None
self._connector_configuration_uuid: str | None = None
self._name = None
self.init_sentry()

Expand Down Expand Up @@ -247,7 +250,17 @@ def trigger_configuration_uuid(self) -> str | None:

return self._trigger_configuration_uuid

def load_config(self, file_name: str, type_: str = "str", non_exist_ok=False):
@property
def connector_configuration_uuid(self) -> str | None:
if self._connector_configuration_uuid is None:
self._connector_configuration_uuid = self.load_config(
self.CONNECTOR_CONFIGURATION_UUID_FILE_NAME, non_exist_ok=True
)

return self._connector_configuration_uuid

@staticmethod
def load_config(file_name: str, type_: str = "str", non_exist_ok=False):
return load_config(file_name, type_, non_exist_ok=non_exist_ok)

def register(self, item: type["ModuleItem"], name: str = ""):
Expand Down Expand Up @@ -287,6 +300,10 @@ def init_sentry(self):
sentry_sdk.set_tag(
"trigger_configuration_uuid", self.trigger_configuration_uuid
)
if self.connector_configuration_uuid:
sentry_sdk.set_tag(
"connector_configuration_uuid", self.connector_configuration_uuid
)

def _load_sentry_dsn(self) -> str | None:
try:
Expand All @@ -304,6 +321,8 @@ def _load_environment(self) -> str | None:
class ModuleItem(ABC):
TOKEN_FILE_NAME = "token"
CALLBACK_URL_FILE_NAME = "url_callback"
SECRETS_URL_FILE_NAME = "url_secrets"
LOGS_URL_FILE_NAME = "url_logs"

name: str | None = None
description: str | None = None
Expand All @@ -316,6 +335,8 @@ def __init__(self, module: Module | None = None, data_path: Path | None = None):

self._token: str | None = None
self._callback_url: str | None = None
self._secrets_url: str | None = None
self._logs_url: str | None = None

# Name may be set by the action/trigger class or the module during the register
# Worse case we use the class name
Expand Down Expand Up @@ -384,13 +405,30 @@ def log_exception(self, exception: Exception, **kwargs):
scope.set_extra(key, value)
sentry_sdk.capture_exception(exception)

@property
@cached_property
def callback_url(self) -> str:
if self._callback_url is None:
self._callback_url = self.module.load_config(self.CALLBACK_URL_FILE_NAME)

return self._callback_url

@cached_property
def logs_url(self) -> str:
if self._logs_url is None:
try:
self._logs_url = self.module.load_config(self.LOGS_URL_FILE_NAME)
except FileNotFoundError:
self._logs_url = self.callback_url.replace("/callback", "/logs")
return self._logs_url

@cached_property
def secrets_url(self) -> str:
if self._secrets_url is None:
try:
self._secrets_url = self.module.load_config(self.SECRETS_URL_FILE_NAME)
except FileNotFoundError:
self._secrets_url = self.callback_url.replace("/callback", "/secrets")
return self._secrets_url

@property
def _headers(self) -> dict[str, str]:
return {"Authorization": f"Bearer {self.token}"}
Expand Down
29 changes: 29 additions & 0 deletions sekoia_automation/scripts/files_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from rich import print

from sekoia_automation.action import Action
from sekoia_automation.connector import Connector
from sekoia_automation.module import Module
from sekoia_automation.trigger import Trigger
from sekoia_automation.utils import get_annotation_for
Expand Down Expand Up @@ -70,6 +71,7 @@ def execute(self):
self.generate_main(module, actions, triggers)
self.generate_action_manifests(actions)
self.generate_trigger_manifests(triggers)
self.generate_connector_manifests(triggers)
self.update_module_manifest(module)

sys.path = _old_path
Expand Down Expand Up @@ -164,6 +166,33 @@ def generate_trigger_manifests(self, triggers: set[type[Trigger]]):

print(f"[green][+][/green] Generated {filepath}")

def generate_connector_manifests(self, connectors: set[type[Connector]]):
for connector in connectors:
name = connector.name or connector.__name__
filepath = (
self.base_path / f"connector_{name.lower().replace(' ', '_')}.json"
)

manifest: dict[str, str | dict | None] = {
"name": name,
"description": connector.description,
"uuid": str(uuid5(self.module_uuid, name)),
"docker_parameters": connector.__name__,
"arguments": {},
"results": {},
}

if connector.results_model:
manifest["results"] = connector.results_model.schema()

if configuration_model := get_annotation_for(connector, "configuration"):
manifest["arguments"] = configuration_model.schema()

with filepath.open("w") as out:
out.write(json.dumps(manifest, indent=2))

print(f"[green][+][/green] Generated {filepath}")

def update_module_manifest(self, module: type[Module]):
configuration_model = get_annotation_for(module, "configuration")

Expand Down
30 changes: 30 additions & 0 deletions sekoia_automation/scripts/sync_library.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,27 @@ def load_triggers(self, module_path: Path) -> list:

return triggers

def load_connectors(self, module_path: Path) -> list:
"""Load JSON files representing the connectors linked to a module
Args:
module_path (Path): Path of the parent module
Returns:
list: List of connectors related to the parent module
"""
connectors = []

for filename in module_path.iterdir():
if filename.name.endswith(".json") and filename.name.startswith(
"connector_"
):
connector_path = module_path / filename
with connector_path.open() as fd:
connectors.append(json.load(fd))

return connectors

def set_docker(self, manifests: list, module: dict) -> list:
"""Loops over the Docker name of objets linked to a module and adds the Docker
version if missing
Expand Down Expand Up @@ -334,6 +355,7 @@ def load_module(self, module_path: Path):
raise typer.Exit(code=1)

triggers = self.set_docker(self.load_triggers(module_path), module_info)
connectors = self.set_docker(self.load_connectors(module_path), module_info)
actions = self.set_docker(self.load_actions(module_path), module_info)

module_uuid: str = module_info["uuid"]
Expand All @@ -360,6 +382,14 @@ def load_module(self, module_path: Path):
name="action",
)
print()
if connectors:
self.sync_list(
module_name=module_name,
module_uuid=module_uuid,
list_objects=connectors,
name="connector",
)
print()

def load(self, library_path: Path):
"""Lods all modules that can be found in a given library
Expand Down
9 changes: 2 additions & 7 deletions sekoia_automation/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from collections.abc import Generator
from contextlib import contextmanager
from datetime import datetime, timedelta
from functools import cached_property
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from threading import Event, Thread
Expand Down Expand Up @@ -90,7 +89,7 @@ def _get_secrets_from_server(self) -> dict[str, Any]:
if self.module.has_secrets():
try:
response = requests.get(
self.callback_url.replace("/callback", "/secrets"),
self.secrets_url,
headers=self._headers,
timeout=30,
)
Expand Down Expand Up @@ -287,10 +286,6 @@ def send_event(
remove_directory,
)

@cached_property
def _log_url(self):
return self.callback_url.replace("/callback", "/logs")

# Try to send the log record to the API
# If it can't be done, give up after 10 attempts and capture the logging error

Expand Down Expand Up @@ -332,7 +327,7 @@ def _send_logs_to_api(self):
return
data = {"logs": self._logs}
response = requests.request(
"POST", self._log_url, json=data, headers=self._headers, timeout=30
"POST", self.logs_url, json=data, headers=self._headers, timeout=30
)
response.raise_for_status()
self._logs = []
Expand Down
4 changes: 2 additions & 2 deletions tests/connectors/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def test_connector(storage, mocked_trigger_logs):
test_connector.send_event = Mock()

test_connector.trigger_activation = "2022-03-14T11:16:14.236930Z"
test_connector.configuration = {"intake_key": ""}
test_connector.default_configuration = {"intake_key": ""}

test_connector.log = Mock()
test_connector.log_exception = Mock()
Expand Down Expand Up @@ -181,7 +181,7 @@ def test_push_events_to_intakes_api_failed_retried(test_connector, mocked_trigge

def test_push_events_to_intake_invalid_intake_key(test_connector):
with pytest.raises(TriggerConfigurationError):
test_connector.configuration = {"intake_key": None}
test_connector.default_configuration = {"intake_key": None}


def test_query_exception_api(test_connector, requests_mock):
Expand Down
Loading

0 comments on commit b15ec9a

Please sign in to comment.