From 29162eba835ba2bf5679bbbf42a2a0298f561de4 Mon Sep 17 00:00:00 2001 From: mutantsan Date: Tue, 29 Oct 2024 14:25:00 +0200 Subject: [PATCH] feature: implement cloudwatch repo, finish --- ckanext/event_audit/cli.py | 133 ++++++++++++++++++ ckanext/event_audit/plugin.py | 1 + .../event_audit/repositories/cloudwatch.py | 29 ++-- .../tests/repositories/test_cloudwatch.py | 12 +- ckanext/event_audit/tests/test_types.py | 16 +-- ckanext/event_audit/types.py | 2 +- pyproject.toml | 5 +- 7 files changed, 163 insertions(+), 35 deletions(-) create mode 100644 ckanext/event_audit/cli.py diff --git a/ckanext/event_audit/cli.py b/ckanext/event_audit/cli.py new file mode 100644 index 0000000..b891b55 --- /dev/null +++ b/ckanext/event_audit/cli.py @@ -0,0 +1,133 @@ +from __future__ import annotations + +from typing import Any + +import click + +from ckanext.event_audit import types +from ckanext.event_audit.repositories.cloudwatch import CloudWatchRepository + +__all__ = [ + "event_audit", +] + +DATE_ISO_FORMAT = "%Y-%m-%dT%H:%M:%S" + + +@click.group() +def event_audit(): + pass + + +@event_audit.command() +@click.argument("category") +@click.argument("action") +@click.argument("actor", required=False) +@click.argument("action_object", required=False) +@click.argument("action_object_id", required=False) +@click.argument("target_type", required=False) +@click.argument("target_id", required=False) +@click.argument("payload", required=False) +def cw_write_event( + category: str, + action: str, + actor: str | None = None, + action_object: str | None = None, + action_object_id: str | None = None, + target_type: str | None = None, + target_id: str | None = None, + payload: str | None = None, +): + """Write an event to CloudWatch Logs.""" + repo = CloudWatchRepository() + + event_data = { + "category": category, + "action": action, + "actor": actor, + "action_object": action_object, + "action_object_id": action_object_id, + "target_type": target_type, + "target_id": target_id, + "payload": payload, + } + + event = repo.build_event( + types.EventData(**{k: v for k, v in event_data.items() if v is not None}) + ) + + repo.write_event(event) + + click.secho(f"Event written to CloudWatch Logs: {event.id}", fg="green") + + +@event_audit.command() +@click.argument("event_id") +def cw_get_event(event_id: str): + """Get an event from CloudWatch Logs.""" + repo = CloudWatchRepository() + + event = repo.get_event(event_id) + + if event: + return click.secho(event.model_dump_json(indent=4), fg="green") + + click.secho(f"Event not found: {event_id}", fg="red") + + +@event_audit.command() +@click.option("--category", default=None) +@click.option("--action", default=None) +@click.option("--actor", default=None) +@click.option("--action-object", default=None) +@click.option("--action-object-id", default=None) +@click.option("--target-type", default=None) +@click.option("--target-id", default=None) +@click.option( + "--time-from", default=None, type=click.DateTime(formats=[DATE_ISO_FORMAT]) +) +@click.option("--time-to", default=None, type=click.DateTime(formats=[DATE_ISO_FORMAT])) +def cw_filter_events( + category: str | None = None, + action: str | None = None, + actor: str | None = None, + action_object: str | None = None, + action_object_id: str | None = None, + target_type: str | None = None, + target_id: str | None = None, + time_from: str | None = None, + time_to: str | None = None, +): + """Filter events from CloudWatch logs based on the given filters.""" + repo = CloudWatchRepository() + + filter_data: dict[str, Any] = { + "category": category, + "action": action, + "actor": actor, + "action_object": action_object, + "action_object_id": action_object_id, + "target_type": target_type, + "target_id": target_id, + "time_from": time_from, + "time_to": time_to, + } + + for event in repo.filter_events( + types.Filters(**{k: v for k, v in filter_data.items() if v}) + ): + click.echo(event) + + +@event_audit.command() +@click.argument("log_group", required=False) +def cw_remove_log_group(log_group: str | None = None): + """Remove the specified log group or the default log group if not specified.""" + repo = CloudWatchRepository() + + try: + repo.client.delete_log_group(logGroupName=log_group or repo.log_group) + except repo.client.exceptions.ResourceNotFoundException as err: + return click.secho(str(err), fg="red") + + click.secho(f"Log group removed: {log_group or repo.log_group}", fg="green") diff --git a/ckanext/event_audit/plugin.py b/ckanext/event_audit/plugin.py index 452c50a..91c167f 100644 --- a/ckanext/event_audit/plugin.py +++ b/ckanext/event_audit/plugin.py @@ -6,5 +6,6 @@ @tk.blanket.config_declarations @tk.blanket.validators +@tk.blanket.cli class EventAuditPlugin(p.SingletonPlugin): pass diff --git a/ckanext/event_audit/repositories/cloudwatch.py b/ckanext/event_audit/repositories/cloudwatch.py index 3e4191d..37bb450 100644 --- a/ckanext/event_audit/repositories/cloudwatch.py +++ b/ckanext/event_audit/repositories/cloudwatch.py @@ -9,9 +9,7 @@ if TYPE_CHECKING: from mypy_boto3_logs.client import CloudWatchLogsClient - from mypy_boto3_logs.type_defs import ( - FilteredLogEventTypeDef, - ) + from mypy_boto3_logs.type_defs import FilteredLogEventTypeDef else: CloudWatchLogsClient = object @@ -29,7 +27,7 @@ class CloudWatchRepository(AbstractRepository): def __init__( self, credentials: types.AWSCredentials | None = None, - log_group: str = "ckan/event-audit", + log_group: str = "/ckan/event-audit", log_stream: str = "event-audit-stream", ): if not credentials: @@ -46,7 +44,6 @@ def __init__( self.log_group = log_group self.log_stream = log_stream - # Ensure the log group exists self._create_log_group_if_not_exists() @classmethod @@ -97,7 +94,6 @@ def _create_log_stream_if_not_exists(self, log_stream: str) -> str: def get_event(self, event_id: str) -> Optional[types.Event]: """Retrieves a single event by its ID.""" - result = self.filter_events(types.Filters(id=event_id)) if not result: @@ -125,19 +121,18 @@ def filter_events( } return [ - self._parse_event(e) + types.Event.model_validate(json.loads(e["message"])) for e in self._get_all_matching_events( {k: v for k, v in kwargs.items() if v is not None} - ) + ) if "message" in e ] def _build_filter_pattern(self, filters: types.Filters) -> Optional[str]: - """Builds the CloudWatch filter pattern for querying logs. - TODO: the filter pattern is not yet implemented properly !!! - """ + """Builds the CloudWatch filter pattern for querying logs.""" conditions = [ - f'$.{field} = "{value}"' + f'($.{field} = "{value}")' for field, value in [ + ("id", filters.id), ("category", filters.category), ("action", filters.action), ("actor", filters.actor), @@ -146,10 +141,11 @@ def _build_filter_pattern(self, filters: types.Filters) -> Optional[str]: ("target_type", filters.target_type), ("target_id", filters.target_id), ] + if value ] if conditions: - return " && ".join(conditions) + return f'{{ {" && ".join(conditions)} }}' return None @@ -165,10 +161,3 @@ def _get_all_matching_events( events.extend(page.get("events", [])) return events - - def _parse_event(self, event: FilteredLogEventTypeDef) -> types.Event: - """Parse a CloudWatch event into the Event model. - - CloudWatch events store the event message as a JSON string - """ - return types.Event.model_validate(json.loads(event["message"])) diff --git a/ckanext/event_audit/tests/repositories/test_cloudwatch.py b/ckanext/event_audit/tests/repositories/test_cloudwatch.py index d22ee9b..57355f2 100644 --- a/ckanext/event_audit/tests/repositories/test_cloudwatch.py +++ b/ckanext/event_audit/tests/repositories/test_cloudwatch.py @@ -1,9 +1,9 @@ from __future__ import annotations -from typing import Any, Generator, Callable from datetime import datetime as dt from datetime import timedelta as td from datetime import timezone as tz +from typing import Any import pytest from botocore.stub import Stubber @@ -11,7 +11,6 @@ from ckanext.event_audit import types from ckanext.event_audit.repositories.cloudwatch import CloudWatchRepository - put_log_events_response: dict[str, Any] = { "nextSequenceToken": "49654796026243824240318171692305216662718669063406487010", "ResponseMetadata": { @@ -29,16 +28,21 @@ @pytest.fixture() -def cloudwatch_repo() -> Generator[tuple[CloudWatchRepository, Stubber]]: +def cloudwatch_repo() -> tuple[CloudWatchRepository, Stubber]: """Fixture to initialize the CloudWatchRepository with a stubbed client.""" repo = CloudWatchRepository() stubber = Stubber(repo.client) - yield repo, stubber + return repo, stubber class TestCloudWatchRepository: + """Tests for the CloudWatchRepository. + + It's really hard to test the repository, without mocking the AWS client. + And with mocking we still have doubts of the correctness of the implementation. + """ def test_write_event( self, cloudwatch_repo: tuple[CloudWatchRepository, Stubber], event: types.Event ): diff --git a/ckanext/event_audit/tests/test_types.py b/ckanext/event_audit/tests/test_types.py index 6496f0e..a1b7f6a 100644 --- a/ckanext/event_audit/tests/test_types.py +++ b/ckanext/event_audit/tests/test_types.py @@ -110,10 +110,10 @@ def test_custom_id_generation(self): def test_invalid_field_assignment(self): """Test that assigning invalid data types to fields raises an error.""" with pytest.raises(ValidationError): - types.Event(category="model", action="created", result="not-a-dict") + types.Event(category="model", action="created", result="not-a-dict") # type: ignore with pytest.raises(ValidationError): - types.Event(category="model", action="created", payload="not-a-dict") + types.Event(category="model", action="created", payload="not-a-dict") # type: ignore class TestFilters: @@ -131,8 +131,8 @@ def test_valid_filters(self, user): action_object_id="123", target_type="organization", target_id="456", - time_from=datetime.now() - timedelta(days=1), - time_to=datetime.now(), + time_from=datetime.now(timezone.utc) - timedelta(days=1), + time_to=datetime.now(timezone.utc), ) assert filters.category == "api" assert filters.action == "created" @@ -156,19 +156,19 @@ def test_time_range_validation(self): ValueError, match="`time_from` must be earlier than `time_to`." ): types.Filters( - time_from=datetime.now(), - time_to=datetime.now() - timedelta(days=1), + time_from=datetime.now(timezone.utc), + time_to=datetime.now(timezone.utc) - timedelta(days=1), ) def test_invalid_time_from_type(self): """Test that invalid datetime fields raise a validation error.""" with pytest.raises(ValueError, match="Input should be a valid datetime"): - types.Filters(time_from="xxx") + types.Filters(time_from="xxx") # type: ignore def test_invalid_actor_type(self): """Test that passing incorrect field types raises an error.""" with pytest.raises(ValueError, match="Input should be a valid string"): - types.Filters(actor=123) # Actor must be a string + types.Filters(actor=123) # type: ignore def test_actor_doesnt_exist(self): """Test that an invalid actor reference raises a ValidationError.""" diff --git a/ckanext/event_audit/types.py b/ckanext/event_audit/types.py index b4157c2..d4011dc 100644 --- a/ckanext/event_audit/types.py +++ b/ckanext/event_audit/types.py @@ -24,7 +24,7 @@ class AWSCredentials: region_name: str -class EventData(TypedDict): +class EventData(TypedDict, total=False): id: Any category: str action: str diff --git a/pyproject.toml b/pyproject.toml index b47cd6a..7c48335 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,7 +59,7 @@ select = [ "BLE", # do not catch blind exception "C4", # better list/set/dict comprehensions "C90", # check McCabe complexity - # "DTZ", # enforce timezone in date objects + "DTZ", # enforce timezone in date objects "E", # pycodestyle error "W", # pycodestyle warning "F", # pyflakes @@ -99,6 +99,8 @@ ignore = [ [tool.ruff.lint.per-file-ignores] "ckanext/event_audit/tests*" = ["S", "PL", "ANN"] +"ckanext/event_audit/config.py" = ["S105",] # false positive for Possible hardcoded secret key +"ckanext/event_audit/cli.py" = ["PLR0913",] # skip max argument number check [tool.ruff.lint.flake8-import-conventions.aliases] "ckan.plugins" = "p" @@ -121,7 +123,6 @@ section-order = [ ] [tool.ruff.lint.isort.sections] -# Group all Django imports into a separate section. ckan = ["ckan"] ckanext = ["ckanext"] self = ["ckanext.event_audit"]