diff --git a/README.md b/README.md index 75c004c..9e6143a 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,11 @@ This extension will capture and retain a comprehensive record of all changes wit Read the [documentation](https://datashades.github.io/ckanext-event-audit/) for a full user guide. +TODO: +- [ ] support batch operations for writing +- [ ] add remove_event method +- [ ] add remove_events method + ## Developer installation To install ckanext-event-audit for development, activate your CKAN virtualenv and diff --git a/ckanext/event_audit/config.py b/ckanext/event_audit/config.py index 0cbc522..068445c 100644 --- a/ckanext/event_audit/config.py +++ b/ckanext/event_audit/config.py @@ -1,8 +1,22 @@ import ckan.plugins.toolkit as tk +from ckanext.event_audit import types + CONF_ACTIVE_REPO = "ckanext.event_audit.active_repo" +CONF_CLOUDWATCH_KEY = "ckanext.event_audit.cloudwatch.access_key" +CONF_CLOUDWATCH_SECRET = "ckanext.event_audit.cloudwatch.secret_key" +CONF_CLOUDWATCH_REGION = "ckanext.event_audit.cloudwatch.region" + def active_repo() -> str: """The active repository to store the audit logs.""" return tk.config[CONF_ACTIVE_REPO] + + +def get_cloudwatch_credentials() -> types.AWSCredentials: + return types.AWSCredentials( + aws_access_key_id=tk.config[CONF_CLOUDWATCH_KEY], + aws_secret_access_key=tk.config[CONF_CLOUDWATCH_SECRET], + region_name=tk.config[CONF_CLOUDWATCH_REGION], + ) diff --git a/ckanext/event_audit/config_declaration.yaml b/ckanext/event_audit/config_declaration.yaml index c4568b2..7c1cb58 100644 --- a/ckanext/event_audit/config_declaration.yaml +++ b/ckanext/event_audit/config_declaration.yaml @@ -7,3 +7,18 @@ groups: default: redis validators: audit_active_repo_validator editable: true + + - key: ckanext.event_audit.cloudwatch.access_key + description: The access key for the AWS account + default: '' + editable: true + + - key: ckanext.event_audit.cloudwatch.secret_key + description: The secret key for the AWS account + default: '' + 
editable: true + + - key: ckanext.event_audit.cloudwatch.region + description: The region for the AWS account + default: '' + editable: true diff --git a/ckanext/event_audit/repositories/__init__.py b/ckanext/event_audit/repositories/__init__.py index f34c8cc..70526ec 100644 --- a/ckanext/event_audit/repositories/__init__.py +++ b/ckanext/event_audit/repositories/__init__.py @@ -1,5 +1,11 @@ from .base import AbstractRepository +from .cloudwatch import CloudWatchRepository from .postgres import PostgresRepository from .redis import RedisRepository -__all__ = ["RedisRepository", "AbstractRepository", "PostgresRepository"] +__all__ = [ + "RedisRepository", + "AbstractRepository", + "PostgresRepository", + "CloudWatchRepository", +] diff --git a/ckanext/event_audit/repositories/base.py b/ckanext/event_audit/repositories/base.py index d5dff3f..9fa99b2 100644 --- a/ckanext/event_audit/repositories/base.py +++ b/ckanext/event_audit/repositories/base.py @@ -7,15 +7,13 @@ class AbstractRepository(ABC): - name = "abstract" - @classmethod @abstractmethod def get_name(cls) -> str: """Return the name of the repository.""" @abstractmethod - def write_event(self, event: types.Event) -> types.WriteStatus: + def write_event(self, event: types.Event) -> types.Result: """Write an event to the repository. This method accepts an Event object and writes it to the repository. 
diff --git a/ckanext/event_audit/repositories/cloudwatch.py b/ckanext/event_audit/repositories/cloudwatch.py new file mode 100644 index 0000000..c3da7d4 --- /dev/null +++ b/ckanext/event_audit/repositories/cloudwatch.py @@ -0,0 +1,189 @@ +from __future__ import annotations + +import json +from datetime import datetime, timezone +from typing import Optional, TypedDict + +import boto3 + +from ckanext.event_audit import config, types +from ckanext.event_audit.repositories import AbstractRepository + + +class ResponseMetadata(TypedDict): + RequestId: str + HTTPStatusCode: int + HTTPHeaders: dict[str, str] + RetryAttempts: int + + +class FilterEventResponse(TypedDict): + events: list[types.EventData] + searchedLogStreams: list[str] + ResponseMetadata: ResponseMetadata + + +class CloudWatchEvent(TypedDict): + timestamp: int + message: str + + +class CloudWatchRepository(AbstractRepository): + def __init__(self, credentials: types.AWSCredentials | None = None): + if not credentials: + credentials = config.get_cloudwatch_credentials() + + self.session = boto3.Session( + aws_access_key_id=credentials.aws_access_key_id, + aws_secret_access_key=credentials.aws_secret_access_key, + region_name=credentials.region_name, + ) + + self.client = self.session.client("logs") + + self.log_group = "event_audit" + + # Ensure the log group exists + self._create_log_group_if_not_exists() + + @classmethod + def get_name(cls) -> str: + return "cloudwatch" + + def _create_log_group_if_not_exists(self): + """Creates the log group if it doesn't already exist.""" + try: + self.client.create_log_group(logGroupName=self.log_group) + except self.client.exceptions.ResourceAlreadyExistsException: + pass + + def _get_log_stream_name(self, event_id: str) -> str: + """Generates a unique stream name based on the event ID.""" + return f"event-stream-{event_id}" + + def write_event(self, event: types.Event) -> types.Result: + """Writes an event to CloudWatch Logs.""" + log_stream = 
self._get_log_stream_name(event.id) + + # Create the log stream if it doesn't exist + self._create_log_stream_if_not_exists(log_stream) + + # Prepare the event as JSON + event_data = json.dumps(event.model_dump()) + + # Write the event + try: + self.client.put_log_events( + logGroupName=self.log_group, + logStreamName=log_stream, + logEvents=[ + { + "timestamp": int(datetime.now(timezone.utc).timestamp() * 1000), + "message": event_data, + } + ], + ) + return types.Result(status=True) + except Exception as e: + return types.Result(status=False, message=str(e)) + + def _create_log_stream_if_not_exists(self, log_stream: str): + """Creates the log stream if it doesn't already exist.""" + try: + self.client.create_log_stream( + logGroupName=self.log_group, + logStreamName=log_stream, + ) + except self.client.exceptions.ResourceAlreadyExistsException: + pass + + def get_event(self, event_id: str) -> Optional[types.Event]: + """Retrieves a single event by its ID.""" + try: + response = self.client.get_log_events( + logGroupName=self.log_group, + logStreamName=self._get_log_stream_name(event_id), + ) + if response["events"]: + event_data = json.loads(response["events"][0]["message"]) + return types.Event.model_validate(event_data) + except self.client.exceptions.ResourceNotFoundException: + return None + + return None + + def filter_events( + self, + filters: types.Filters, + ) -> list[types.Event]: + """Filter events from CloudWatch logs based on the given filters.""" + kwargs = { + "logGroupName": self.log_group, + "startTime": ( + int(filters.time_from.timestamp() * 1000) if filters.time_from else None + ), + "endTime": ( + int(filters.time_to.timestamp() * 1000) if filters.time_to else None + ), + "filterPattern": self._build_filter_pattern(filters), + } + + return [ + self._parse_event(e) + for e in self._get_all_matching_events( + {k: v for k, v in kwargs.items() if v is not None} + ) + ] + + def _build_filter_pattern(self, filters: types.Filters) -> 
Optional[str]: + """Builds the CloudWatch filter pattern for querying logs.""" + conditions = [] + + for field in [ + ("category", filters.category), + ("action", filters.action), + ("actor", filters.actor), + ("action_object", filters.action_object), + ("action_object_id", filters.action_object_id), + ("target_type", filters.target_type), + ("target_id", filters.target_id), + ]: + if field[1]: + conditions.append(f'($.{field[0]} = "{field[1]}")') + + if conditions: + return "{ " + " && ".join(conditions) + " }" + + return None + + def _get_all_matching_events(self, kwargs: dict) -> list[CloudWatchEvent]: + """Retrieve all matching events from CloudWatch using pagination.""" + events = [] + + paginator = self.client.get_paginator("filter_log_events") + + for page in paginator.paginate(**kwargs): + events.extend(page.get("events", [])) + + return events + + def _parse_event(self, event: CloudWatchEvent) -> types.Event: + """Parse a CloudWatch event into the Event model. + + CloudWatch events store the event message as a JSON string + """ + return types.Event.model_validate(json.loads(event["message"])) + + +# {'events': [], +# 'searchedLogStreams': [], +# 'ResponseMetadata': {'RequestId': 'a8e80271-a375-4f5d-a74a-faf668a9140d', +# 'HTTPStatusCode': 200, +# 'HTTPHeaders': {'x-amzn-requestid': 'a8e80271-a375-4f5d-a74a-faf668a9140d', +# 'content-type': 'application/x-amz-json-1.1', +# 'content-length': '121', +# 'date': 'Fri, 25 Oct 2024 15:11:26 GMT'}, +# 'RetryAttempts': 0}} + + +# repo.client.filter_log_events(logGroupName="event_audit", filterPattern='{ $.category = "test" }') diff --git a/ckanext/event_audit/repositories/postgres.py b/ckanext/event_audit/repositories/postgres.py index a9a7b2d..0db98eb 100644 --- a/ckanext/event_audit/repositories/postgres.py +++ b/ckanext/event_audit/repositories/postgres.py @@ -7,9 +7,10 @@ from ckan.model import Session from ckanext.event_audit import model, types +from ckanext.event_audit.repositories.base import AbstractRepository -class 
PostgresRepository: +class PostgresRepository(AbstractRepository): def __init__(self): self.session = Session @@ -17,11 +18,11 @@ def __init__(self): def get_name(cls) -> str: return "postgres" - def write_event(self, event: types.Event) -> types.WriteStatus: + def write_event(self, event: types.Event) -> types.Result: db_event = model.EventModel(**event.model_dump()) db_event.save() - return types.WriteStatus(status=True) + return types.Result(status=True) def get_event(self, event_id: str) -> types.Event | None: result = self.session.execute( diff --git a/ckanext/event_audit/repositories/redis.py b/ckanext/event_audit/repositories/redis.py index 1865dcc..7ce5b15 100644 --- a/ckanext/event_audit/repositories/redis.py +++ b/ckanext/event_audit/repositories/redis.py @@ -13,8 +13,6 @@ class RedisRepository(AbstractRepository): - name = "redis" - @classmethod def get_name(cls) -> str: return "redis" @@ -22,12 +20,12 @@ def get_name(cls) -> str: def __init__(self) -> None: self.conn = connect_to_redis() - def write_event(self, event: types.Event) -> types.WriteStatus: + def write_event(self, event: types.Event) -> types.Result: key = self._build_event_key(event) self.conn.hset(REDIS_SET_KEY, key, event.model_dump_json()) - return types.WriteStatus(status=True) + return types.Result(status=True) def _build_event_key(self, event: types.Event) -> str: """Builds the key for the event in Redis. 
diff --git a/ckanext/event_audit/tests/test_interface.py b/ckanext/event_audit/tests/test_interface.py index 01dac3b..0be961b 100644 --- a/ckanext/event_audit/tests/test_interface.py +++ b/ckanext/event_audit/tests/test_interface.py @@ -14,8 +14,8 @@ class MyRepository(AbstractRepository): def get_name(cls) -> str: return "my_repository" - def write_event(self, event: types.Event) -> types.WriteStatus: - return types.WriteStatus(status=True) + def write_event(self, event: types.Event) -> types.Result: + return types.Result(status=True) def get_event(self, event_id: str) -> types.Event | None: return None diff --git a/ckanext/event_audit/types.py b/ckanext/event_audit/types.py index 89575b1..249c3f4 100644 --- a/ckanext/event_audit/types.py +++ b/ckanext/event_audit/types.py @@ -12,11 +12,18 @@ @dataclass -class WriteStatus: +class Result: status: bool message: Optional[str] = None +@dataclass +class AWSCredentials: + aws_access_key_id: str + aws_secret_access_key: str + region_name: str + + class EventData(TypedDict): id: Any category: str diff --git a/docs/index.md b/docs/index.md index bf8779e..eb49f1f 100644 --- a/docs/index.md +++ b/docs/index.md @@ -39,7 +39,7 @@ class MyRepository(AbstractRepository): def get_name(cls) -> str: return "my_repository" - def write_event(self, event: types.Event) -> types.WriteStatus: + def write_event(self, event: types.Event) -> types.Result: pass def get_event(self, event_id: str) -> types.Event | None: