From 2f02a517aa1ecf52f03d26f55abf7eab9780a26c Mon Sep 17 00:00:00 2001 From: MCatherine Date: Thu, 8 Feb 2024 13:44:48 -0800 Subject: [PATCH] feat: #1139 add forest client validator (#1176) --- .github/workflows/sonar.yml | 2 + .../server/fam_admin_management_api.tf | 3 + server/admin_management/api/app/constants.py | 5 + .../integration/forest_client_integration.py | 77 +++++++++++++ server/admin_management/api/app/schemas.py | 40 ++++--- .../access_control_privilege_service.py | 33 +++++- .../validator/forest_client_validator.py | 38 ++++++ server/admin_management/api/config/config.py | 37 ++++-- server/admin_management/local-dev.env | 1 + server/admin_management/tests/conftest.py | 39 +++++-- server/admin_management/tests/constants.py | 3 +- .../test_forest_client_integration.py | 52 +++++++++ .../test_router_access_control_privilege.py | 24 +++- .../test_access_control_privilege_service.py | 71 ++++++++++++ .../services/test_forest_client_validator.py | 109 ++++++++++++++++++ 15 files changed, 495 insertions(+), 39 deletions(-) create mode 100644 server/admin_management/api/app/integration/forest_client_integration.py create mode 100644 server/admin_management/api/app/services/validator/forest_client_validator.py create mode 100644 server/admin_management/tests/integration/test_forest_client_integration.py create mode 100644 server/admin_management/tests/services/test_forest_client_validator.py diff --git a/.github/workflows/sonar.yml b/.github/workflows/sonar.yml index 61935a1e4..db2b5be07 100644 --- a/.github/workflows/sonar.yml +++ b/.github/workflows/sonar.yml @@ -118,6 +118,8 @@ jobs: python-version: 3.8 - name: Tests and coverage + env: + FC_API_TOKEN: ${{ secrets.FOREST_CLIENT_API_API_KEY }} run: | cd server/admin_management pip install -r requirements.txt -r requirements-dev.txt diff --git a/infrastructure/server/fam_admin_management_api.tf b/infrastructure/server/fam_admin_management_api.tf index ecc6b4d18..f2d00c9f2 100644 --- a/infrastructure/server/fam_admin_management_api.tf +++ b/infrastructure/server/fam_admin_management_api.tf @@ -128,6 +128,9 @@ resource "aws_lambda_function" "fam_admin_management_api_function" { COGNITO_CLIENT_ID = "3hv7q2mct0okt12m5i3p5v4phu" ALLOW_ORIGIN = "${var.front_end_redirect_path}" + + FC_API_TOKEN = "${var.forest_client_api_api_key}" + FC_API_BASE_URL = "${var.forest_client_api_base_url}" } } diff --git a/server/admin_management/api/app/constants.py b/server/admin_management/api/app/constants.py index 5194e5ab0..fd0e22dd5 100644 --- a/server/admin_management/api/app/constants.py +++ b/server/admin_management/api/app/constants.py @@ -38,3 +38,8 @@ class AdminRoleAuthGroup(str, Enum): APP_ADMIN = "APP_ADMIN" DELEGATED_ADMIN = "DELEGATED_ADMIN" + +FOREST_CLIENT_STATUS = { + "KEY": "clientStatusCode", + "CODE_ACTIVE": "ACT" +} diff --git a/server/admin_management/api/app/integration/forest_client_integration.py b/server/admin_management/api/app/integration/forest_client_integration.py new file mode 100644 index 000000000..5c822b3a6 --- /dev/null +++ b/server/admin_management/api/app/integration/forest_client_integration.py @@ -0,0 +1,77 @@ +import logging +from http import HTTPStatus +from typing import List + +import requests +from api.config import config +from api.app.schemas import ForestClientIntegrationFindResponse + +LOGGER = logging.getLogger(__name__) + + +class ForestClientIntegrationService(): + """ + The class is used for making requests to get information from Forest Client API. + Api is located at BC API Service Portal: https://api.gov.bc.ca/devportal/api-directory/3179. + API-key needs to be requested from the portal. + Spec of API: + test: https://nr-forest-client-api-test.api.gov.bc.ca/ + prod: https://nr-forest-client-api-prod.api.gov.bc.ca/ + """ + # https://requests.readthedocs.io/en/latest/user/quickstart/#timeouts + # https://docs.python-requests.org/en/latest/user/advanced/#timeouts + TIMEOUT = (5, 10) # Timeout (connect, read) in seconds. + + def __init__(self): + self.api_base_url = config.get_forest_client_api_baseurl() + self.api_clients_url = f"{self.api_base_url}/api/clients" + self.API_TOKEN = config.get_forest_client_api_token() + self.headers = {"Accept": "application/json", "X-API-KEY": self.API_TOKEN} + + # See Python: https://requests.readthedocs.io/en/latest/user/advanced/ + self.session = requests.Session() + self.session.headers.update(self.headers) + + def find_by_client_number(self, p_client_number: str) -> List[ForestClientIntegrationFindResponse]: + """ + Find Forest Client(s) information based on p_client_number search query field. + + :param p_client_number: Forest Client Number string (8 digits). + Note! Current Forest Client API can only do exact match. + '/api/clients/findByClientNumber/{clientNumber}' + :return: Search result as List for a Forest Client information object. + Current Forest Client API returns exact one result or http status + other than 200 with message content. The intent for FAM search is for + wild card search and Forest Client API could be capable of doing that + in next version. + """ + url = f"{self.api_clients_url}/findByClientNumber/{p_client_number}" + LOGGER.debug(f"ForestClientIntegrationService find_by_client_number() - url: {url}") + + try: + r = self.session.get(url, timeout=self.TIMEOUT) + r.raise_for_status() + # !! Don't map and return schema.FamForestClient or object from "scheam.py" as that + # will create circular dependency issue. let crud to map the result. + api_result = r.json() + LOGGER.debug(f"API result: {api_result}") + return [api_result] + + # Below except catches only HTTPError not general errors like network connection/timeout. + except requests.exceptions.HTTPError as he: + status_code = r.status_code + LOGGER.debug(f"API status code: {status_code}") + LOGGER.debug(f"API result: {r.content or r.reason}") + + # For some reason Forest Client API uses (a bit confusing): + # - '404' as general "client 'Not Found'", not as conventional http Not Found. + # + # Forest Client API returns '400' as "Invalid Client Number"; e.g. /findByClientNumber/abcde0001 + # Howerver FAM 'search' (as string type param) is intended for either 'number' or 'name' search), + # so if 400, will return empty. + if ((status_code == HTTPStatus.NOT_FOUND) or (status_code == HTTPStatus.BAD_REQUEST)): + return [] # return empty for FAM forest client search + + # Else raise error, including 500 + # There is a general error handler, see: requests_http_error_handler + raise he \ No newline at end of file diff --git a/server/admin_management/api/app/schemas.py b/server/admin_management/api/app/schemas.py index 512364a57..f43115020 100644 --- a/server/admin_management/api/app/schemas.py +++ b/server/admin_management/api/app/schemas.py @@ -133,13 +133,13 @@ class FamRoleCreateDto(FamRoleBase): parent_role_id: Union[int, None] = Field( default=None, title="Reference role_id to higher role" ) - forest_client_number: Optional[ - Annotated[str, StringConstraints(max_length=8)] - ] = Field(default=None, title="Forest Client this role is associated with") + forest_client_number: Optional[Annotated[str, StringConstraints(max_length=8)]] = ( + Field(default=None, title="Forest Client this role is associated with") + ) create_user: Annotated[str, StringConstraints(max_length=60)] - client_number: Optional[ - FamForestClientCreateDto - ] = None # this is matched with the model + client_number: Optional[FamForestClientCreateDto] = ( + None # this is matched with the model + ) model_config = ConfigDict(from_attributes=True) @@ -226,12 +226,15 @@ class FamAccessControlPrivilegeCreateResponse(BaseModel): # ------------------------------------- FAM Admin User Access ---------------------------------------- # class FamApplicationDto(BaseModel): id: int = Field(validation_alias="application_id") - name: Annotated[str, StringConstraints(max_length=100)] = \ - Field(validation_alias="application_name") - description: Annotated[Optional[str], StringConstraints(max_length=200)] = \ - Field(default=None, validation_alias="application_description") - env: Optional[famConstants.AppEnv] = \ - Field(validation_alias="app_environment", default=None) + name: Annotated[str, StringConstraints(max_length=100)] = Field( + validation_alias="application_name" + ) + description: Annotated[Optional[str], StringConstraints(max_length=200)] = Field( + default=None, validation_alias="application_description" + ) + env: Optional[famConstants.AppEnv] = Field( + validation_alias="app_environment", default=None + ) model_config = ConfigDict(from_attributes=True) @@ -240,8 +243,9 @@ class FamRoleDto(BaseModel): # Note, this "id" for role can either be concrete role's or abstract role's id. # In abstract role with this id, forest_clients should be present. id: int = Field(validation_alias="role_id") - name: Annotated[str, StringConstraints(max_length=100)] = \ - Field(validation_alias="role_name") + name: Annotated[str, StringConstraints(max_length=100)] = Field( + validation_alias="role_name" + ) type_code: famConstants.RoleType = Field(validation_alias="role_type_code") forest_clients: Optional[List[str]] = Field(default=None) @@ -266,3 +270,11 @@ class AdminUserAccessResponse(BaseModel): access: List[FamAuthGrantDto] model_config = ConfigDict(from_attributes=True) + + +# ------------------------------------- Forest Client API Integraion ---------------------------------------- # +class ForestClientIntegrationFindResponse(BaseModel): + clientNumber: str + clientName: str + clientStatusCode: str + clientTypeCode: str \ No newline at end of file diff --git a/server/admin_management/api/app/services/access_control_privilege_service.py b/server/admin_management/api/app/services/access_control_privilege_service.py index 25f783a8e..98d9ec96c 100644 --- a/server/admin_management/api/app/services/access_control_privilege_service.py +++ b/server/admin_management/api/app/services/access_control_privilege_service.py @@ -4,10 +4,18 @@ from api.app import constants as famConstants from api.app import schemas -from api.app.repositories.access_control_privilege_repository import \ - AccessControlPrivilegeRepository + +from api.app.integration.forest_client_integration import ForestClientIntegrationService +from api.app.repositories.access_control_privilege_repository import ( + AccessControlPrivilegeRepository, +) from api.app.services.role_service import RoleService from api.app.services.user_service import UserService +from api.app.services.validator.forest_client_validator import ( + forest_client_number_exists, + forest_client_active, + get_forest_client_status, +) from api.app.utils import utils from sqlalchemy.orm import Session @@ -76,9 +84,28 @@ def create_access_control_privilege_many( error_msg = "Invalid access control privilege request, missing forest client number." utils.raise_http_exception(HTTPStatus.BAD_REQUEST, error_msg) + forest_client_integration_service = ForestClientIntegrationService() for forest_client_number in request.forest_client_numbers: + # validate the forest client number + forest_client_validator_return = ( + forest_client_integration_service.find_by_client_number(forest_client_number) + ) + if not forest_client_number_exists(forest_client_validator_return): + error_msg = ( + "Invalid access control privilege request. " + + f"Forest client number {forest_client_number} does not exist." + ) + utils.raise_http_exception(HTTPStatus.BAD_REQUEST, error_msg) + + if not forest_client_active(forest_client_validator_return): + error_msg = ( + "Invalid access control privilege request. " + + f"Forest client number {forest_client_number} is not in active status: " + + f"{get_forest_client_status(forest_client_validator_return)}." + ) + utils.raise_http_exception(HTTPStatus.BAD_REQUEST, error_msg) + # Check if child role exists or add a new child role - # NOSONAR TODO: validate the forest client number child_role = self.role_service.find_or_create_forest_client_child_role( forest_client_number, fam_role, requester ) diff --git a/server/admin_management/api/app/services/validator/forest_client_validator.py b/server/admin_management/api/app/services/validator/forest_client_validator.py new file mode 100644 index 000000000..a48c1d554 --- /dev/null +++ b/server/admin_management/api/app/services/validator/forest_client_validator.py @@ -0,0 +1,38 @@ +import logging +from typing import List, Union + +from api.app.constants import FOREST_CLIENT_STATUS +from api.app.schemas import ForestClientIntegrationFindResponse + + +LOGGER = logging.getLogger(__name__) + + +def forest_client_number_exists( + forest_client_find_result: List[ForestClientIntegrationFindResponse], +) -> bool: + # Exact client number search - should only contain 1 result. + return len(forest_client_find_result) == 1 + + +def forest_client_active( + forest_client_find_result: List[ForestClientIntegrationFindResponse], +) -> bool: + return ( + ( + forest_client_find_result[0][FOREST_CLIENT_STATUS["KEY"]] + == FOREST_CLIENT_STATUS["CODE_ACTIVE"] + ) + if forest_client_number_exists(forest_client_find_result) + else False + ) + + +def get_forest_client_status( + forest_client_find_result: List[ForestClientIntegrationFindResponse], +) -> Union[str, None]: + return ( + forest_client_find_result[0][FOREST_CLIENT_STATUS["KEY"]] + if forest_client_number_exists(forest_client_find_result) + else None + ) diff --git a/server/admin_management/api/config/config.py b/server/admin_management/api/config/config.py index bdbbaf890..ebb0a96df 100644 --- a/server/admin_management/api/config/config.py +++ b/server/admin_management/api/config/config.py @@ -66,18 +66,18 @@ def get_aws_db_string(): username = secret_json.get("username") password = secret_json.get("password") - host = get_env_var('PG_HOST') - port = get_env_var('PG_PORT') - dbname = get_env_var('PG_DATABASE') + host = get_env_var("PG_HOST") + port = get_env_var("PG_PORT") + dbname = get_env_var("PG_DATABASE") return f"postgresql+psycopg2://{username}:{password}@{host}:{port}/{dbname}" def get_local_dev_db_string(): - username = get_env_var('POSTGRES_USER') - password = get_env_var('POSTGRES_PASSWORD') - host = get_env_var('POSTGRES_HOST') - port = get_env_var('POSTGRES_PORT') - dbname = get_env_var('POSTGRES_DB') + username = get_env_var("POSTGRES_USER") + password = get_env_var("POSTGRES_PASSWORD") + host = get_env_var("POSTGRES_HOST") + port = get_env_var("POSTGRES_PORT") + dbname = get_env_var("POSTGRES_DB") return f"postgresql+psycopg2://{username}:{password}@{host}:{port}/{dbname}" @@ -90,6 +90,7 @@ def get_aws_db_secret(): return client.get_secret_value(SecretId=secret_name) + def get_aws_region(): env_var = "COGNITO_REGION" return get_env_var(env_var) @@ -99,7 +100,6 @@ def get_aws_region(): def get_oidc_client_id(): - # Outside of AWS, you can set COGNITO_CLIENT_ID # Inside AWS, you have to get this value from an AWS Secret @@ -121,10 +121,27 @@ def get_oidc_client_id(): return _client_id + def get_user_pool_domain_name(): env_var = "COGNITO_USER_POOL_DOMAIN" return get_env_var(env_var) + def get_user_pool_id(): env_var = "COGNITO_USER_POOL_ID" - return get_env_var(env_var) \ No newline at end of file + return get_env_var(env_var) + + +def get_forest_client_api_token(): + api_token = get_env_var("FC_API_TOKEN") + return api_token + + +def get_forest_client_api_baseurl(): + forest_client_api_baseurl = ( + get_env_var("FC_API_BASE_URL") + if is_on_aws() + else "https://nr-forest-client-api-test.api.gov.bc.ca" + ) # Test env. + LOGGER.info(f"Using forest_client_api_baseurl -- {forest_client_api_baseurl}") + return forest_client_api_baseurl diff --git a/server/admin_management/local-dev.env b/server/admin_management/local-dev.env index 6e7d8954f..3c7334d23 100644 --- a/server/admin_management/local-dev.env +++ b/server/admin_management/local-dev.env @@ -7,3 +7,4 @@ COGNITO_REGION=ca-central-1 COGNITO_USER_POOL_ID=ca-central-1_p8X8GdjKW COGNITO_CLIENT_ID=6jfveou69mgford233or30hmta COGNITO_USER_POOL_DOMAIN=dev-fam-user-pool-domain +FC_API_TOKEN=thisisasecret \ No newline at end of file diff --git a/server/admin_management/tests/conftest.py b/server/admin_management/tests/conftest.py index fea6ecd4c..f2b926d32 100644 --- a/server/admin_management/tests/conftest.py +++ b/server/admin_management/tests/conftest.py @@ -20,12 +20,17 @@ from api.app.repositories.application_repository import ApplicationRepository from api.app.repositories.forest_client_repository import ForestClientRepository from api.app.repositories.application_admin_repository import ApplicationAdminRepository -from api.app.repositories.access_control_privilege_repository import AccessControlPrivilegeRepository +from api.app.repositories.access_control_privilege_repository import ( + AccessControlPrivilegeRepository, +) from api.app.services.user_service import UserService from api.app.services.role_service import RoleService from api.app.services.forest_client_service import ForestClientService from api.app.services.application_admin_service import ApplicationAdminService -from api.app.services.access_control_privilege_service import AccessControlPrivilegeService +from api.app.services.access_control_privilege_service import ( + AccessControlPrivilegeService, +) +from api.app.integration.forest_client_integration import ForestClientIntegrationService LOGGER = logging.getLogger(__name__) @@ -104,9 +109,9 @@ def test_rsa_key(): global public_rsa_key public_rsa_key = new_key.publickey().exportKey("PEM") - app.dependency_overrides[ - jwt_validation.get_rsa_key_method - ] = override_get_rsa_key_method + app.dependency_overrides[jwt_validation.get_rsa_key_method] = ( + override_get_rsa_key_method + ) return new_key.exportKey("PEM") @@ -118,9 +123,9 @@ def test_rsa_key_missing(): global public_rsa_key public_rsa_key = new_key.publickey().exportKey("PEM") - app.dependency_overrides[ - jwt_validation.get_rsa_key_method - ] = override_get_rsa_key_method_none + app.dependency_overrides[jwt_validation.get_rsa_key_method] = ( + override_get_rsa_key_method_none + ) return new_key.exportKey("PEM") @@ -146,42 +151,58 @@ def override_get_rsa_key_none(kid): def user_repo(db_pg_session: Session): return UserRepository(db_pg_session) + @pytest.fixture(scope="function") def role_repo(db_pg_session: Session): return RoleRepository(db_pg_session) + @pytest.fixture(scope="function") def application_repo(db_pg_session: Session): return ApplicationRepository(db_pg_session) + @pytest.fixture(scope="function") def forest_client_repo(db_pg_session: Session): return ForestClientRepository(db_pg_session) + @pytest.fixture(scope="function") def application_admin_repo(db_pg_session: Session): return ApplicationAdminRepository(db_pg_session) + @pytest.fixture(scope="function") def access_control_privilege_repo(db_pg_session: Session): return AccessControlPrivilegeRepository(db_pg_session) + @pytest.fixture(scope="function") def user_service(db_pg_session: Session): return UserService(db_pg_session) + @pytest.fixture(scope="function") def role_service(db_pg_session: Session): return RoleService(db_pg_session) + @pytest.fixture(scope="function") def forest_client_service(db_pg_session: Session): return ForestClientService(db_pg_session) + @pytest.fixture(scope="function") def application_admin_service(db_pg_session: Session): return ApplicationAdminService(db_pg_session) + @pytest.fixture(scope="function") def access_control_privilege_service(db_pg_session: Session): - return AccessControlPrivilegeService(db_pg_session) \ No newline at end of file + return AccessControlPrivilegeService(db_pg_session) + + +@pytest.fixture(scope="function") +def forest_client_integration_service(): + return ForestClientIntegrationService() + diff --git a/server/admin_management/tests/constants.py b/server/admin_management/tests/constants.py index c171619f7..7b689d9bf 100644 --- a/server/admin_management/tests/constants.py +++ b/server/admin_management/tests/constants.py @@ -42,7 +42,8 @@ # -------------------- test forest client data ---------------------- # TEST_NON_EXIST_FOREST_CLIENT_NUMBER = "99999999" TEST_INVALID_FOREST_CLIENT_NUMBER = "12345" -TEST_FOREST_CLIENT_NUMBER = "00000010" +TEST_INACTIVE_FOREST_CLIENT_NUMBER = "00000002" +TEST_FOREST_CLIENT_NUMBER = "00001011" TEST_FOREST_CLIENT_NUMBER_TWO = "00000001" TEST_FOERST_CLIENT_CREATE = schemas.FamForestClientCreateDto( **{ diff --git a/server/admin_management/tests/integration/test_forest_client_integration.py b/server/admin_management/tests/integration/test_forest_client_integration.py new file mode 100644 index 000000000..88614ff82 --- /dev/null +++ b/server/admin_management/tests/integration/test_forest_client_integration.py @@ -0,0 +1,52 @@ +import logging + +import pytest +from api.app.integration.forest_client_integration import ForestClientIntegrationService + +LOGGER = logging.getLogger(__name__) + + +class TestForestClientIntegrationServiceClass(object): + """ + Testing ForestClientIntegrationService class with real remote API calls (TEST environment). + Initially Forest Client API returns also "acronyms" field but it disappears + some day. Since this field is not important at the moment, so test does not + includ3e it. + """ + fc_api: ForestClientIntegrationService + example_expected_valid_result = { + 'clientNumber': '00000002', + 'clientName': 'PENDING S & R BILLING', + 'clientStatusCode': 'DAC', + 'clientTypeCode': 'G' + } + + def setup_class(self): + self.fc_api = ForestClientIntegrationService() + + def test_verify_init(self): + # Quick Verifying for init not empty + assert self.fc_api.api_base_url is not None + assert "/api/clients" in self.fc_api.api_clients_url + assert self.fc_api.API_TOKEN is not None + assert self.fc_api.headers["X-API-KEY"] == self.fc_api.API_TOKEN + + @pytest.mark.parametrize("client_id_to_test, expected_result", [ + ("0001011", []), # less than 8 digits. + ("99999999", []), # 8 digits - client not exist. + ("000001011", []) # more than 8 digits. + ]) + def test_find_by_client_number_not_exists_noresult(self, client_id_to_test, expected_result): + assert self.fc_api.find_by_client_number(client_id_to_test) == expected_result + + def test_find_by_client_number_exists_returns_list_one_item(self): + """ + The test validating API does return single client dictionary and + with the expected key(s) structure. + """ + client_id_to_test = self.example_expected_valid_result["clientNumber"] + result = self.fc_api.find_by_client_number(client_id_to_test) + assert len(result) == 1 + assert isinstance(result[0], dict) + result_fc_dict = result[0] + assert all(key in self.example_expected_valid_result.keys() for key in result_fc_dict.keys()) diff --git a/server/admin_management/tests/routers/test_router_access_control_privilege.py b/server/admin_management/tests/routers/test_router_access_control_privilege.py index ea890e9cb..e27fe490b 100644 --- a/server/admin_management/tests/routers/test_router_access_control_privilege.py +++ b/server/admin_management/tests/routers/test_router_access_control_privilege.py @@ -4,17 +4,21 @@ from api.app.main import apiPrefix from api.app.jwt_validation import ERROR_PERMISSION_REQUIRED -from api.app.routers.router_guards import ERROR_INVALID_ROLE_ID, ERROR_INVALID_APPLICATION_ID +from api.app.routers.router_guards import ( + ERROR_INVALID_ROLE_ID, + ERROR_INVALID_APPLICATION_ID, +) from tests.constants import ( TEST_FOREST_CLIENT_NUMBER, TEST_FOREST_CLIENT_NUMBER_TWO, + TEST_INACTIVE_FOREST_CLIENT_NUMBER, TEST_FOM_DEV_ADMIN_ROLE, TEST_NOT_EXIST_ROLE_ID, TEST_ACCESS_CONTROL_PRIVILEGE_CREATE_REQUEST, TEST_FAM_ADMIN_ROLE, TEST_NOT_EXIST_APPLICATION_ID, INVALID_APPLICATION_ID, - TEST_APPLICATION_ID_FOM_DEV + TEST_APPLICATION_ID_FOM_DEV, ) import tests.jwt_utils as jwt_utils @@ -85,6 +89,22 @@ def test_create_access_control_privilege_many( assert data[0].get("status_code") == HTTPStatus.CONFLICT assert data[1].get("status_code") == HTTPStatus.OK + # test create access control privilege with invalid forest client numbers + response = test_client_fixture.post( + f"{endPoint}", + json={ + **TEST_ACCESS_CONTROL_PRIVILEGE_CREATE_REQUEST, + "forest_client_numbers": [ + TEST_FOREST_CLIENT_NUMBER, + TEST_INACTIVE_FOREST_CLIENT_NUMBER, + ], + }, + headers=jwt_utils.headers(token), + ) + assert response.status_code == HTTPStatus.BAD_REQUEST + assert response.json() is not None + assert str(response.json()["detail"]).find("is not in active status") != -1 + def test_get_access_control_privileges_by_application_id( test_client_fixture: starlette.testclient.TestClient, test_rsa_key diff --git a/server/admin_management/tests/services/test_access_control_privilege_service.py b/server/admin_management/tests/services/test_access_control_privilege_service.py index 7ca3220cc..8a20f447f 100644 --- a/server/admin_management/tests/services/test_access_control_privilege_service.py +++ b/server/admin_management/tests/services/test_access_control_privilege_service.py @@ -25,6 +25,8 @@ TEST_FOREST_CLIENT_NUMBER, TEST_FOREST_CLIENT_NUMBER_TWO, TEST_NOT_EXIST_ROLE_ID, + TEST_NON_EXIST_FOREST_CLIENT_NUMBER, + TEST_INACTIVE_FOREST_CLIENT_NUMBER, ) @@ -246,3 +248,72 @@ def test_create_access_control_privilege_many_invalid_role_id( ) error_msg = f"Role id {TEST_NOT_EXIST_ROLE_ID} does not exist." assert str(e._excinfo).find(error_msg) != -1 + + +def test_create_access_control_privilege_many_invalid_forest_client( + access_control_privilege_service: AccessControlPrivilegeService, +): + with pytest.raises(HTTPException) as e: + access_control_privilege_service.create_access_control_privilege_many( + schemas.FamAccessControlPrivilegeCreateRequest( + **{ + **TEST_ACCESS_CONTROL_PRIVILEGE_CREATE_REQUEST, + "forest_client_numbers": [TEST_NON_EXIST_FOREST_CLIENT_NUMBER], + } + ), + TEST_CREATOR, + ) + assert ( + str(e._excinfo).find( + f"Forest client number {TEST_NON_EXIST_FOREST_CLIENT_NUMBER} does not exist" + ) + != -1 + ) + + +def test_create_access_control_privilege_many_inactive_forest_client( + access_control_privilege_service: AccessControlPrivilegeService, +): + with pytest.raises(HTTPException) as e: + access_control_privilege_service.create_access_control_privilege_many( + schemas.FamAccessControlPrivilegeCreateRequest( + **{ + **TEST_ACCESS_CONTROL_PRIVILEGE_CREATE_REQUEST, + "forest_client_numbers": [TEST_INACTIVE_FOREST_CLIENT_NUMBER], + } + ), + TEST_CREATOR, + ) + assert ( + str(e._excinfo).find( + f"Forest client number {TEST_INACTIVE_FOREST_CLIENT_NUMBER} is not in active status" + ) + != -1 + ) + + +def test_create_access_control_privilege_many_active_and_inactive_forest_client( + access_control_privilege_service: AccessControlPrivilegeService, +): + # test create access control privilege for abstract parent role with 3 forest client numbers, + # one is inactive, one is active, one is invalid + with pytest.raises(HTTPException) as e: + access_control_privilege_service.create_access_control_privilege_many( + schemas.FamAccessControlPrivilegeCreateRequest( + **{ + **TEST_ACCESS_CONTROL_PRIVILEGE_CREATE_REQUEST, + "forest_client_numbers": [ + TEST_FOREST_CLIENT_NUMBER, + TEST_INACTIVE_FOREST_CLIENT_NUMBER, + TEST_NON_EXIST_FOREST_CLIENT_NUMBER, + ], + } + ), + TEST_CREATOR, + ) + assert ( + str(e._excinfo).find( + f"Forest client number {TEST_INACTIVE_FOREST_CLIENT_NUMBER} is not in active status" + ) + != -1 + ) diff --git a/server/admin_management/tests/services/test_forest_client_validator.py b/server/admin_management/tests/services/test_forest_client_validator.py new file mode 100644 index 000000000..2492b6afb --- /dev/null +++ b/server/admin_management/tests/services/test_forest_client_validator.py @@ -0,0 +1,109 @@ +import logging + +from api.app.constants import FOREST_CLIENT_STATUS +from api.app.integration.forest_client_integration import ForestClientIntegrationService +from api.app.services.validator.forest_client_validator import ( + forest_client_number_exists, + forest_client_active, + get_forest_client_status, +) +from tests.constants import ( + TEST_FOREST_CLIENT_NUMBER, + TEST_NON_EXIST_FOREST_CLIENT_NUMBER, + TEST_INACTIVE_FOREST_CLIENT_NUMBER, +) + +LOGGER = logging.getLogger(__name__) + + +def test_forest_client_number_exists( + forest_client_integration_service: ForestClientIntegrationService, +): + # find active forest client number + forest_client_validator_return = ( + forest_client_integration_service.find_by_client_number( + TEST_FOREST_CLIENT_NUMBER + ) + ) + # test forest_client_number_exists return true + assert forest_client_number_exists(forest_client_validator_return) is True + + # find inactive forest client number + forest_client_validator_return = ( + forest_client_integration_service.find_by_client_number( + TEST_INACTIVE_FOREST_CLIENT_NUMBER + ) + ) + # test forest_client_number_exists return true + assert forest_client_number_exists(forest_client_validator_return) is True + + # find non exist forest client number + forest_client_validator_return = ( + forest_client_integration_service.find_by_client_number( + TEST_NON_EXIST_FOREST_CLIENT_NUMBER + ) + ) + # test forest_client_number_exists return false + assert forest_client_number_exists(forest_client_validator_return) is False + + +def test_forest_client_active(forest_client_integration_service: ForestClientIntegrationService): + # find active forest client number + forest_client_validator_return = ( + forest_client_integration_service.find_by_client_number( + TEST_FOREST_CLIENT_NUMBER + ) + ) + # test forest_client_active return true + assert forest_client_active(forest_client_validator_return) is True + + # find inactive forest client number + forest_client_validator_return = ( + forest_client_integration_service.find_by_client_number( + TEST_INACTIVE_FOREST_CLIENT_NUMBER + ) + ) + # test forest_client_active return false + assert forest_client_active(forest_client_validator_return) is False + + # find non exist forest client number + forest_client_validator_return = ( + forest_client_integration_service.find_by_client_number( + TEST_NON_EXIST_FOREST_CLIENT_NUMBER + ) + ) + # test forest_client_active return false + assert forest_client_active(forest_client_validator_return) is False + + +def test_get_forest_client_status( + forest_client_integration_service: ForestClientIntegrationService, +): + # find active forest client number + forest_client_validator_return = ( + forest_client_integration_service.find_by_client_number( + TEST_FOREST_CLIENT_NUMBER + ) + ) + # test get_forest_client_status return forest client information + assert get_forest_client_status( + forest_client_validator_return + ) == FOREST_CLIENT_STATUS.get("CODE_ACTIVE") + + # find inactive forest client number + forest_client_validator_return = ( + forest_client_integration_service.find_by_client_number( + TEST_INACTIVE_FOREST_CLIENT_NUMBER + ) + ) + # test get_forest_client_status return forest client information + assert get_forest_client_status(forest_client_validator_return) == "DAC" + + # find non exist forest client number + forest_client_validator_return = ( + forest_client_integration_service.find_by_client_number( + TEST_NON_EXIST_FOREST_CLIENT_NUMBER + ) + ) + # test get_forest_client_status return None + assert get_forest_client_status(forest_client_validator_return) is None