diff --git a/README.md b/README.md index 9e6143a..ef261db 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,7 @@ TODO: - [ ] support batch operations for writing - [ ] add remove_event method - [ ] add remove_events method +- [ ] add async support for writing ## Developer installation diff --git a/ckanext/event_audit/model.py b/ckanext/event_audit/model.py index b5b86fb..9a81985 100644 --- a/ckanext/event_audit/model.py +++ b/ckanext/event_audit/model.py @@ -25,8 +25,8 @@ class EventModel(tk.BaseModel): Column("target_type", String, index=True), Column("target_id", String, index=True), Column("timestamp", TIMESTAMP(timezone=True), nullable=False, index=True), - Column("result", MutableDict.as_mutable(JSONB), default="{}"), # type: ignore - Column("payload", MutableDict.as_mutable(JSONB), default="{}"), # type: ignore + Column("result", MutableDict.as_mutable(JSONB), default="{}"), + Column("payload", MutableDict.as_mutable(JSONB), default="{}"), Index("ix_event_actor_action", "actor", "action"), ) diff --git a/ckanext/event_audit/repositories/cloudwatch.py b/ckanext/event_audit/repositories/cloudwatch.py index c3da7d4..3e4191d 100644 --- a/ckanext/event_audit/repositories/cloudwatch.py +++ b/ckanext/event_audit/repositories/cloudwatch.py @@ -1,26 +1,23 @@ from __future__ import annotations import json +from contextlib import suppress from datetime import datetime, timezone -from typing import Optional, TypedDict +from typing import TYPE_CHECKING, Any, Optional, TypedDict import boto3 -from ckanext.event_audit import config, types -from ckanext.event_audit.repositories import AbstractRepository - - -class ResponseMetadata(TypedDict): - RequestId: str - HTTPStatusCode: int - HTTPHeaders: dict[str, str] - RetryAttempts: int +if TYPE_CHECKING: + from mypy_boto3_logs.client import CloudWatchLogsClient + from mypy_boto3_logs.type_defs import ( + FilteredLogEventTypeDef, + ) +else: + CloudWatchLogsClient = object -class FilterEventResponse(TypedDict): - events: list[types.EventData] - searchedLogStreams: list[str] - ResponseMetadata: ResponseMetadata +from ckanext.event_audit import config, types +from ckanext.event_audit.repositories import AbstractRepository class CloudWatchEvent(TypedDict): @@ -29,7 +26,12 @@ class CloudWatchEvent(TypedDict): class CloudWatchRepository(AbstractRepository): - def __init__(self, credentials: types.AWSCredentials | None = None): + def __init__( + self, + credentials: types.AWSCredentials | None = None, + log_group: str = "ckan/event-audit", + log_stream: str = "event-audit-stream", + ): if not credentials: credentials = config.get_cloudwatch_credentials() @@ -39,9 +41,10 @@ def __init__(self, credentials: types.AWSCredentials | None = None): region_name=credentials.region_name, ) - self.client = self.session.client("logs") + self.client: CloudWatchLogsClient = self.session.client("logs") - self.log_group = "event_audit" + self.log_group = log_group + self.log_stream = log_stream # Ensure the log group exists self._create_log_group_if_not_exists() @@ -52,30 +55,17 @@ def get_name(cls) -> str: def _create_log_group_if_not_exists(self): """Creates the log group if it doesn't already exist.""" - try: + with suppress(self.client.exceptions.ClientError): self.client.create_log_group(logGroupName=self.log_group) - except self.client.exceptions.ResourceAlreadyExistsException: - pass - - def _get_log_stream_name(self, event_id: str) -> str: - """Generates a unique stream name based on the event ID.""" - return f"event-stream-{event_id}" def write_event(self, event: types.Event) -> types.Result: """Writes an event to CloudWatch Logs.""" - log_stream = self._get_log_stream_name(event.id) - - # Create the log stream if it doesn't exist - self._create_log_stream_if_not_exists(log_stream) - - # Prepare the event as JSON event_data = json.dumps(event.model_dump()) - # Write the event try: self.client.put_log_events( logGroupName=self.log_group, - # logStreamName=log_stream, + logStreamName=self._create_log_stream_if_not_exists(self.log_stream), logEvents=[ { "timestamp": int(datetime.now(timezone.utc).timestamp() * 1000), @@ -83,41 +73,47 @@ def write_event(self, event: types.Event) -> types.Result: } ], ) + return types.Result(status=True) - except Exception as e: + except ( + self.client.exceptions.InvalidParameterException, + self.client.exceptions.InvalidSequenceTokenException, + self.client.exceptions.DataAlreadyAcceptedException, + self.client.exceptions.ResourceNotFoundException, + self.client.exceptions.ServiceUnavailableException, + self.client.exceptions.UnrecognizedClientException, + ) as e: return types.Result(status=False, message=str(e)) - def _create_log_stream_if_not_exists(self, log_stream: str): + def _create_log_stream_if_not_exists(self, log_stream: str) -> str: """Creates the log stream if it doesn't already exist.""" - try: + with suppress(self.client.exceptions.ResourceAlreadyExistsException): self.client.create_log_stream( logGroupName=self.log_group, logStreamName=log_stream, ) - except self.client.exceptions.ResourceAlreadyExistsException: - pass + + return log_stream def get_event(self, event_id: str) -> Optional[types.Event]: """Retrieves a single event by its ID.""" - try: - response = self.client.get_log_events( - logGroupName=self.log_group, - # logStreamName=self._get_log_stream_name(event_id), - ) - if response["events"]: - event_data = json.loads(response["events"][0]["message"]) - return types.Event.model_validate(event_data) - except self.client.exceptions.ResourceNotFoundException: + + result = self.filter_events(types.Filters(id=event_id)) + + if not result: return None - return None + if len(result) > 1: + raise ValueError(f"Multiple events found with ID: {event_id}") + + return result[0] def filter_events( self, filters: types.Filters, ) -> list[types.Event]: """Filter events from CloudWatch logs based on the given filters.""" - kwargs = { + kwargs: dict[str, str | int | datetime | None] = { "logGroupName": self.log_group, "startTime": ( int(filters.time_from.timestamp() * 1000) if filters.time_from else None @@ -136,29 +132,32 @@ def filter_events( ] def _build_filter_pattern(self, filters: types.Filters) -> Optional[str]: - """Builds the CloudWatch filter pattern for querying logs.""" - conditions = [] - - for field in [ - ("category", filters.category), - ("action", filters.action), - ("actor", filters.actor), - ("action_object", filters.action_object), - ("action_object_id", filters.action_object_id), - ("target_type", filters.target_type), - ("target_id", filters.target_id), - ]: - if field[1]: - conditions.append(f'$.{field[0]} = "{field[1]}"') + """Builds the CloudWatch filter pattern for querying logs. + TODO: the filter pattern is not yet implemented properly !!! + """ + conditions = [ + f'$.{field} = "{value}"' + for field, value in [ + ("category", filters.category), + ("action", filters.action), + ("actor", filters.actor), + ("action_object", filters.action_object), + ("action_object_id", filters.action_object_id), + ("target_type", filters.target_type), + ("target_id", filters.target_id), + ] + ] if conditions: return " && ".join(conditions) return None - def _get_all_matching_events(self, kwargs: dict) -> list[CloudWatchEvent]: + def _get_all_matching_events( + self, kwargs: dict[str, Any] + ) -> list[FilteredLogEventTypeDef]: """Retrieve all matching events from CloudWatch using pagination.""" - events = [] + events: list[FilteredLogEventTypeDef] = [] paginator = self.client.get_paginator("filter_log_events") @@ -167,23 +166,9 @@ def _get_all_matching_events(self, kwargs: dict) -> list[CloudWatchEvent]: return events - def _parse_event(self, event: CloudWatchEvent) -> types.Event: + def _parse_event(self, event: FilteredLogEventTypeDef) -> types.Event: """Parse a CloudWatch event into the Event model. CloudWatch events store the event message as a JSON string """ return types.Event.model_validate(json.loads(event["message"])) - - -# {'events': [], -# 'searchedLogStreams': [], -# 'ResponseMetadata': {'RequestId': 'a8e80271-a375-4f5d-a74a-faf668a9140d', -# 'HTTPStatusCode': 200, -# 'HTTPHeaders': {'x-amzn-requestid': 'a8e80271-a375-4f5d-a74a-faf668a9140d', -# 'content-type': 'application/x-amz-json-1.1', -# 'content-length': '121', -# 'date': 'Fri, 25 Oct 2024 15:11:26 GMT'}, -# 'RetryAttempts': 0}} - - -# repo.client.filter_log_events(logGroupName="event_audit", filterPattern='{ $.category = "test" }') diff --git a/ckanext/event_audit/repositories/redis.py b/ckanext/event_audit/repositories/redis.py index 7ce5b15..397d922 100644 --- a/ckanext/event_audit/repositories/redis.py +++ b/ckanext/event_audit/repositories/redis.py @@ -66,7 +66,9 @@ def filter_events(self, filters: types.Filters | Any) -> list[types.Event]: break cursor, result = self.conn.hscan( - REDIS_SET_KEY, cursor=cursor, match=pattern, # type: ignore + REDIS_SET_KEY, + cursor=cursor, + match=pattern, # type: ignore ) matching_events.extend( diff --git a/ckanext/event_audit/tests/repositories/test_cloudwatch.py b/ckanext/event_audit/tests/repositories/test_cloudwatch.py new file mode 100644 index 0000000..d22ee9b --- /dev/null +++ b/ckanext/event_audit/tests/repositories/test_cloudwatch.py @@ -0,0 +1,156 @@ +from __future__ import annotations + +from typing import Any, Generator, Callable +from datetime import datetime as dt +from datetime import timedelta as td +from datetime import timezone as tz + +import pytest +from botocore.stub import Stubber + +from ckanext.event_audit import types +from ckanext.event_audit.repositories.cloudwatch import CloudWatchRepository + + +put_log_events_response: dict[str, Any] = { + "nextSequenceToken": "49654796026243824240318171692305216662718669063406487010", + "ResponseMetadata": { + "RequestId": "1111111-1111-1111-1111-111111111111", + "HTTPStatusCode": 200, + "HTTPHeaders": { + "x-amzn-requestid": "1111111-1111-1111-1111-111111111111", + "content-type": "application/x-amz-json-1.1", + "content-length": "80", + "date": "Mon, 28 Oct 2024 14:52:14 GMT", + }, + "RetryAttempts": 0, + }, +} + + +@pytest.fixture() +def cloudwatch_repo() -> Generator[tuple[CloudWatchRepository, Stubber]]: + """Fixture to initialize the CloudWatchRepository with a stubbed client.""" + repo = CloudWatchRepository() + + stubber = Stubber(repo.client) + + yield repo, stubber + + +class TestCloudWatchRepository: + def test_write_event( + self, cloudwatch_repo: tuple[CloudWatchRepository, Stubber], event: types.Event + ): + repo, stubber = cloudwatch_repo + + stubber.add_response("create_log_stream", {}) + stubber.add_response("put_log_events", put_log_events_response) + + with stubber: + result = repo.write_event(event) + + assert result.status + + def test_get_event( + self, cloudwatch_repo: tuple[CloudWatchRepository, Stubber], event: types.Event + ): + repo, stubber = cloudwatch_repo + + stubber.add_response( + "filter_log_events", + { + "events": [ + { + "timestamp": int(dt.now(tz.utc).timestamp() * 1000), + "message": event.model_dump_json(), + } + ], + "searchedLogStreams": [ + {"logStreamName": repo.log_stream, "searchedCompletely": True}, + ], + }, + ) + + with stubber: + loaded_event = repo.get_event(event.id) + assert isinstance(loaded_event, types.Event) + assert event.model_dump() == loaded_event.model_dump() + + def test_get_event_not_found( + self, cloudwatch_repo: tuple[CloudWatchRepository, Stubber] + ): + repo, stubber = cloudwatch_repo + stubber.add_response("filter_log_events", {"events": []}) + + with stubber: + assert repo.get_event("non-existent-id") is None + + def test_filter_events( + self, + cloudwatch_repo: tuple[CloudWatchRepository, Stubber], + event: types.Event, + ): + repo, stubber = cloudwatch_repo + + stubber.add_response( + "filter_log_events", + { + "events": [ + { + "timestamp": int(dt.now(tz.utc).timestamp() * 1000), + "message": event.model_dump_json(), + }, + ], + "searchedLogStreams": [ + {"logStreamName": repo.log_stream, "searchedCompletely": True}, + ], + }, + ) + + with stubber: + events = repo.filter_events(types.Filters(category="model")) + + assert len(events) == 1 + assert events[0].model_dump() == event.model_dump() + + def test_filter_events_no_match( + self, cloudwatch_repo: tuple[CloudWatchRepository, Stubber] + ): + repo, stubber = cloudwatch_repo + stubber.add_response("filter_log_events", {"events": []}) + + with stubber: + events = repo.filter_events(types.Filters(category="non-existent-category")) + + assert len(events) == 0 + + def test_filter_by_time_range( + self, cloudwatch_repo: tuple[CloudWatchRepository, Stubber], event: types.Event + ): + repo, stubber = cloudwatch_repo + + stubber.add_response( + "filter_log_events", + { + "events": [ + { + "timestamp": int( + (dt.now(tz.utc) - td(days=365)).timestamp() * 1000 + ), + "message": event.model_dump_json(), + } + ] + }, + ) + + with stubber: + events = repo.filter_events( + types.Filters( + time_from=dt.now(tz.utc) - td(days=366), + time_to=dt.now(tz.utc), + ) + ) + + assert len(events) == 1 + assert events[0].model_dump() == event.model_dump() diff --git a/ckanext/event_audit/types.py b/ckanext/event_audit/types.py index 249c3f4..b4157c2 100644 --- a/ckanext/event_audit/types.py +++ b/ckanext/event_audit/types.py @@ -122,6 +122,8 @@ class Filters(BaseModel): This model is used to filter events based on different criteria. """ + id: str = Field(default=None, description="Event ID") + category: Optional[str] = Field( default=None, description="Event category, e.g., 'api'" ) diff --git a/pyproject.toml b/pyproject.toml index 2780878..4c801d6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,8 @@ classifiers = [ keywords = [ "CKAN" ] dependencies = [ "pydantic>=2.3.0,<2.4.0", + # "boto3-stubs[cloudwatch]>=1.35.0,<2.0.0" + "boto3-stubs[logs]>=1.35.0,<2.0.0" ] authors = [ {name = "DataShades", email = "datashades@linkdigital.com.au"}, diff --git a/test.ini b/test.ini index 89150a7..018d429 100644 --- a/test.ini +++ b/test.ini @@ -10,6 +10,9 @@ use = config:../ckan/test-core.ini # tests here. These will override the one defined in CKAN core's test-core.ini ckan.plugins = event_audit test_event_audit +ckanext.event_audit.cloudwatch.access_key = xxx +ckanext.event_audit.cloudwatch.secret_key = xxx +ckanext.event_audit.cloudwatch.region = ap-southeast-2 # Logging configuration [loggers]