Skip to content

Commit

Permalink
Merge branch 'main' into feat/1010-admin-api-gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
ianliuwk1019 authored Nov 23, 2023
2 parents 6e4d82a + 5b69940 commit 57aac25
Show file tree
Hide file tree
Showing 19 changed files with 808 additions and 5 deletions.
9 changes: 9 additions & 0 deletions server/admin_management/api/app/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from enum import Enum


class UserType(str, Enum):
IDIR = "I"
BCEID = "B"


COGNITO_USERNAME_KEY = "username"
12 changes: 12 additions & 0 deletions server/admin_management/api/app/jwt_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
get_user_pool_id,
)

from api.app.constants import COGNITO_USERNAME_KEY


JWT_GROUPS_KEY = "cognito:groups"
JWT_CLIENT_ID_KEY = "client_id"
Expand Down Expand Up @@ -199,3 +201,13 @@ def authorize(claims: dict = Depends(validate_token)) -> dict:
def get_access_roles(claims: dict = Depends(authorize)):
groups = claims[JWT_GROUPS_KEY]
return groups


def get_request_cognito_user_id(claims: dict = Depends(authorize)):
# This is NOT user's name, display name or user ID.
# It is mapped to "cognito:username" (ID Token) and "username" (Access Token).
# It is the "cognito_user_id" column for fam_user table.
# Example value: idir_b5ecdb094dfb4149a6a8445a0mangled0@idir
cognito_username = claims[COGNITO_USERNAME_KEY]
LOGGER.debug(f"Current requester's cognito_username for API: {cognito_username}")
return cognito_username
9 changes: 7 additions & 2 deletions server/admin_management/api/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from mangum import Mangum

from api.config.config import get_root_path, get_allow_origins
from api.app.routers import router_smoke_test
from api.app.routers import router_smoke_test, router_application_admin


logConfigFile = os.path.join(
Expand All @@ -19,7 +19,7 @@

apiPrefix = ""
description = """
Forest Access Management API used by the Forest Access Management application
Forest Access Management Admin Management API used by the Forest Access Management application
to define admin access to forest applications.
"""

Expand Down Expand Up @@ -58,6 +58,11 @@ def custom_generate_unique_id(route: APIRouter):
app.include_router(
router_smoke_test.router, prefix=apiPrefix + "/smoke_test", tags=["Smoke Test"]
)
app.include_router(
router_application_admin.router,
prefix=apiPrefix + "/application_admin",
tags=["FAM Application Admin"],
)


@app.get("/", include_in_schema=False, tags=["docs"])
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import logging
from sqlalchemy.orm import Session
from typing import List

from api.app.models import model as models


LOGGER = logging.getLogger(__name__)


class ApplicationAdminRepository:
def __init__(self, db: Session):
self.db = db

def get_application_admin_by_app_and_user_id(
self, application_id: int, user_id: int
) -> models.FamApplicationAdmin:
return (
self.db.query(models.FamApplicationAdmin)
.filter(
models.FamApplicationAdmin.application_id == application_id,
models.FamApplicationAdmin.user_id == user_id,
)
.one_or_none()
)

def get_application_admin_by_id(
self, application_admin_id: int
) -> models.FamApplicationAdmin:
return (
self.db.query(models.FamApplicationAdmin)
.filter(
models.FamApplicationAdmin.application_admin_id == application_admin_id
)
.one_or_none()
)

def get_application_admin_by_application_id(
self, application_id: int
) -> List[models.FamApplicationAdmin]:
return (
self.db.query(models.FamApplicationAdmin)
.filter(
models.FamApplicationAdmin.application_id == application_id
)
.all()
)

def create_application_admin(
self, application_id: int, user_id: int, requester: str
) -> models.FamApplicationAdmin:
new_fam_application_admin: models.FamApplicationAdmin = (
models.FamApplicationAdmin(
**{
"user_id": user_id,
"application_id": application_id,
"create_user": requester,
}
)
)
self.db.add(new_fam_application_admin)
self.db.flush()
self.db.refresh(new_fam_application_admin)
LOGGER.debug(
f"New FamApplicationAdmin added for {new_fam_application_admin.__dict__}"
)
return new_fam_application_admin

def delete_application_admin(self, application_admin_id: int):
record = (
self.db.query(models.FamApplicationAdmin)
.filter(
models.FamApplicationAdmin.application_admin_id == application_admin_id
)
.one()
)
self.db.delete(record)
self.db.flush()
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import logging
from sqlalchemy.orm import Session

from api.app.models import model as models


LOGGER = logging.getLogger(__name__)


class ApplicationRepository:
def __init__(self, db: Session):
self.db = db

def get_application(self, application_id: int) -> models.FamApplication:
return (
self.db.query(models.FamApplication)
.filter(models.FamApplication.application_id == application_id)
.one_or_none()
)
45 changes: 45 additions & 0 deletions server/admin_management/api/app/repositories/user_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import logging
from sqlalchemy.orm import Session

from api.app.models import model as models
from api.app import schemas


LOGGER = logging.getLogger(__name__)


class UserRepository:
def __init__(self, db: Session):
self.db = db

def get_user_by_domain_and_name(
self, user_type_code: str, user_name: str
) -> models.FamUser:
fam_user: models.FamUser = (
self.db.query(models.FamUser)
.filter(
models.FamUser.user_type_code == user_type_code,
models.FamUser.user_name.ilike(user_name),
)
.one_or_none()
)
LOGGER.debug(
f"fam_user {str(fam_user.user_id) + ' found' if fam_user else 'not found'}."
)
return fam_user

def get_user_by_cognito_user_id(self, cognito_user_id: str) -> models.FamUser:
return (
self.db.query(models.FamUser)
.filter(models.FamUser.cognito_user_id == cognito_user_id)
.one_or_none()
)

def create_user(self, fam_user: schemas.FamUser) -> models.FamUser:
LOGGER.debug(f"Creating fam user: {fam_user}")

fam_user_dict = fam_user.model_dump()
db_item = models.FamUser(**fam_user_dict)
self.db.add(db_item)
self.db.flush()
return db_item
164 changes: 164 additions & 0 deletions server/admin_management/api/app/routers/router_application_admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import logging
from fastapi import APIRouter, Depends, Request, Response, HTTPException
from sqlalchemy.orm import Session
from typing import List


from api.app.models import model as models
from api.app.routers.router_guards import (
get_current_requester,
authorize_by_fam_admin,
enforce_self_grant_guard,
validate_param_application_admin_id,
validate_param_application_id,
)
from api.app import database, jwt_validation, schemas
from api.app.schemas import Requester
from api.app.services.application_admin_service import ApplicationAdminService
from api.app.services.user_service import UserService
from api.app.services.application_service import ApplicationService
from api.app.utils.audit_util import AuditEventLog, AuditEventOutcome, AuditEventType

LOGGER = logging.getLogger(__name__)

router = APIRouter()


@router.post(
"",
response_model=schemas.FamAppAdminGet,
dependencies=[
Depends(authorize_by_fam_admin),
Depends(enforce_self_grant_guard),
Depends(validate_param_application_id),
],
)
def create_application_admin(
application_admin_request: schemas.FamAppAdminCreate,
request: Request,
db: Session = Depends(database.get_db),
token_claims: dict = Depends(jwt_validation.authorize),
requester: Requester = Depends(get_current_requester),
):

LOGGER.debug(
f"Executing 'create_application_admin' "
f"with request: {application_admin_request}, requestor: {token_claims}"
)

audit_event_log = AuditEventLog(
request=request,
event_type=AuditEventType.CREATE_APPLICATION_ADMIN_ACCESS,
event_outcome=AuditEventOutcome.SUCCESS,
)

try:
application_admin_service = ApplicationAdminService(db)
application_service = ApplicationService(db)
user_service = UserService(db)

audit_event_log.requesting_user = user_service.get_user_by_cognito_user_id(
requester.cognito_user_id
)
audit_event_log.application = application_service.get_application(
application_admin_request.application_id
)
audit_event_log.target_user = user_service.get_user_by_domain_and_name(
application_admin_request.user_type_code,
application_admin_request.user_name,
)

return application_admin_service.create_application_admin(
application_admin_request, requester.cognito_user_id
)

except Exception as e:
audit_event_log.event_outcome = AuditEventOutcome.FAIL
audit_event_log.exception = e
raise e

finally:
if audit_event_log.target_user is None:
audit_event_log.target_user = models.FamUser(
user_type_code=application_admin_request.user_type_code,
user_name=application_admin_request.user_name,
user_guid="unknown",
cognito_user_id="unknown",
)

audit_event_log.log_event()


@router.delete(
"/{application_admin_id}",
response_class=Response,
dependencies=[
Depends(authorize_by_fam_admin),
Depends(enforce_self_grant_guard),
Depends(validate_param_application_admin_id),
],
)
def delete_application_admin(
application_admin_id: int,
request: Request,
db: Session = Depends(database.get_db),
requester: Requester = Depends(get_current_requester),
):
LOGGER.debug(
f"Executing 'delete_application_admin' with request: {application_admin_id}"
)

audit_event_log = AuditEventLog(
request=request,
event_type=AuditEventType.REMOVE_APPLICATION_ADMIN_ACCESS,
event_outcome=AuditEventOutcome.SUCCESS,
)

try:
application_admin_service = ApplicationAdminService(db)
user_service = UserService(db)

application_admin = application_admin_service.get_application_admin_by_id(
application_admin_id
)
audit_event_log.requesting_user = user_service.get_user_by_cognito_user_id(
requester.cognito_user_id
)
audit_event_log.application = application_admin.application
audit_event_log.target_user = application_admin.user

return application_admin_service.delete_application_admin(application_admin_id)

except Exception as e:
audit_event_log.event_outcome = AuditEventOutcome.FAIL
audit_event_log.exception = e
raise e

finally:
audit_event_log.log_event()


@router.get(
"/{application_id}/admins",
response_model=List[schemas.FamAppAdminGet],
status_code=200,
dependencies=[Depends(authorize_by_fam_admin)],
)
def get_application_admin_by_applicationid(
application_id: int,
db: Session = Depends(database.get_db),
):
LOGGER.debug(
f"Loading application admin access for application_id: {application_id}"
)
application_admin_service = ApplicationAdminService(db)
application_admin_access = (
application_admin_service.get_application_admin_by_application_id(
application_id
)
)
LOGGER.debug(
f"Finished loading application admin access for application - # of results = {len(application_admin_access)}"
)

return application_admin_access
Loading

0 comments on commit 57aac25

Please sign in to comment.