Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
a-leonardi committed Oct 4, 2023
1 parent a6af0ad commit 76f29db
Show file tree
Hide file tree
Showing 10 changed files with 186 additions and 35 deletions.
26 changes: 18 additions & 8 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ Jinja2 = "^3.0.3"
black = "*" # To format files in cli tools
prometheus-client = "^0.16.0"
aiohttp = { version = "^3.8.4", optional = true }
aiolimiter = { version = "^1.1.0", optional = true }
aiobotocore = { version = "^2.5.2", optional = true }
aiofiles = { version = "^23.1.0", optional = true }
aiocsv = { version = "^1.2.4", optional = true }
loguru = { version = "^0.7.0", optional = true }
aiolimiter = "^1.1.0"
aiobotocore = "^2.6.0"
aiofiles = "^23.2.1"
aiocsv = "^1.2.4"
loguru = "^0.7.2"

[tool.poetry.group.dev.dependencies]
unittest-xml-reporting = "^3"
Expand Down
6 changes: 3 additions & 3 deletions sekoia_automation/aio/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
class AsyncConnector(Connector, ABC):
"""Async version of Connector."""

configuration: DefaultConnectorConfiguration
default_configuration: DefaultConnectorConfiguration

_event_loop: AbstractEventLoop

Expand Down Expand Up @@ -96,7 +96,7 @@ async def push_data_to_intakes(
list[str]:
"""
self._last_events_time = datetime.utcnow()
batch_api = urljoin(self.configuration.intake_server, "/batch")
batch_api = urljoin(self.default_configuration.intake_server, "/batch")

self.log(f"Push {len(events)} events to intakes")

Expand All @@ -114,7 +114,7 @@ async def push_data_to_intakes(
)

request_body = {
"intake_key": self.configuration.intake_key,
"intake_key": self.default_configuration.intake_key,
"jsons": chunk,
}

Expand Down
59 changes: 54 additions & 5 deletions sekoia_automation/connector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,24 @@

import orjson
import requests
import sentry_sdk
from pydantic import BaseModel
from requests import Response
from tenacity import Retrying, stop_after_delay, wait_exponential
from tenacity import (
Retrying,
stop_after_delay,
wait_exponential,
)

from sekoia_automation.constants import CHUNK_BYTES_MAX_SIZE, EVENT_BYTES_MAX_SIZE
from sekoia_automation.exceptions import (
TriggerConfigurationError,
)
from sekoia_automation.trigger import Trigger
from sekoia_automation.utils import (
get_annotation_for,
get_as_model,
)

# Connector are a kind of trigger that fetch events from remote sources.
# We should add the content of push_events_to_intakes
Expand All @@ -29,10 +41,44 @@ class DefaultConnectorConfiguration(BaseModel):


class Connector(Trigger, ABC):
configuration: DefaultConnectorConfiguration
CONNECTOR_CONFIGURATION_FILE_NAME = "connector-configuration"
default_configuration: DefaultConnectorConfiguration

seconds_without_events = 3600 * 6

@property
def configuration(self) -> dict | BaseModel | None:
if self._configuration is None:
try:
self.configuration = self.module.load_config(
self.CONNECTOR_CONFIGURATION_FILE_NAME, "json"
)
except FileNotFoundError:
return super().configuration
return self._configuration

@configuration.setter
def configuration(self, configuration: dict) -> None:
"""
Set the connector configuration.
Args:
configuration: dict
"""
try:
self._configuration = get_as_model(
get_annotation_for(self.__class__, "configuration"), configuration
)
except Exception as e:
raise TriggerConfigurationError(str(e))

if isinstance(self._configuration, BaseModel):
sentry_sdk.set_context(
"connector_configuration", self._configuration.dict()
)
elif self._configuration:
sentry_sdk.set_context("connector_configuration", self._configuration)

def __init__(self, *args, **kwargs):
executor_max_worker = kwargs.pop("executor_max_worker", 4)
super().__init__(*args, **kwargs)
Expand All @@ -54,7 +100,7 @@ def _retry(self):

@cached_property
def _connector_user_agent(self) -> str:
return f"sekoiaio-connector-{self.configuration.intake_key}"
return f"sekoiaio-connector-{self.default_configuration.intake_key}"

def _send_chunk(
self,
Expand All @@ -64,7 +110,10 @@ def _send_chunk(
collect_ids: dict[int, list[str]],
):
try:
request_body = {"intake_key": self.configuration.intake_key, "jsons": chunk}
request_body = {
"intake_key": self.default_configuration.intake_key,
"jsons": chunk,
}

for attempt in self._retry():
with attempt:
Expand Down Expand Up @@ -101,7 +150,7 @@ def push_events_to_intakes(
# Reset the consecutive error count
self._error_count = 0
self._last_events_time = datetime.utcnow()
intake_host = os.getenv("INTAKE_URL", self.configuration.intake_server)
intake_host = os.getenv("INTAKE_URL", self.default_configuration.intake_server)
batch_api = urljoin(intake_host, "/batch")

# Dict to collect event_ids for the API
Expand Down
44 changes: 41 additions & 3 deletions sekoia_automation/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
import time
from abc import ABC, abstractmethod
from functools import cached_property
from pathlib import Path
from typing import Any, cast

Expand Down Expand Up @@ -32,6 +33,7 @@ class Module:
PLAYBOOK_RUN_UUID_FILE_NAME = "playbook_run_uuid"
NODE_RUN_UUID_FILE_NAME = "node_run_uuid"
TRIGGER_CONFIGURATION_UUID_FILE_NAME = "trigger_configuration_uuid"
CONNECTOR_CONFIGURATION_UUID_FILE_NAME = "connector_configuration_uuid"

SENTRY_FILE_NAME = "sentry_dsn"
ENVIRONMENT_FILE_NAME = "environment"
Expand All @@ -46,6 +48,7 @@ def __init__(self):
self._playbook_run_uuid: str | None = None
self._node_run_uuid: str | None = None
self._trigger_configuration_uuid: str | None = None
self._connector_configuration_uuid: str | None = None
self._name = None
self.init_sentry()

Expand Down Expand Up @@ -247,7 +250,17 @@ def trigger_configuration_uuid(self) -> str | None:

return self._trigger_configuration_uuid

def load_config(self, file_name: str, type_: str = "str", non_exist_ok=False):
@property
def connector_configuration_uuid(self) -> str | None:
if self._connector_configuration_uuid is None:
self._connector_configuration_uuid = self.load_config(
self.CONNECTOR_CONFIGURATION_UUID_FILE_NAME, non_exist_ok=True
)

return self._connector_configuration_uuid

@staticmethod
def load_config(file_name: str, type_: str = "str", non_exist_ok=False):
return load_config(file_name, type_, non_exist_ok=non_exist_ok)

def register(self, item: type["ModuleItem"], name: str = ""):
Expand Down Expand Up @@ -287,6 +300,10 @@ def init_sentry(self):
sentry_sdk.set_tag(
"trigger_configuration_uuid", self.trigger_configuration_uuid
)
if self.connector_configuration_uuid:
sentry_sdk.set_tag(
"connector_configuration_uuid", self.connector_configuration_uuid
)

def _load_sentry_dsn(self) -> str | None:
try:
Expand All @@ -304,6 +321,8 @@ def _load_environment(self) -> str | None:
class ModuleItem(ABC):
TOKEN_FILE_NAME = "token"
CALLBACK_URL_FILE_NAME = "url_callback"
SECRETS_URL_FILE_NAME = "url_secrets"
LOGS_URL_FILE_NAME = "url_logs"

name: str | None = None
description: str | None = None
Expand All @@ -316,6 +335,8 @@ def __init__(self, module: Module | None = None, data_path: Path | None = None):

self._token: str | None = None
self._callback_url: str | None = None
self._secrets_url: str | None = None
self._logs_url: str | None = None

# Name may be set by the action/trigger class or the module during the register
# Worse case we use the class name
Expand Down Expand Up @@ -384,13 +405,30 @@ def log_exception(self, exception: Exception, **kwargs):
scope.set_extra(key, value)
sentry_sdk.capture_exception(exception)

@property
@cached_property
def callback_url(self) -> str:
if self._callback_url is None:
self._callback_url = self.module.load_config(self.CALLBACK_URL_FILE_NAME)

return self._callback_url

@cached_property
def logs_url(self) -> str:
if self._logs_url is None:
try:
self._logs_url = self.module.load_config(self.LOGS_URL_FILE_NAME)
except FileNotFoundError:
self._logs_url = self.callback_url.replace("/callback", "/logs")
return self._logs_url

@cached_property
def secrets_url(self) -> str:
if self._secrets_url is None:
try:
self._secrets_url = self.module.load_config(self.SECRETS_URL_FILE_NAME)
except FileNotFoundError:
self._secrets_url = self.callback_url.replace("/callback", "/secrets")
return self._secrets_url

@property
def _headers(self) -> dict[str, str]:
return {"Authorization": f"Bearer {self.token}"}
Expand Down
29 changes: 29 additions & 0 deletions sekoia_automation/scripts/files_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from rich import print

from sekoia_automation.action import Action
from sekoia_automation.connector import Connector
from sekoia_automation.module import Module
from sekoia_automation.trigger import Trigger
from sekoia_automation.utils import get_annotation_for
Expand Down Expand Up @@ -70,6 +71,7 @@ def execute(self):
self.generate_main(module, actions, triggers)
self.generate_action_manifests(actions)
self.generate_trigger_manifests(triggers)
self.generate_connector_manifests(triggers)
self.update_module_manifest(module)

sys.path = _old_path
Expand Down Expand Up @@ -164,6 +166,33 @@ def generate_trigger_manifests(self, triggers: set[type[Trigger]]):

print(f"[green][+][/green] Generated {filepath}")

def generate_connector_manifests(self, connectors: set[type[Connector]]):
for connector in connectors:
name = connector.name or connector.__name__
filepath = (
self.base_path / f"connector_{name.lower().replace(' ', '_')}.json"
)

manifest: dict[str, str | dict | None] = {
"name": name,
"description": connector.description,
"uuid": str(uuid5(self.module_uuid, name)),
"docker_parameters": connector.__name__,
"arguments": {},
"results": {},
}

if connector.results_model:
manifest["results"] = connector.results_model.schema()

if configuration_model := get_annotation_for(connector, "configuration"):
manifest["arguments"] = configuration_model.schema()

with filepath.open("w") as out:
out.write(json.dumps(manifest, indent=2))

print(f"[green][+][/green] Generated {filepath}")

def update_module_manifest(self, module: type[Module]):
configuration_model = get_annotation_for(module, "configuration")

Expand Down
Loading

0 comments on commit 76f29db

Please sign in to comment.