Skip to content

Commit

Permalink
feature: implement cloudwatch repo, wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mutantsan committed Oct 28, 2024
1 parent c576732 commit 9cf9ccd
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 15 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ This extension will capture and retain a comprehensive record of all changes wit

Read the [documentation](https://datashades.github.io/ckanext-event-audit/) for a full user guide.

TODO:
- [ ] support batch operations for writing
- [ ] add remove_event method
- [ ] add remove_events method

## Developer installation

To install ckanext-event-audit for development, activate your CKAN virtualenv and
Expand Down
14 changes: 14 additions & 0 deletions ckanext/event_audit/config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,22 @@
import ckan.plugins.toolkit as tk

from ckanext.event_audit import types

# Config option read by active_repo(): the repository used to store audit logs.
CONF_ACTIVE_REPO = "ckanext.event_audit.active_repo"

# Config options read by get_cloudwatch_credentials(): AWS access key,
# secret key and region used for the CloudWatch repository.
CONF_CLOUDWATCH_KEY = "ckanext.event_audit.cloudwatch.access_key"
CONF_CLOUDWATCH_SECRET = "ckanext.event_audit.cloudwatch.secret_key"
CONF_CLOUDWATCH_REGION = "ckanext.event_audit.cloudwatch.region"


def active_repo() -> str:
    """Return the configured repository used to store the audit logs.

    The value is read from the ``ckanext.event_audit.active_repo`` option.
    """
    repo_name = tk.config[CONF_ACTIVE_REPO]
    return repo_name


def get_cloudwatch_credentials() -> types.AWSCredentials:
    """Build the AWS credentials for CloudWatch from the CKAN config.

    Reads the access key, secret key and region options and bundles them
    into a ``types.AWSCredentials`` instance.
    """
    access_key = tk.config[CONF_CLOUDWATCH_KEY]
    secret_key = tk.config[CONF_CLOUDWATCH_SECRET]
    region = tk.config[CONF_CLOUDWATCH_REGION]

    return types.AWSCredentials(
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region,
    )
15 changes: 15 additions & 0 deletions ckanext/event_audit/config_declaration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,18 @@ groups:
default: redis
validators: audit_active_repo_validator
editable: true

- key: ckanext.event_audit.cloudwatch.access_key
description: The access key for the AWS account
default: ''
editable: true

- key: ckanext.event_audit.cloudwatch.secret_key
description: The secret key for the AWS account
default: ''
editable: true

- key: ckanext.event_audit.cloudwatch.region
description: The region for the AWS account
default: ''
editable: true
8 changes: 7 additions & 1 deletion ckanext/event_audit/repositories/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
from .base import AbstractRepository
from .cloudwatch import CloudWatchRepository
from .postgres import PostgresRepository
from .redis import RedisRepository

__all__ = ["RedisRepository", "AbstractRepository", "PostgresRepository"]
__all__ = [
"RedisRepository",
"AbstractRepository",
"PostgresRepository",
"CloudWatchRepository",
]
4 changes: 1 addition & 3 deletions ckanext/event_audit/repositories/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@


class AbstractRepository(ABC):
name = "abstract"

@classmethod
@abstractmethod
def get_name(cls) -> str:
"""Return the name of the repository."""

@abstractmethod
def write_event(self, event: types.Event) -> types.WriteStatus:
def write_event(self, event: types.Event) -> types.Result:
"""Write an event to the repository.
This method accepts an Event object and writes it to the repository.
Expand Down
189 changes: 189 additions & 0 deletions ckanext/event_audit/repositories/cloudwatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
from __future__ import annotations

import json
from datetime import datetime, timezone
from typing import Optional, TypedDict

import boto3

from ckanext.event_audit import config, types
from ckanext.event_audit.repositories import AbstractRepository


class ResponseMetadata(TypedDict):
    """Shape of the ``ResponseMetadata`` entry of a CloudWatch client response.

    Mirrors the keys seen in the sample response kept at the bottom of this
    module.
    """

    RequestId: str
    HTTPStatusCode: int
    HTTPHeaders: dict[str, str]
    RetryAttempts: int


class FilterEventResponse(TypedDict):
    """Shape of a ``filter_log_events`` response (see the sample at module end)."""

    # NOTE(review): the rest of this module treats the items of ``events`` as
    # ``CloudWatchEvent`` dicts (timestamp + JSON message), not as
    # ``types.EventData`` -- confirm which element type is intended here.
    events: list[types.EventData]
    searchedLogStreams: list[str]
    ResponseMetadata: ResponseMetadata


class CloudWatchEvent(TypedDict):
    """A single log event as exchanged with CloudWatch Logs by this module."""

    # Written by the repository as milliseconds since the Unix epoch.
    timestamp: int
    # The audit event serialised as a JSON string.
    message: str

class CloudWatchRepository(AbstractRepository):
    """Audit-event repository backed by AWS CloudWatch Logs.

    Each event is serialised to JSON and written to its own log stream
    (``event-stream-<event id>``) inside a single log group.
    """

    def __init__(
        self,
        credentials: types.AWSCredentials | None = None,
        log_group: str = "event_audit",
    ):
        """Create the CloudWatch Logs client and make sure the log group exists.

        Args:
            credentials: AWS credentials to use. When omitted they are read
                from the CKAN config via ``config.get_cloudwatch_credentials``.
            log_group: Name of the log group that stores the events.
        """
        if credentials is None:
            credentials = config.get_cloudwatch_credentials()

        self.session = boto3.Session(
            aws_access_key_id=credentials.aws_access_key_id,
            aws_secret_access_key=credentials.aws_secret_access_key,
            region_name=credentials.region_name,
        )

        self.client = self.session.client("logs")

        self.log_group = log_group

        # Ensure the log group exists
        self._create_log_group_if_not_exists()

    @classmethod
    def get_name(cls) -> str:
        """Return the name under which this repository is registered."""
        return "cloudwatch"

    def _create_log_group_if_not_exists(self):
        """Create the log group, ignoring the error if it already exists."""
        try:
            self.client.create_log_group(logGroupName=self.log_group)
        except self.client.exceptions.ResourceAlreadyExistsException:
            pass

    def _get_log_stream_name(self, event_id: str) -> str:
        """Return the log stream name derived from the event ID."""
        return f"event-stream-{event_id}"

    def write_event(self, event: types.Event) -> types.Result:
        """Write an event to CloudWatch Logs.

        Args:
            event: The event to store.

        Returns:
            A ``Result`` with ``status=True`` on success, or ``status=False``
            and the error text in ``message`` if ``put_log_events`` failed.
        """
        log_stream = self._get_log_stream_name(event.id)

        # Create the log stream if it doesn't exist
        self._create_log_stream_if_not_exists(log_stream)

        # Serialise the same way RedisRepository does, so that field types
        # the plain ``json`` module cannot encode are still handled.
        event_data = event.model_dump_json()

        try:
            self.client.put_log_events(
                logGroupName=self.log_group,
                # The target stream must be named explicitly; without it the
                # request is rejected before it is ever sent.
                logStreamName=log_stream,
                logEvents=[
                    {
                        "timestamp": int(
                            datetime.now(timezone.utc).timestamp() * 1000
                        ),
                        "message": event_data,
                    }
                ],
            )
        except Exception as e:
            return types.Result(status=False, message=str(e))

        return types.Result(status=True)

    def _create_log_stream_if_not_exists(self, log_stream: str):
        """Create the log stream, ignoring the error if it already exists."""
        try:
            self.client.create_log_stream(
                logGroupName=self.log_group,
                logStreamName=log_stream,
            )
        except self.client.exceptions.ResourceAlreadyExistsException:
            pass

    def get_event(self, event_id: str) -> Optional[types.Event]:
        """Retrieve a single event by its ID.

        Args:
            event_id: ID of the event to look up.

        Returns:
            The stored event, or ``None`` when the event's log stream does
            not exist or holds no log events.
        """
        try:
            response = self.client.get_log_events(
                logGroupName=self.log_group,
                logStreamName=self._get_log_stream_name(event_id),
            )
        except self.client.exceptions.ResourceNotFoundException:
            return None

        log_events = response["events"]
        if not log_events:
            return None

        return types.Event.model_validate(json.loads(log_events[0]["message"]))

    def filter_events(
        self,
        filters: types.Filters,
    ) -> list[types.Event]:
        """Return the events in the log group that match the given filters.

        Args:
            filters: Field values and an optional time range to match.

        Returns:
            The matching events, parsed into ``types.Event`` objects.
        """
        kwargs = {
            "logGroupName": self.log_group,
            "startTime": (
                int(filters.time_from.timestamp() * 1000)
                if filters.time_from
                else None
            ),
            "endTime": (
                int(filters.time_to.timestamp() * 1000) if filters.time_to else None
            ),
            "filterPattern": self._build_filter_pattern(filters),
        }

        # Unset options are dropped rather than sent as ``None``.
        request = {k: v for k, v in kwargs.items() if v is not None}

        return [self._parse_event(e) for e in self._get_all_matching_events(request)]

    def _build_filter_pattern(self, filters: types.Filters) -> Optional[str]:
        """Build the CloudWatch JSON filter pattern for the given filters.

        Returns:
            A pattern such as ``{ $.category = "x" && $.action = "y" }``, or
            ``None`` when no field filter is set.
        """
        candidates = (
            ("category", filters.category),
            ("action", filters.action),
            ("actor", filters.actor),
            ("action_object", filters.action_object),
            ("action_object_id", filters.action_object_id),
            ("target_type", filters.target_type),
            ("target_id", filters.target_id),
        )

        # NOTE(review): values are interpolated verbatim; a value containing
        # a double quote would break the pattern -- confirm whether callers
        # can pass such values.
        conditions = [
            f'$.{name} = "{value}"' for name, value in candidates if value
        ]

        if not conditions:
            return None

        # JSON selectors must be enclosed in braces to be treated as a JSON
        # filter pattern.
        return "{ " + " && ".join(conditions) + " }"

    def _get_all_matching_events(self, kwargs: dict) -> list[CloudWatchEvent]:
        """Collect every matching event from all ``filter_log_events`` pages."""
        events = []

        paginator = self.client.get_paginator("filter_log_events")

        for page in paginator.paginate(**kwargs):
            events.extend(page.get("events", []))

        return events

    def _parse_event(self, event: CloudWatchEvent) -> types.Event:
        """Parse a CloudWatch event into the Event model.

        CloudWatch events store the event message as a JSON string.
        """
        return types.Event.model_validate(json.loads(event["message"]))


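# Reference notes kept while the CloudWatch repository is a work in progress.
#
# Sample response with no matching events (presumably from
# ``filter_log_events``):
#
# {'events': [],
#  'searchedLogStreams': [],
#  'ResponseMetadata': {'RequestId': 'a8e80271-a375-4f5d-a74a-faf668a9140d',
#                       'HTTPStatusCode': 200,
#                       'HTTPHeaders': {'x-amzn-requestid': 'a8e80271-a375-4f5d-a74a-faf668a9140d',
#                                       'content-type': 'application/x-amz-json-1.1',
#                                       'content-length': '121',
#                                       'date': 'Fri, 25 Oct 2024 15:11:26 GMT'},
#                       'RetryAttempts': 0}}
#
# Sample call with a JSON filter pattern (note the enclosing braces):
#
# repo.client.filter_log_events(logGroupName="event_audit", filterPattern='{ $.category = "test" }')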
7 changes: 4 additions & 3 deletions ckanext/event_audit/repositories/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,22 @@
from ckan.model import Session

from ckanext.event_audit import model, types
from ckanext.event_audit.repositories.base import AbstractRepository


class PostgresRepository:
class PostgresRepository(AbstractRepository):
def __init__(self):
self.session = Session

@classmethod
def get_name(cls) -> str:
return "postgres"

def write_event(self, event: types.Event) -> types.WriteStatus:
def write_event(self, event: types.Event) -> types.Result:
db_event = model.EventModel(**event.model_dump())
db_event.save()

return types.WriteStatus(status=True)
return types.Result(status=True)

def get_event(self, event_id: str) -> types.Event | None:
result = self.session.execute(
Expand Down
6 changes: 2 additions & 4 deletions ckanext/event_audit/repositories/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,19 @@


class RedisRepository(AbstractRepository):
name = "redis"

@classmethod
def get_name(cls) -> str:
return "redis"

def __init__(self) -> None:
self.conn = connect_to_redis()

def write_event(self, event: types.Event) -> types.WriteStatus:
def write_event(self, event: types.Event) -> types.Result:
key = self._build_event_key(event)

self.conn.hset(REDIS_SET_KEY, key, event.model_dump_json())

return types.WriteStatus(status=True)
return types.Result(status=True)

def _build_event_key(self, event: types.Event) -> str:
"""Builds the key for the event in Redis.
Expand Down
4 changes: 2 additions & 2 deletions ckanext/event_audit/tests/test_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ class MyRepository(AbstractRepository):
def get_name(cls) -> str:
return "my_repository"

def write_event(self, event: types.Event) -> types.WriteStatus:
return types.WriteStatus(status=True)
def write_event(self, event: types.Event) -> types.Result:
return types.Result(status=True)

def get_event(self, event_id: str) -> types.Event | None:
return None
Expand Down
9 changes: 8 additions & 1 deletion ckanext/event_audit/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,18 @@


@dataclass
class WriteStatus:
class Result:
status: bool
message: Optional[str] = None


@dataclass
class AWSCredentials:
    """AWS connection settings: an access key pair and a region name."""

    aws_access_key_id: str
    aws_secret_access_key: str
    region_name: str


class EventData(TypedDict):
id: Any
category: str
Expand Down
2 changes: 1 addition & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class MyRepository(AbstractRepository):
def get_name(cls) -> str:
return "my_repository"

def write_event(self, event: types.Event) -> types.WriteStatus:
def write_event(self, event: types.Event) -> types.Result:
pass

def get_event(self, event_id: str) -> types.Event | None:
Expand Down

0 comments on commit 9cf9ccd

Please sign in to comment.