Skip to content

Commit

Permalink
feature: implement cloudwatch repo, wip 2
Browse files Browse the repository at this point in the history
  • Loading branch information
mutantsan committed Oct 28, 2024
1 parent 9cf9ccd commit 43991c2
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 82 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ TODO:
- [ ] support batch operations for writing
- [ ] add remove_event method
- [ ] add remove_events method
- [ ] add async support for writing

## Developer installation

Expand Down
4 changes: 2 additions & 2 deletions ckanext/event_audit/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ class EventModel(tk.BaseModel):
Column("target_type", String, index=True),
Column("target_id", String, index=True),
Column("timestamp", TIMESTAMP(timezone=True), nullable=False, index=True),
Column("result", MutableDict.as_mutable(JSONB), default="{}"), # type: ignore
Column("payload", MutableDict.as_mutable(JSONB), default="{}"), # type: ignore
Column("result", MutableDict.as_mutable(JSONB), default="{}"),
Column("payload", MutableDict.as_mutable(JSONB), default="{}"),
Index("ix_event_actor_action", "actor", "action"),
)

Expand Down
143 changes: 64 additions & 79 deletions ckanext/event_audit/repositories/cloudwatch.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
from __future__ import annotations

import json
from contextlib import suppress
from datetime import datetime, timezone
from typing import Optional, TypedDict
from typing import TYPE_CHECKING, Any, Optional, TypedDict

import boto3

from ckanext.event_audit import config, types
from ckanext.event_audit.repositories import AbstractRepository


class ResponseMetadata(TypedDict):
RequestId: str
HTTPStatusCode: int
HTTPHeaders: dict[str, str]
RetryAttempts: int
if TYPE_CHECKING:
from mypy_boto3_logs.client import CloudWatchLogsClient
from mypy_boto3_logs.type_defs import (
FilteredLogEventTypeDef,
)
else:
CloudWatchLogsClient = object


class FilterEventResponse(TypedDict):
events: list[types.EventData]
searchedLogStreams: list[str]
ResponseMetadata: ResponseMetadata
from ckanext.event_audit import config, types
from ckanext.event_audit.repositories import AbstractRepository


class CloudWatchEvent(TypedDict):
Expand All @@ -29,7 +26,12 @@ class CloudWatchEvent(TypedDict):


class CloudWatchRepository(AbstractRepository):
def __init__(self, credentials: types.AWSCredentials | None = None):
def __init__(
self,
credentials: types.AWSCredentials | None = None,
log_group: str = "ckan/event-audit",
log_stream: str = "event-audit-stream",
):
if not credentials:
credentials = config.get_cloudwatch_credentials()

Expand All @@ -39,9 +41,10 @@ def __init__(self, credentials: types.AWSCredentials | None = None):
region_name=credentials.region_name,
)

self.client = self.session.client("logs")
self.client: CloudWatchLogsClient = self.session.client("logs")

self.log_group = "event_audit"
self.log_group = log_group
self.log_stream = log_stream

# Ensure the log group exists
self._create_log_group_if_not_exists()
Expand All @@ -52,72 +55,65 @@ def get_name(cls) -> str:

def _create_log_group_if_not_exists(self):
    """Create the log group if it doesn't already exist.

    Only the "already exists" error is suppressed; other client errors
    (e.g. missing IAM permissions, bad credentials) still propagate so
    that misconfiguration is not silently hidden.
    """
    with suppress(self.client.exceptions.ResourceAlreadyExistsException):
        self.client.create_log_group(logGroupName=self.log_group)

def _get_log_stream_name(self, event_id: str) -> str:
    """Return a per-event log stream name derived from the event ID."""
    return "event-stream-{}".format(event_id)

def write_event(self, event: types.Event) -> types.Result:
    """Write a single event to CloudWatch Logs.

    The event is serialised to JSON and appended to the configured log
    stream (created on demand if missing).

    Args:
        event: the audit event to persist.

    Returns:
        A Result whose status reflects whether the put succeeded; on a
        known CloudWatch client error the exception text is returned in
        the Result message instead of propagating.
    """
    event_data = json.dumps(event.model_dump())

    try:
        self.client.put_log_events(
            logGroupName=self.log_group,
            logStreamName=self._create_log_stream_if_not_exists(self.log_stream),
            logEvents=[
                {
                    # CloudWatch expects epoch milliseconds
                    "timestamp": int(datetime.now(timezone.utc).timestamp() * 1000),
                    "message": event_data,
                }
            ],
        )

        return types.Result(status=True)
    except (
        self.client.exceptions.InvalidParameterException,
        self.client.exceptions.InvalidSequenceTokenException,
        self.client.exceptions.DataAlreadyAcceptedException,
        self.client.exceptions.ResourceNotFoundException,
        self.client.exceptions.ServiceUnavailableException,
        self.client.exceptions.UnrecognizedClientException,
    ) as e:
        return types.Result(status=False, message=str(e))

def _create_log_stream_if_not_exists(self, log_stream: str) -> str:
    """Create the log stream if it doesn't already exist.

    Args:
        log_stream: name of the stream inside ``self.log_group``.

    Returns:
        The stream name, so the call can be inlined where a stream name
        is expected (e.g. ``put_log_events(logStreamName=...)``).
    """
    with suppress(self.client.exceptions.ResourceAlreadyExistsException):
        self.client.create_log_stream(
            logGroupName=self.log_group,
            logStreamName=log_stream,
        )

    return log_stream

def get_event(self, event_id: str) -> Optional[types.Event]:
    """Retrieve a single event by its ID.

    Delegates to ``filter_events`` with an ID-only filter.

    Args:
        event_id: unique identifier of the event.

    Returns:
        The matching event, or None when no match is found.

    Raises:
        ValueError: if more than one event matches the ID.
    """
    result = self.filter_events(types.Filters(id=event_id))

    if not result:
        return None

    if len(result) > 1:
        raise ValueError(f"Multiple events found with ID: {event_id}")

    return result[0]

def filter_events(
self,
filters: types.Filters,
) -> list[types.Event]:
"""Filter events from CloudWatch logs based on the given filters."""
kwargs = {
kwargs: dict[str, str | int | datetime | None] = {
"logGroupName": self.log_group,
"startTime": (
int(filters.time_from.timestamp() * 1000) if filters.time_from else None
Expand All @@ -136,29 +132,32 @@ def filter_events(
]

def _build_filter_pattern(self, filters: types.Filters) -> Optional[str]:
    """Build the CloudWatch metric-filter pattern for querying logs.

    Only filters with a truthy value contribute a condition — without
    this guard, unset filters would produce bogus conditions such as
    ``$.actor = "None"`` and match nothing.

    Returns:
        The ``&&``-joined pattern, or None when no filter is set so the
        caller can omit the ``filterPattern`` argument entirely.

    TODO: the filter pattern is not yet implemented properly !!!
    """
    conditions = [
        f'$.{field} = "{value}"'
        for field, value in [
            ("category", filters.category),
            ("action", filters.action),
            ("actor", filters.actor),
            ("action_object", filters.action_object),
            ("action_object_id", filters.action_object_id),
            ("target_type", filters.target_type),
            ("target_id", filters.target_id),
        ]
        if value
    ]

    if conditions:
        return " && ".join(conditions)

    return None

def _get_all_matching_events(self, kwargs: dict) -> list[CloudWatchEvent]:
def _get_all_matching_events(
self, kwargs: dict[str, Any]
) -> list[FilteredLogEventTypeDef]:
"""Retrieve all matching events from CloudWatch using pagination."""
events = []
events: list[FilteredLogEventTypeDef] = []

paginator = self.client.get_paginator("filter_log_events")

Expand All @@ -167,23 +166,9 @@ def _get_all_matching_events(self, kwargs: dict) -> list[CloudWatchEvent]:

return events

def _parse_event(self, event: FilteredLogEventTypeDef) -> types.Event:
    """Parse a CloudWatch log event into the Event model.

    CloudWatch stores the event payload as a JSON string in the
    ``message`` field, so it is decoded before model validation.
    """
    return types.Event.model_validate(json.loads(event["message"]))


# {'events': [],
# 'searchedLogStreams': [],
# 'ResponseMetadata': {'RequestId': 'a8e80271-a375-4f5d-a74a-faf668a9140d',
# 'HTTPStatusCode': 200,
# 'HTTPHeaders': {'x-amzn-requestid': 'a8e80271-a375-4f5d-a74a-faf668a9140d',
# 'content-type': 'application/x-amz-json-1.1',
# 'content-length': '121',
# 'date': 'Fri, 25 Oct 2024 15:11:26 GMT'},
# 'RetryAttempts': 0}}


# repo.client.filter_log_events(logGroupName="event_audit", filterPattern='{ $.category = "test" }')
4 changes: 3 additions & 1 deletion ckanext/event_audit/repositories/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ def filter_events(self, filters: types.Filters | Any) -> list[types.Event]:
break

cursor, result = self.conn.hscan(
REDIS_SET_KEY, cursor=cursor, match=pattern, # type: ignore
REDIS_SET_KEY,
cursor=cursor,
match=pattern, # type: ignore
)

matching_events.extend(
Expand Down
Loading

0 comments on commit 43991c2

Please sign in to comment.