diff --git a/backend/lcfs/db/migrations/versions/2024-11-15-21-05_1974af823b80.py b/backend/lcfs/db/migrations/versions/2024-11-15-21-05_1974af823b80.py
new file mode 100644
index 00000000..d9821dac
--- /dev/null
+++ b/backend/lcfs/db/migrations/versions/2024-11-15-21-05_1974af823b80.py
@@ -0,0 +1,101 @@
+"""Enhance audit_log: Rename id, add comments, enforce uniqueness, and create indexes.
+
+Revision ID: 1974af823b80
+Revises: b659816d0a86
+Create Date: 2024-11-15 21:05:06.629584
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision = "1974af823b80"
+down_revision = "b659816d0a86"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ # Step 1: Rename 'id' column to 'audit_log_id'
+ op.alter_column("audit_log", "id", new_column_name="audit_log_id")
+
+ # Step 2: Add comments to the table and columns
+ op.execute(
+ "COMMENT ON TABLE audit_log IS 'Audit log capturing changes to database tables.';"
+ )
+ op.execute(
+ "COMMENT ON COLUMN audit_log.audit_log_id IS 'Unique identifier for each audit log entry.';"
+ )
+ op.execute(
+ "COMMENT ON COLUMN audit_log.table_name IS 'Name of the table where the action occurred.';"
+ )
+ op.execute(
+ "COMMENT ON COLUMN audit_log.operation IS 'Type of operation: ''INSERT'', ''UPDATE'', or ''DELETE''.';"
+ )
+ op.execute(
+ "COMMENT ON COLUMN audit_log.row_id IS 'Primary key of the affected row, stored as JSONB to support composite keys.';"
+ )
+ op.execute(
+ "COMMENT ON COLUMN audit_log.old_values IS 'Previous values before the operation.';"
+ )
+ op.execute(
+ "COMMENT ON COLUMN audit_log.new_values IS 'New values after the operation.';"
+ )
+ op.execute("COMMENT ON COLUMN audit_log.delta IS 'JSONB delta of the changes.';")
+ op.execute(
+ "COMMENT ON COLUMN audit_log.create_date IS 'Timestamp when the audit log entry was created.';"
+ )
+ op.execute(
+ "COMMENT ON COLUMN audit_log.create_user IS 'User who created the audit log entry.';"
+ )
+ op.execute(
+ "COMMENT ON COLUMN audit_log.update_date IS 'Timestamp when the audit log entry was last updated.';"
+ )
+ op.execute(
+ "COMMENT ON COLUMN audit_log.update_user IS 'User who last updated the audit log entry.';"
+ )
+
+ # Step 3: Add unique constraint on 'audit_log_id'
+ op.create_unique_constraint(
+ "uq_audit_log_audit_log_id", "audit_log", ["audit_log_id"]
+ )
+
+ # Step 4: Create new indexes
+ op.create_index("idx_audit_log_operation", "audit_log", ["operation"])
+ op.create_index("idx_audit_log_create_date", "audit_log", ["create_date"])
+ op.create_index("idx_audit_log_create_user", "audit_log", ["create_user"])
+ op.create_index(
+ "idx_audit_log_delta", "audit_log", ["delta"], postgresql_using="gin"
+ )
+
+
+def downgrade():
+ # Reverse the above operations
+
+ # Step 4: Drop new indexes
+ op.drop_index("idx_audit_log_delta", table_name="audit_log")
+ op.drop_index("idx_audit_log_create_user", table_name="audit_log")
+ op.drop_index("idx_audit_log_create_date", table_name="audit_log")
+ op.drop_index("idx_audit_log_operation", table_name="audit_log")
+
+ # Step 3: Drop unique constraint on 'audit_log_id'
+ op.drop_constraint("uq_audit_log_audit_log_id", "audit_log", type_="unique")
+
+ # Step 2: Remove comments
+ op.execute("COMMENT ON COLUMN audit_log.update_user IS NULL;")
+ op.execute("COMMENT ON COLUMN audit_log.update_date IS NULL;")
+ op.execute("COMMENT ON COLUMN audit_log.create_user IS NULL;")
+ op.execute("COMMENT ON COLUMN audit_log.create_date IS NULL;")
+ op.execute("COMMENT ON COLUMN audit_log.delta IS NULL;")
+ op.execute("COMMENT ON COLUMN audit_log.new_values IS NULL;")
+ op.execute("COMMENT ON COLUMN audit_log.old_values IS NULL;")
+ op.execute("COMMENT ON COLUMN audit_log.row_id IS NULL;")
+ op.execute("COMMENT ON COLUMN audit_log.operation IS NULL;")
+ op.execute("COMMENT ON COLUMN audit_log.table_name IS NULL;")
+ op.execute("COMMENT ON COLUMN audit_log.audit_log_id IS NULL;")
+ op.execute("COMMENT ON TABLE audit_log IS NULL;")
+
+ # Step 1: Rename 'audit_log_id' column back to 'id'
+ op.alter_column("audit_log", "audit_log_id", new_column_name="id")
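Reviewer note: the GIN index on `delta` is what makes key-existence and containment lookups on the JSONB column efficient. A minimal sketch of the kind of query it accelerates — the session wiring and the `email` key are illustrative assumptions, not code from this PR:

```python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

# Hypothetical query: recent audit entries whose delta touched the "email" key.
# The JSONB key-exists (?) and containment (@>) operators can both be served
# by the GIN index created above (idx_audit_log_delta).
FIND_BY_CHANGED_KEY = text(
    """
    SELECT audit_log_id, table_name, operation
    FROM audit_log
    WHERE delta ? :key
    ORDER BY create_date DESC
    LIMIT 10
    """
)


async def find_entries_touching(session: AsyncSession, key: str = "email"):
    result = await session.execute(FIND_BY_CHANGED_KEY, {"key": key})
    return result.all()
```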
diff --git a/backend/lcfs/db/models/audit/AuditLog.py b/backend/lcfs/db/models/audit/AuditLog.py
index 7d6d1db3..e93b5c5d 100644
--- a/backend/lcfs/db/models/audit/AuditLog.py
+++ b/backend/lcfs/db/models/audit/AuditLog.py
@@ -1,6 +1,6 @@
from lcfs.db.base import Auditable, BaseModel
from sqlalchemy import (
- BigInteger,
+ Integer,
Column,
Text,
)
@@ -8,16 +8,57 @@
class AuditLog(BaseModel, Auditable):
-    __tablename__ = "audit_log"
-    __table_args__ = {"comment": "Track changes in defined tables."}
-    id = Column(BigInteger, primary_key=True, autoincrement=True)
-    table_name = Column(Text, nullable=False)
-    operation = Column(Text, nullable=False)
-    # JSONB fields for row ID, old values, new values, and delta
-    row_id = Column(JSONB, nullable=False)
-    old_values = Column(JSONB, nullable=True)
-    new_values = Column(JSONB, nullable=True)
-    delta = Column(JSONB, nullable=True)
+    """
+    Audit log capturing changes to database tables.
+
+    As the table grows, consider implementing automatic archiving (e.g., moving older logs to an archive table)
+    and purging (e.g., deleting logs after a retention period) using tools like `pg_cron` or external schedulers.
+
+    Archiving:
+    - Create an `audit_log_archive` table with the same structure as `audit_log`.
+    - Use a scheduled job (e.g., with `pg_cron`) to move records older than a certain threshold (e.g., 1 month) from `audit_log` to `audit_log_archive`.
+    - Alternatively, consider creating date-based archive tables (e.g., `audit_log_archive_2025_01`) to organize logs by time period.
+
+    Purging:
+    - Use a scheduled job (e.g., with `pg_cron`) to delete records older than a defined retention period (e.g., 1 year) from `audit_log_archive`.
+    """
+
+    __tablename__ = "audit_log"
+    __table_args__ = {"comment": "Audit log capturing changes to database tables."}
+
+    audit_log_id = Column(
+        Integer,
+        primary_key=True,
+        autoincrement=True,
+        comment="Unique identifier for each audit log entry.",
+    )
+    table_name = Column(
+        Text,
+        nullable=False,
+        comment="Name of the table where the action occurred.",
+    )
+    operation = Column(
+        Text,
+        nullable=False,
+        comment="Type of operation: 'INSERT', 'UPDATE', or 'DELETE'.",
+    )
+    row_id = Column(
+        JSONB,
+        nullable=False,
+        comment="Primary key of the affected row, stored as JSONB to support composite keys.",
+    )
+    old_values = Column(
+        JSONB,
+        nullable=True,
+        comment="Previous values before the operation.",
+    )
+    new_values = Column(
+        JSONB,
+        nullable=True,
+        comment="New values after the operation.",
+    )
+    delta = Column(
+        JSONB,
+        nullable=True,
+        comment="JSONB delta of the changes.",
+    )
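Reviewer note: the archiving/purging strategy described in the new docstring is not implemented anywhere in this PR. A sketch of what a follow-up migration could look like with `pg_cron` — the archive table, job names, schedules, and retention windows are all illustrative assumptions:

```python
from alembic import op


def upgrade():
    # Archive table mirroring audit_log's structure (assumes pg_cron is installed).
    op.execute(
        "CREATE TABLE IF NOT EXISTS audit_log_archive (LIKE audit_log INCLUDING ALL);"
    )
    # Nightly at 02:00: move entries older than one month into the archive.
    op.execute(
        """
        SELECT cron.schedule('archive_audit_log', '0 2 * * *', $$
            WITH moved AS (
                DELETE FROM audit_log
                WHERE create_date < now() - interval '1 month'
                RETURNING *
            )
            INSERT INTO audit_log_archive SELECT * FROM moved;
        $$);
        """
    )
    # Weekly on Sunday at 03:00: purge archived entries past a one-year retention.
    op.execute(
        """
        SELECT cron.schedule('purge_audit_log_archive', '0 3 * * 0', $$
            DELETE FROM audit_log_archive
            WHERE create_date < now() - interval '1 year';
        $$);
        """
    )


def downgrade():
    op.execute("SELECT cron.unschedule('purge_audit_log_archive');")
    op.execute("SELECT cron.unschedule('archive_audit_log');")
    op.execute("DROP TABLE IF EXISTS audit_log_archive;")
```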
diff --git a/backend/lcfs/tests/audit_log/conftest.py b/backend/lcfs/tests/audit_log/conftest.py
deleted file mode 100644
index 434b1d72..00000000
--- a/backend/lcfs/tests/audit_log/conftest.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# conftest.py
-from starlette.authentication import AuthenticationBackend, AuthCredentials, SimpleUser
-from starlette.middleware.authentication import AuthenticationMiddleware
-from starlette.authentication import AuthCredentials, AuthenticationBackend, BaseUser
-from fastapi import FastAPI
-import pytest
-
-class MockAuthBackend(AuthenticationBackend):
- def __init__(self):
- self.roles = []
-
- async def authenticate(self, request):
- if self.roles:
- # Use MockUser with roles instead of SimpleUser
- return AuthCredentials(["authenticated"]), MockUser("mock_user", self.roles)
- return None
-
-
-class MockUser(BaseUser):
- def __init__(self, username: str, roles: list):
- self.username = username
- self.roles = roles
-
- @property
- def is_authenticated(self) -> bool:
- return True
-
-@pytest.fixture
-def mock_user_role(fastapi_app: FastAPI):
- auth_backend = MockAuthBackend() # Create a single instance of MockAuthBackend
-
- # Add middleware only if it hasn't been added already
- if not any(isinstance(middleware, AuthenticationMiddleware) for middleware in fastapi_app.user_middleware):
- fastapi_app.add_middleware(AuthenticationMiddleware, backend=auth_backend)
-
- def set_roles(roles):
- auth_backend.roles = roles # Persist roles across requests
-
- yield set_roles # Yield the role-setting function for use in tests
diff --git a/backend/lcfs/tests/audit_log/test_audit_log.py b/backend/lcfs/tests/audit_log/test_audit_log.py
deleted file mode 100644
index 4330d0fa..00000000
--- a/backend/lcfs/tests/audit_log/test_audit_log.py
+++ /dev/null
@@ -1,150 +0,0 @@
-# import pytest
-# from fastapi import FastAPI
-# from httpx import AsyncClient
-# from starlette import status
-# from lcfs.conftest import set_mock_user
-# from lcfs.db.models.user.Role import RoleEnum
-# from lcfs.tests.test_organization import create_organization, update_organization
-
-
-# @pytest.mark.anyio
-# async def test_insert_audit_log(
-# client: AsyncClient, fastapi_app: FastAPI, mock_user_role
-# ) -> None:
-# mock_user_role([RoleEnum.GOVERNMENT])
-# payload = {
-# "name": "Test Organizationa",
-# "operatingName": "Test Operating name",
-# "email": "test@gov.bc.ca",
-# "phone": "0000000000",
-# "edrmsRecord": "EDRMS123",
-# "organizationStatusId": 2,
-# "organizationTypeId": 1,
-# "address": {
-# "name": "Test Operating name",
-# "streetAddress": "123 Test Street",
-# "addressOther": "",
-# "city": "Victoria",
-# "provinceState": "BC",
-# "country": "Canada",
-# "postalcodeZipcode": "V8W 2C3",
-# },
-# "attorneyAddress": {
-# "name": "Test Operating name",
-# "streetAddress": "123 Test Street",
-# "addressOther": "",
-# "city": "Victoria",
-# "provinceState": "BC",
-# "country": "Canada",
-# "postalcodeZipcode": "V8W 2C3",
-# },
-# }
-
-
-# response = await create_organization(client, fastapi_app, payload)
-# assert response.status_code == status.HTTP_201_CREATED
-
-# response_data = response.json()
-# organization_id = response_data["organizationId"]
-
-# # Fetch audit logs for the created organization
-# mock_user_role([RoleEnum.ADMINISTRATOR])
-# audit_url = fastapi_app.url_path_for("get_audit_log")
-# audit_response = await client.get(audit_url, params={"table_name": "organization", "operation": "INSERT"})
-# audit_log = audit_response.json()
-
-# # Assert that the audit log entry was created and contains correct data
-# assert audit_response.status_code == status.HTTP_200_OK
-# assert (
-# audit_log["row_id"] == organization_id and
-# audit_log["table_name"] == "organization" and
-# audit_log["operation"] == "INSERT"
-# ), "Expected INSERT operation in audit logs for the new organization creation."
-
-# if audit_log["row_id"] == organization_id and audit_log["operation"] == "INSERT":
-# assert audit_log["new_values"].get("name") == "Test Organizationa", "Audit log should contain the correct new 'name' value."
-# assert audit_log["new_values"].get("operating_name") == "Test Operating name", "Audit log should contain the correct 'operating' value."
-# assert audit_log["new_values"].get("email") == "test@gov.bc.ca", "Audit log should contain the correct 'email' value."
-# assert audit_log["new_values"].get("edrms_record") == "EDRMS123", "Audit log should contain the correct 'edrms_record' value."
-# assert audit_log["new_values"].get("organization_status_id") == 2, "Audit log should contain the correct 'organization_status_id' value."
-# assert audit_log["new_values"].get("organization_type_id") == 1, "Audit log should contain the correct 'organization_type_id' value."
-# else:
-# raise AssertionError("Expected INSERT operation in audit logs for the new organization creation.")
-
-# @pytest.mark.anyio
-# async def test_update_audit_log(
-# client: AsyncClient, fastapi_app: FastAPI, mock_user_role
-# ) -> None:
-# # Set mock user role for organization creation
-# mock_user_role([RoleEnum.GOVERNMENT])
-
-# payload = {
-# "name": "Test Organization",
-# "operatingName": "Test Operating name",
-# "email": "organization@gov.bc.ca",
-# "phone": "1111111111",
-# "edrmsRecord": "EDRMS123",
-# "organizationStatusId": 2,
-# "organizationTypeId": 1,
-# "address": {
-# "name": "Test Operating name",
-# "streetAddress": "123 Test Street",
-# "addressOther": "",
-# "city": "Victoria",
-# "provinceState": "BC",
-# "country": "Canada",
-# "postalcodeZipcode": "V8W 2C3",
-# },
-# "attorneyAddress": {
-# "name": "Test Operating name",
-# "streetAddress": "123 Test Street",
-# "addressOther": "",
-# "city": "Victoria",
-# "provinceState": "BC",
-# "country": "Canada",
-# "postalcodeZipcode": "V8W 2C3",
-# },
-# }
-
-# response = await update_organization(client, fastapi_app, 1, payload)
-# assert response.status_code == status.HTTP_200_OK
-# response_data = response.json()
-
-# # Set mock user role to ADMINISTRATOR for accessing audit logs
-# mock_user_role([RoleEnum.ADMINISTRATOR])
-
-# # Fetch audit logs for the updated organization
-# audit_url = fastapi_app.url_path_for("get_audit_log")
-# audit_response = await client.get(audit_url, params={"table_name": "organization", "operation": "UPDATE"})
-# audit_log = audit_response.json()
-
-# assert audit_response.status_code == status.HTTP_200_OK
-# assert (
-# audit_log["row_id"] == response_data["organizationId"] and
-# audit_log["table_name"] == "organization" and
-# audit_log["operation"] == "UPDATE"
-# ), "Expected UPDATE operation in audit logs for the updated organization."
-
-# # Assert audit log old and new values, and delta
-# if audit_log["row_id"] == response_data["organizationId"] and audit_log["operation"] == "UPDATE":
-# assert audit_log["old_values"].get("name") == "GreenLeaf Dynamics", "Audit log should contain the correct old 'name' value."
-# assert audit_log["new_values"].get("name") == "Test Organization", "Audit log should contain the correct new 'name' value."
-# assert audit_log["old_values"].get("operating_name") == "GreenLeaf Dynamics", "Audit log should contain the correct old 'operating' value."
-# assert audit_log["new_values"].get("operating_name") == "Test Operating name", "Audit log should contain the correct new 'operating' value."
-# assert audit_log["old_values"].get("phone") == None, "Audit log should contain the correct old 'phone' value."
-# assert audit_log["new_values"].get("phone") == "1111111111", "Audit log should contain the correct new 'phone' value."
-# assert audit_log["old_values"].get("email") == None, "Audit log should contain the correct old 'email' value."
-# assert audit_log["new_values"].get("email") == "organization@gov.bc.ca", "Audit log should contain the correct new 'email' value."
-# assert audit_log["old_values"].get("edrms_record") == None, "Audit log should contain the correct old 'edrms_record' value."
-# assert audit_log["new_values"].get("edrms_record") == "EDRMS123", "Audit log should contain the correct new 'edrms_record' value."
-# assert audit_log["old_values"].get("organization_status_id") == 2, "Audit log should contain the correct old 'organization_status_id' value."
-# assert audit_log["new_values"].get("organization_status_id") == 2, "Audit log should contain the correct new 'organization_status_id' value."
-# assert audit_log["old_values"].get("organization_type_id") == 1, "Audit log should contain the correct old 'organization_type_id' value."
-# assert audit_log["new_values"].get("organization_type_id") == 1, "Audit log should contain the correct new 'organization_type_id' value."
-# assert audit_log["delta"].get("name") == "Test Organization", "Audit log delta should contain the difference"
-# assert audit_log["delta"].get("email") == "organization@gov.bc.ca", "Audit log delta should contain the difference"
-# assert audit_log["delta"].get("phone") == "1111111111", "Audit log delta should contain the difference"
-# assert audit_log["delta"].get("edrms_record") == "EDRMS123", "Audit log delta should contain the difference"
-# assert audit_log["delta"].get("operating_name") == "Test Operating name", "Audit log delta should contain the difference"
-# else:
-# raise AssertionError("Expected UPDATE operation in audit logs for the updated organization.")
\ No newline at end of file
diff --git a/backend/lcfs/tests/audit_log/test_audit_log_repo.py b/backend/lcfs/tests/audit_log/test_audit_log_repo.py
new file mode 100644
index 00000000..02cab9e6
--- /dev/null
+++ b/backend/lcfs/tests/audit_log/test_audit_log_repo.py
@@ -0,0 +1,89 @@
+import pytest
+from unittest.mock import AsyncMock, MagicMock
+from lcfs.web.api.audit_log.repo import AuditLogRepository
+from lcfs.db.models.audit.AuditLog import AuditLog
+
+
+@pytest.fixture
+def mock_db():
+ return AsyncMock()
+
+
+@pytest.fixture
+def audit_log_repo(mock_db):
+ repo = AuditLogRepository()
+ repo.db = mock_db
+ return repo
+
+
+@pytest.mark.anyio
+async def test_get_audit_logs_paginated_success(audit_log_repo, mock_db):
+ # Arrange
+ expected_audit_logs = [AuditLog(audit_log_id=1), AuditLog(audit_log_id=2)]
+ expected_total_count = 2
+
+ # Mock total_count_result for count query
+ mock_total_count_result = MagicMock()
+ mock_total_count_result.scalar_one.return_value = expected_total_count
+
+ # Mock result for the data query
+ mock_result = MagicMock()
+ mock_scalars = MagicMock()
+ mock_scalars.all.return_value = expected_audit_logs
+ mock_result.scalars.return_value = mock_scalars
+
+ # Mock execute to return the total count result and the data result
+ mock_db.execute.side_effect = [mock_total_count_result, mock_result]
+
+ # Act
+ offset = 0
+ limit = 10
+ conditions = []
+ sort_orders = []
+ audit_logs, total_count = await audit_log_repo.get_audit_logs_paginated(
+ offset, limit, conditions, sort_orders
+ )
+
+ # Assert
+ assert audit_logs == expected_audit_logs
+ assert total_count == expected_total_count
+ assert (
+ mock_db.execute.call_count == 2
+ ) # One for the count query, one for the data query
+
+
+@pytest.mark.anyio
+async def test_get_audit_log_by_id_success(audit_log_repo, mock_db):
+ # Arrange
+ audit_log_id = 1
+ expected_audit_log = AuditLog(audit_log_id=audit_log_id)
+
+ # Mock result for the query
+ mock_result = MagicMock()
+ mock_result.scalar_one_or_none.return_value = expected_audit_log
+ mock_db.execute.return_value = mock_result
+
+ # Act
+ result = await audit_log_repo.get_audit_log_by_id(audit_log_id)
+
+ # Assert
+ assert result == expected_audit_log
+ mock_db.execute.assert_called_once()
+
+
+@pytest.mark.anyio
+async def test_get_audit_log_by_id_not_found(audit_log_repo, mock_db):
+ # Arrange
+ audit_log_id = 999
+
+ # Mock result for the query to return None
+ mock_result = MagicMock()
+ mock_result.scalar_one_or_none.return_value = None
+ mock_db.execute.return_value = mock_result
+
+ # Act
+ result = await audit_log_repo.get_audit_log_by_id(audit_log_id)
+
+ # Assert
+ assert result is None
+ mock_db.execute.assert_called_once()
diff --git a/backend/lcfs/tests/audit_log/test_audit_log_services.py b/backend/lcfs/tests/audit_log/test_audit_log_services.py
new file mode 100644
index 00000000..92dbee85
--- /dev/null
+++ b/backend/lcfs/tests/audit_log/test_audit_log_services.py
@@ -0,0 +1,141 @@
+import pytest
+from unittest.mock import AsyncMock
+from lcfs.web.api.audit_log.services import AuditLogService
+from lcfs.web.api.audit_log.repo import AuditLogRepository
+from lcfs.web.api.audit_log.schema import (
+ AuditLogListSchema,
+ AuditLogSchema,
+)
+from lcfs.web.api.base import (
+ PaginationRequestSchema,
+ FilterModel,
+)
+from lcfs.db.models.audit.AuditLog import AuditLog
+from lcfs.web.exception.exceptions import DataNotFoundException
+
+
+@pytest.fixture
+def mock_repo():
+ return AsyncMock(spec=AuditLogRepository)
+
+
+@pytest.fixture
+def audit_log_service(mock_repo):
+ service = AuditLogService()
+ service.repo = mock_repo
+ return service
+
+
+@pytest.mark.anyio
+async def test_get_audit_logs_paginated_success(audit_log_service, mock_repo):
+ # Arrange
+ pagination = PaginationRequestSchema(page=1, size=10, filters=[], sort_orders=[])
+ expected_audit_logs = [
+ AuditLog(
+ audit_log_id=1,
+ table_name="users",
+ operation="INSERT",
+ row_id=123,
+ delta={"name": "John Doe"},
+ create_date="2023-11-01",
+ create_user="admin",
+ ),
+ AuditLog(
+ audit_log_id=2,
+ table_name="orders",
+ operation="UPDATE",
+ row_id=456,
+ delta={"status": "completed"},
+ create_date="2023-11-02",
+ create_user="manager",
+ ),
+ ]
+ expected_total_count = 2
+ mock_repo.get_audit_logs_paginated.return_value = (
+ expected_audit_logs,
+ expected_total_count,
+ )
+
+ # Act
+ result = await audit_log_service.get_audit_logs_paginated(pagination)
+
+ # Assert
+ assert isinstance(result, AuditLogListSchema)
+ assert len(result.audit_logs) == 2
+ assert result.pagination.total == expected_total_count
+ mock_repo.get_audit_logs_paginated.assert_called_once()
+
+
+@pytest.mark.anyio
+async def test_get_audit_logs_paginated_no_data(audit_log_service, mock_repo):
+ # Arrange
+ pagination = PaginationRequestSchema(page=1, size=10, filters=[], sort_orders=[])
+ mock_repo.get_audit_logs_paginated.return_value = ([], 0)
+
+ # Act & Assert
+ with pytest.raises(DataNotFoundException):
+ await audit_log_service.get_audit_logs_paginated(pagination)
+
+
+@pytest.mark.anyio
+async def test_get_audit_log_by_id_success(audit_log_service, mock_repo):
+ # Arrange
+ audit_log_id = 1
+ expected_audit_log = AuditLog(
+ audit_log_id=audit_log_id,
+ table_name="users",
+ operation="INSERT",
+ row_id=123,
+ delta={"name": "John Doe"},
+ create_date="2023-11-01",
+ create_user="admin",
+ )
+ mock_repo.get_audit_log_by_id.return_value = expected_audit_log
+
+ # Act
+ result = await audit_log_service.get_audit_log_by_id(audit_log_id)
+
+ # Assert
+ assert isinstance(result, AuditLogSchema)
+ assert result.audit_log_id == audit_log_id
+ assert result.table_name == "users"
+ mock_repo.get_audit_log_by_id.assert_called_once_with(audit_log_id)
+
+
+@pytest.mark.anyio
+async def test_get_audit_log_by_id_not_found(audit_log_service, mock_repo):
+ # Arrange
+ audit_log_id = 999
+ mock_repo.get_audit_log_by_id.return_value = None
+
+ # Act & Assert
+ with pytest.raises(DataNotFoundException):
+ await audit_log_service.get_audit_log_by_id(audit_log_id)
+
+
+@pytest.mark.anyio
+async def test_apply_audit_log_filters(audit_log_service):
+ # Arrange
+ pagination = PaginationRequestSchema(
+ page=1,
+ size=10,
+ filters=[
+ FilterModel(
+ field="operation", filter_type="text", type="equals", filter="UPDATE"
+ ),
+ FilterModel(
+ field="createDate",
+ filter_type="date",
+ type="greaterThan",
+ date_from="2021-01-01",
+ ),
+ ],
+ sort_orders=[],
+ )
+ conditions = []
+
+ # Act
+ audit_log_service.apply_audit_log_filters(pagination, conditions)
+
+ # Assert
+ assert len(conditions) == 2 # Two filters applied
diff --git a/backend/lcfs/tests/audit_log/test_audit_log_views.py b/backend/lcfs/tests/audit_log/test_audit_log_views.py
new file mode 100644
index 00000000..0f38fde7
--- /dev/null
+++ b/backend/lcfs/tests/audit_log/test_audit_log_views.py
@@ -0,0 +1,129 @@
+import pytest
+from unittest.mock import patch
+from httpx import AsyncClient
+from fastapi import FastAPI
+
+from lcfs.web.api.audit_log.schema import (
+ AuditLogListSchema,
+ AuditLogSchema,
+)
+from lcfs.db.models.user.Role import RoleEnum
+from lcfs.web.exception.exceptions import DataNotFoundException
+
+
+@pytest.mark.anyio
+async def test_get_audit_logs_paginated_success(
+ client: AsyncClient, fastapi_app: FastAPI, set_mock_user
+):
+ with patch(
+ "lcfs.web.api.audit_log.views.AuditLogService.get_audit_logs_paginated"
+ ) as mock_service:
+ # Arrange
+ mock_service.return_value = AuditLogListSchema(
+ audit_logs=[
+ {
+ "audit_log_id": 1,
+ "table_name": "users",
+ "operation": "INSERT",
+ "row_id": 101,
+ "create_date": "2023-01-01T12:00:00",
+ "create_user": "admin",
+ },
+ {
+ "audit_log_id": 2,
+ "table_name": "orders",
+ "operation": "UPDATE",
+ "row_id": 202,
+ "create_date": "2023-01-02T13:00:00",
+ "create_user": "manager",
+ },
+ ],
+ pagination={
+ "total": 2,
+ "page": 1,
+ "size": 10,
+ "total_pages": 1,
+ },
+ )
+ set_mock_user(fastapi_app, [RoleEnum.ADMINISTRATOR])
+
+ url = fastapi_app.url_path_for("get_audit_logs_paginated")
+ payload = {"page": 1, "size": 10, "filters": [], "sortOrders": []}
+
+ # Act
+ response = await client.post(url, json=payload)
+
+ # Assert
+ assert response.status_code == 200
+ data = response.json()
+ assert data["pagination"]["total"] == 2
+ assert len(data["auditLogs"]) == 2
+ mock_service.assert_called_once()
+
+
+@pytest.mark.anyio
+async def test_get_audit_logs_paginated_forbidden(
+ client: AsyncClient, fastapi_app: FastAPI, set_mock_user
+):
+ set_mock_user(fastapi_app, [RoleEnum.SUPPLIER]) # Insufficient permissions
+
+ url = fastapi_app.url_path_for("get_audit_logs_paginated")
+ payload = {"page": 1, "size": 10, "filters": [], "sortOrders": []}
+
+ response = await client.post(url, json=payload)
+
+ assert response.status_code == 403 # Forbidden
+
+
+@pytest.mark.anyio
+async def test_get_audit_log_by_id_success(
+ client: AsyncClient, fastapi_app: FastAPI, set_mock_user
+):
+ with patch(
+ "lcfs.web.api.audit_log.views.AuditLogService.get_audit_log_by_id"
+ ) as mock_service:
+ # Arrange
+ audit_log_id = 1
+ mock_service.return_value = AuditLogSchema(
+ audit_log_id=audit_log_id,
+ table_name="users",
+ operation="UPDATE",
+ row_id=101,
+ create_date="2023-01-01T12:00:00",
+ create_user="admin",
+ )
+ set_mock_user(fastapi_app, [RoleEnum.ADMINISTRATOR])
+
+ url = fastapi_app.url_path_for("get_audit_log_by_id", audit_log_id=audit_log_id)
+
+ # Act
+ response = await client.get(url)
+
+ # Assert
+ assert response.status_code == 200
+ data = response.json()
+ assert data["auditLogId"] == audit_log_id
+ assert data["tableName"] == "users"
+ mock_service.assert_called_once_with(audit_log_id)
+
+
+@pytest.mark.anyio
+async def test_get_audit_log_by_id_not_found(
+ client: AsyncClient, fastapi_app: FastAPI, set_mock_user
+):
+ with patch(
+ "lcfs.web.api.audit_log.views.AuditLogService.get_audit_log_by_id"
+ ) as mock_service:
+ # Arrange
+ audit_log_id = 999
+ mock_service.side_effect = DataNotFoundException("Audit log not found")
+ set_mock_user(fastapi_app, [RoleEnum.ADMINISTRATOR])
+
+ url = fastapi_app.url_path_for("get_audit_log_by_id", audit_log_id=audit_log_id)
+
+ # Act
+ response = await client.get(url)
+
+ # Assert
+ assert response.status_code == 404
+ mock_service.assert_called_once_with(audit_log_id)
diff --git a/backend/lcfs/tests/test_organization.py b/backend/lcfs/tests/test_organization.py
index 9f9d44a8..3e8046be 100644
--- a/backend/lcfs/tests/test_organization.py
+++ b/backend/lcfs/tests/test_organization.py
@@ -4,7 +4,6 @@
from starlette import status
from lcfs.db.models.user.Role import RoleEnum
-from lcfs.tests.audit_log.conftest import mock_user_role
from lcfs.web.api.organizations.schema import (
OrganizationBalanceResponseSchema,
OrganizationListSchema,
@@ -72,10 +71,10 @@ async def test_get_organization_by_id_bceid_user(
@pytest.mark.anyio
async def test_create_organization_success(
- client: AsyncClient, fastapi_app: FastAPI, mock_user_role
+ client: AsyncClient, fastapi_app: FastAPI, set_mock_user
) -> None:
# Set mock user role for organization creation
- mock_user_role([RoleEnum.GOVERNMENT])
+ set_mock_user(fastapi_app, [RoleEnum.GOVERNMENT])
payload = {
"name": "Test Organizationa",
"operatingName": "Test Operating name",
@@ -109,13 +108,12 @@ async def test_create_organization_success(
assert response.status_code == status.HTTP_201_CREATED
-
@pytest.mark.anyio
async def test_update_organization_success(
- client: AsyncClient, fastapi_app: FastAPI, mock_user_role
+ client: AsyncClient, fastapi_app: FastAPI, set_mock_user
) -> None:
# Set mock user role for organization update.
- mock_user_role([RoleEnum.GOVERNMENT])
+ set_mock_user(fastapi_app, [RoleEnum.GOVERNMENT])
payload = {
"name": "Test Organization",
"operatingName": "Test Operating name",
@@ -151,10 +149,10 @@ async def test_update_organization_success(
@pytest.mark.anyio
async def test_update_organization_failure(
- client: AsyncClient, fastapi_app: FastAPI, mock_user_role
+ client: AsyncClient, fastapi_app: FastAPI, set_mock_user
) -> None:
# Set mock user role for organization update
- mock_user_role([RoleEnum.GOVERNMENT])
+ set_mock_user(fastapi_app, [RoleEnum.GOVERNMENT])
payload = {
"name": "Test Organizationa",
"operatingName": "Test Operating name",
@@ -334,22 +332,21 @@ async def create_organization(
client: AsyncClient,
fastapi_app: FastAPI,
payload: dict,
- #role: RoleEnum = RoleEnum.GOVERNMENT
+ # role: RoleEnum = RoleEnum.GOVERNMENT
) -> object:
"""Helper function to create an organization and return the response."""
- #mock_user_role([role])
+ # set_mock_user([role])
url = fastapi_app.url_path_for("create_organization")
response = await client.post(url, json=payload)
return response
async def update_organization(
- client: AsyncClient,
- fastapi_app: FastAPI,
- organization_id: int,
- payload: dict
+ client: AsyncClient, fastapi_app: FastAPI, organization_id: int, payload: dict
) -> object:
"""Helper function to update an organization and return the response."""
- url = fastapi_app.url_path_for("update_organization", organization_id=organization_id)
+ url = fastapi_app.url_path_for(
+ "update_organization", organization_id=organization_id
+ )
response = await client.put(url, json=payload)
return response
diff --git a/backend/lcfs/web/api/audit_log/__init__.py b/backend/lcfs/web/api/audit_log/__init__.py
index 42ff532f..10a4366c 100644
--- a/backend/lcfs/web/api/audit_log/__init__.py
+++ b/backend/lcfs/web/api/audit_log/__init__.py
@@ -2,4 +2,4 @@
from lcfs.web.api.audit_log.views import router
-__all__ = ["router"]
\ No newline at end of file
+__all__ = ["router"]
diff --git a/backend/lcfs/web/api/audit_log/repo.py b/backend/lcfs/web/api/audit_log/repo.py
index 970b9fd4..cc75dfb2 100644
--- a/backend/lcfs/web/api/audit_log/repo.py
+++ b/backend/lcfs/web/api/audit_log/repo.py
@@ -1,26 +1,64 @@
+from typing import Optional, List
+from fastapi import Depends
+from sqlalchemy import select, desc, asc, and_, func
from sqlalchemy.ext.asyncio import AsyncSession
-from sqlalchemy.future import select
-from typing import List, Optional
+
+from lcfs.db.dependencies import get_async_db_session
from lcfs.db.models.audit.AuditLog import AuditLog
from lcfs.web.core.decorators import repo_handler
-from sqlalchemy.dialects.postgresql import dialect # Import dialect for compiling with Postgres
+from lcfs.web.api.base import (
+ get_field_for_filter,
+ SortOrder,
+)
class AuditLogRepository:
- def __init__(self, session: AsyncSession):
- self.session = session
-
- async def get_audit_log(
- self, table_name: Optional[str] = None, operation: Optional[str] = None
- ) -> List[AuditLog]:
- query = select(AuditLog)
- if table_name:
- query = query.where(AuditLog.table_name == table_name)
- if operation:
- query = query.where(AuditLog.operation == operation)
-
- # Order by created_at in descending order and limit to 1
- query = query.order_by(AuditLog.create_date.desc()).limit(1)
-
- result = await self.session.execute(query)
- return result.scalars().first()
\ No newline at end of file
+ def __init__(self, db: AsyncSession = Depends(get_async_db_session)):
+ self.db = db
+
+ @repo_handler
+ async def get_audit_logs_paginated(
+ self,
+ offset: int,
+ limit: Optional[int],
+ conditions: List = [],
+ sort_orders: List[SortOrder] = [],
+ ):
+ """
+ Fetches paginated, filtered, and sorted audit logs.
+ """
+ query = select(AuditLog).where(and_(*conditions))
+
+ # Apply sorting
+ if sort_orders:
+ for order in sort_orders:
+ direction = asc if order.direction == "asc" else desc
+ field = get_field_for_filter(AuditLog, order.field)
+ if field is not None:
+ query = query.order_by(direction(field))
+ else:
+ # Default sorting by create_date descending
+ query = query.order_by(desc(AuditLog.create_date))
+
+ # Get total count for pagination
+ count_query = select(func.count()).select_from(query.subquery())
+ total_count_result = await self.db.execute(count_query)
+ total_count = total_count_result.scalar_one()
+
+ # Apply pagination
+ query = query.offset(offset).limit(limit)
+
+ # Execute the query
+ result = await self.db.execute(query)
+ audit_logs = result.scalars().all()
+
+ return audit_logs, total_count
+
+ @repo_handler
+ async def get_audit_log_by_id(self, audit_log_id: int) -> Optional[AuditLog]:
+ """
+ Retrieves an audit log entry by its ID.
+ """
+ query = select(AuditLog).where(AuditLog.audit_log_id == audit_log_id)
+ result = await self.db.execute(query)
+ return result.scalar_one_or_none()
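Reviewer nit: `get_audit_logs_paginated` declares `conditions: List = []` and `sort_orders: List[SortOrder] = []`. Both lists are only read here, so this is harmless today, but Python evaluates defaults once at definition time, so any future in-place mutation would leak across calls. A self-contained demonstration of the pitfall and the usual `None` guard:

```python
from typing import List, Optional


def risky(items: List[int] = []) -> List[int]:
    items.append(1)  # mutates the single shared default list
    return items


def safe(items: Optional[List[int]] = None) -> List[int]:
    items = [] if items is None else items
    items.append(1)
    return items


assert risky() == [1]
assert risky() == [1, 1]  # surprise: the default persisted between calls
assert safe() == [1]
assert safe() == [1]      # a fresh list each call
```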
diff --git a/backend/lcfs/web/api/audit_log/schema.py b/backend/lcfs/web/api/audit_log/schema.py
index d945e2f8..957bf3f7 100644
--- a/backend/lcfs/web/api/audit_log/schema.py
+++ b/backend/lcfs/web/api/audit_log/schema.py
@@ -1,15 +1,50 @@
-from typing import Optional
-from pydantic import BaseModel
+from typing import Optional, List
+from datetime import datetime
+from enum import Enum
-class AuditLogFilterSchema(BaseModel):
- table_name: Optional[str]
- operation: Optional[str]
+from lcfs.web.api.base import BaseSchema, PaginationResponseSchema
-class AuditLogResponseSchema(BaseModel):
- id: int
+
+# Operation Enum
+class AuditLogOperationEnum(str, Enum):
+ INSERT = "INSERT"
+ UPDATE = "UPDATE"
+ DELETE = "DELETE"
+
+
+# AuditLog Schema
+class AuditLogSchema(BaseSchema):
+ audit_log_id: int
table_name: str
- operation: str
+ operation: AuditLogOperationEnum
row_id: int
- old_values: Optional[dict]
- new_values: Optional[dict]
- delta: Optional[dict]
+ old_values: Optional[dict] = None
+ new_values: Optional[dict] = None
+ delta: Optional[dict] = None
+ create_date: Optional[datetime] = None
+ create_user: Optional[str] = None
+ update_date: Optional[datetime] = None
+ update_user: Optional[str] = None
+
+ class Config:
+ from_attributes = True
+
+
+# Simplified AuditLog Schema for list items
+class AuditLogListItemSchema(BaseSchema):
+ audit_log_id: int
+ table_name: str
+ operation: AuditLogOperationEnum
+ row_id: int
+ changed_fields: Optional[str] = None
+ create_date: Optional[datetime] = None
+ create_user: Optional[str] = None
+
+ class Config:
+ from_attributes = True
+
+
+# AuditLog List Schema
+class AuditLogListSchema(BaseSchema):
+ pagination: PaginationResponseSchema
+ audit_logs: List[AuditLogListItemSchema]
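Worth noting: the view tests earlier in this diff assert camelCase response keys (`auditLogs`, `auditLogId`, `tableName`), which implies `BaseSchema` wires up a camelCase alias generator. A small sketch of that behaviour under that assumption, using the Pydantic v2 API:

```python
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, ConfigDict
from pydantic.alias_generators import to_camel


# Assumption: lcfs's BaseSchema configures something equivalent to this.
class CamelSchema(BaseModel):
    model_config = ConfigDict(populate_by_name=True, alias_generator=to_camel)

    audit_log_id: int
    table_name: str
    create_date: Optional[datetime] = None


s = CamelSchema(audit_log_id=1, table_name="users")
print(s.model_dump(by_alias=True))
# {'auditLogId': 1, 'tableName': 'users', 'createDate': None}
```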
diff --git a/backend/lcfs/web/api/audit_log/services.py b/backend/lcfs/web/api/audit_log/services.py
index 397145fc..8a1a8152 100644
--- a/backend/lcfs/web/api/audit_log/services.py
+++ b/backend/lcfs/web/api/audit_log/services.py
@@ -1,17 +1,117 @@
-from typing import List, Optional
+from typing import List
+from math import ceil
+
from fastapi import Depends
-from sqlalchemy.ext.asyncio import AsyncSession
-from lcfs.web.core.decorators import service_handler
from .repo import AuditLogRepository
-from lcfs.db.dependencies import get_async_db_session
+from lcfs.web.api.audit_log.schema import (
+ AuditLogSchema,
+ AuditLogListItemSchema,
+ AuditLogListSchema,
+)
+from lcfs.web.api.base import (
+ PaginationRequestSchema,
+ PaginationResponseSchema,
+ apply_filter_conditions,
+ get_field_for_filter,
+ validate_pagination,
+)
+from lcfs.web.core.decorators import service_handler
+from lcfs.web.exception.exceptions import DataNotFoundException
from lcfs.db.models.audit.AuditLog import AuditLog
+
class AuditLogService:
- def __init__(self, session: AsyncSession = Depends(get_async_db_session)):
- self.repo = AuditLogRepository(session)
-
- async def get_audit_log(
- self, table_name: Optional[str] = None, operation: Optional[str] = None
- ) -> List[AuditLog]:
- return await self.repo.get_audit_log(table_name, operation)
\ No newline at end of file
+ def __init__(self, repo: AuditLogRepository = Depends(AuditLogRepository)):
+ self.repo = repo
+
+ def apply_audit_log_filters(
+ self, pagination: PaginationRequestSchema, conditions: List
+ ):
+ """
+ Apply filters to the audit logs query.
+ """
+ for filter in pagination.filters:
+ filter_value = filter.filter
+ filter_option = filter.type
+ filter_type = filter.filter_type
+
+ # Handle date filters
+ if filter.filter_type == "date":
+ filter_value = []
+ if filter.date_from:
+ filter_value.append(filter.date_from)
+ if filter.date_to:
+ filter_value.append(filter.date_to)
+ if not filter_value:
+ continue # Skip if no valid date is provided
+
+ # Retrieve the correct field based on the filter field name
+ field = get_field_for_filter(AuditLog, filter.field)
+
+ if field is not None:
+ condition = apply_filter_conditions(
+ field, filter_value, filter_option, filter_type
+ )
+ if condition is not None:
+ conditions.append(condition)
+
+ @service_handler
+ async def get_audit_logs_paginated(
+ self, pagination: PaginationRequestSchema
+ ) -> AuditLogListSchema:
+ """
+ Fetch audit logs with filters, sorting, and pagination.
+ """
+ conditions = []
+ pagination = validate_pagination(pagination)
+
+ if pagination.filters:
+ self.apply_audit_log_filters(pagination, conditions)
+
+ offset = (pagination.page - 1) * pagination.size
+ limit = pagination.size
+
+ audit_logs, total_count = await self.repo.get_audit_logs_paginated(
+ offset, limit, conditions, pagination.sort_orders
+ )
+
+ if not audit_logs:
+ raise DataNotFoundException("No audit logs found")
+
+ processed_audit_logs = []
+ for audit_log in audit_logs:
+ # Extract the changed_fields as a comma-separated string
+ if audit_log.delta:
+ changed_fields = ", ".join(audit_log.delta.keys())
+ else:
+ changed_fields = None
+
+ processed_log = AuditLogListItemSchema(
+ audit_log_id=audit_log.audit_log_id,
+ table_name=audit_log.table_name,
+ operation=audit_log.operation,
+ row_id=audit_log.row_id,
+ changed_fields=changed_fields,
+ create_date=audit_log.create_date,
+ create_user=audit_log.create_user,
+ )
+ processed_audit_logs.append(processed_log)
+
+ return AuditLogListSchema(
+ audit_logs=processed_audit_logs,
+ pagination=PaginationResponseSchema(
+ total=total_count,
+ page=pagination.page,
+ size=pagination.size,
+ total_pages=ceil(total_count / pagination.size),
+ ),
+ )
+
+ @service_handler
+ async def get_audit_log_by_id(self, audit_log_id: int) -> AuditLogSchema:
+ """Fetch a single audit log by ID."""
+ audit_log = await self.repo.get_audit_log_by_id(audit_log_id)
+ if not audit_log:
+ raise DataNotFoundException("Audit log not found")
+ return AuditLogSchema.model_validate(audit_log)
diff --git a/backend/lcfs/web/api/audit_log/views.py b/backend/lcfs/web/api/audit_log/views.py
index 9e513bf9..44f177ef 100644
--- a/backend/lcfs/web/api/audit_log/views.py
+++ b/backend/lcfs/web/api/audit_log/views.py
@@ -1,30 +1,46 @@
-from functools import cache
-from fastapi import APIRouter, Depends, Query
-from typing import List, Optional
-from lcfs.db.models.user.Role import RoleEnum
+import structlog
+from fastapi import APIRouter, Depends, status, Request, Body
+
+from lcfs.web.api.base import PaginationRequestSchema
from lcfs.web.core.decorators import view_handler
-from .services import AuditLogService
-from .schema import AuditLogResponseSchema
-from starlette import status
-from fastapi import Request
+from lcfs.web.api.audit_log.services import AuditLogService
+from lcfs.web.api.audit_log.schema import AuditLogListSchema, AuditLogSchema
+from lcfs.db.models.user.Role import RoleEnum
+
+logger = structlog.get_logger(__name__)
router = APIRouter()
+@router.post(
+ "/list",
+ response_model=AuditLogListSchema,
+ status_code=status.HTTP_200_OK,
+)
+@view_handler([RoleEnum.GOVERNMENT, RoleEnum.ADMINISTRATOR])
+async def get_audit_logs_paginated(
+ request: Request,
+ pagination: PaginationRequestSchema = Body(..., embed=False),
+ service: AuditLogService = Depends(),
+):
+ """
+ Fetches a list of audit logs with pagination and filtering.
+ """
+ return await service.get_audit_logs_paginated(pagination)
+
+
@router.get(
- "/",
- response_model=AuditLogResponseSchema,
+ "/{audit_log_id}",
+ response_model=AuditLogSchema,
status_code=status.HTTP_200_OK,
)
-@view_handler([RoleEnum.ADMINISTRATOR])
-async def get_audit_log(
+@view_handler([RoleEnum.GOVERNMENT, RoleEnum.ADMINISTRATOR])
+async def get_audit_log_by_id(
request: Request,
- table_name: Optional[str] = Query(None, description="Filter by table name"),
- operation: Optional[str] = Query(None, description="Filter by operation"),
+ audit_log_id: int,
service: AuditLogService = Depends(),
):
"""
- Get audit logs with optional filters for `table_name` and `operation`.
+ Retrieve an audit log entry by ID.
"""
- return await service.get_audit_log(table_name=table_name, operation=operation)
\ No newline at end of file
+ return await service.get_audit_log_by_id(audit_log_id)
diff --git a/backend/lcfs/web/api/router.py b/backend/lcfs/web/api/router.py
index da83b101..83f9d8cf 100644
--- a/backend/lcfs/web/api/router.py
+++ b/backend/lcfs/web/api/router.py
@@ -25,7 +25,7 @@
allocation_agreement,
document,
fuel_type,
- audit_log
+ audit_log,
)
api_router = APIRouter()
@@ -85,4 +85,4 @@
)
api_router.include_router(document.router, prefix="/documents", tags=["documents"])
api_router.include_router(fuel_type.router, prefix="/fuel-type", tags=["fuel_type"])
-api_router.include_router(audit_log.router, prefix="/audit_log", tags=["audit_log"])
+api_router.include_router(audit_log.router, prefix="/audit-log", tags=["audit_log"])
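With the router now mounted at `/audit-log`, the two endpoints can be exercised roughly as below. The base URL, `/api` root prefix, bearer-token auth, and camelCase filter keys are assumptions, not verified against the deployment:

```python
import asyncio

import httpx


async def main():
    async with httpx.AsyncClient(
        base_url="http://localhost:8000/api",  # hypothetical deployment URL
        headers={"Authorization": "Bearer <token>"},  # placeholder credentials
    ) as client:
        # Paginated list, filtered to UPDATE operations.
        resp = await client.post(
            "/audit-log/list",
            json={
                "page": 1,
                "size": 10,
                "filters": [
                    {
                        "field": "operation",
                        "filterType": "text",
                        "type": "equals",
                        "filter": "UPDATE",
                    }
                ],
                "sortOrders": [],
            },
        )
        audit_logs = resp.json()["auditLogs"]

        # Fetch one entry by ID.
        if audit_logs:
            detail = await client.get(f"/audit-log/{audit_logs[0]['auditLogId']}")
            print(detail.json())


asyncio.run(main())
```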
diff --git a/frontend/src/App.jsx b/frontend/src/App.jsx
index f2317c58..62681cd1 100644
--- a/frontend/src/App.jsx
+++ b/frontend/src/App.jsx
@@ -2,6 +2,7 @@ import { RouterProvider, createBrowserRouter, Navigate } from 'react-router-dom'
import { ROUTES } from './constants/routes'
import { MainLayout } from './layouts/MainLayout'
import { AdminMenu } from './views/Admin/AdminMenu'
+import { ViewAuditLog } from '@/views/Admin/AdminMenu/components/ViewAuditLog'
import { ViewUser } from '@/views/Admin/AdminMenu/components/ViewUser'
import { ComplianceReports } from './views/ComplianceReports'
import { Dashboard } from './views/Dashboard'
@@ -228,6 +229,16 @@ const router = createBrowserRouter([
element: