diff --git a/API/user/QueryDashboard.py b/API/user/QueryDashboard.py index d7a2cf0..af8e483 100644 --- a/API/user/QueryDashboard.py +++ b/API/user/QueryDashboard.py @@ -69,13 +69,15 @@ def get(self): if not dashboard: return gettext('Forbidden'), 403 # Explicitely vague for security purpose - dashboard_sites_ids = DashDashboardSites.get_sites_ids_for_dashboard(dashboard.id_dashboard) + dashboard_sites_ids = DashDashboardSites.get_sites_ids_for_dashboard( + dashboard.id_dashboard, enabled_only=request_args['enabled']) if dashboard_sites_ids: # Check that we have a match for at least one site if len(set(accessible_site_ids).intersection(dashboard_sites_ids)) == 0: return gettext('Forbidden'), 403 - dashboard_proj_ids = DashDashboardProjects.get_projects_ids_for_dashboard(dashboard.id_dashboard) + dashboard_proj_ids = DashDashboardProjects.get_projects_ids_for_dashboard( + dashboard.id_dashboard, enabled_only=request_args['enabled']) if dashboard_proj_ids: # Check that we have a match for at least one project if len(set(accessible_project_ids).intersection(dashboard_proj_ids)) == 0: @@ -105,7 +107,7 @@ def get(self): return gettext('Forbidden'), 403 dashboards = DashDashboards.get_dashboards_globals() else: - return gettext('Must specify at least one id parameter or "globals"') + return gettext('Must specify at least one id parameter or "globals"'), 400 # # Convert to json and return dashboards_json = [dash.to_json(minimal=request_args['list'], latest=not request_args['all_versions']) diff --git a/ConfigManager.py b/ConfigManager.py index 2c96ee2..5f2cf9a 100644 --- a/ConfigManager.py +++ b/ConfigManager.py @@ -39,10 +39,10 @@ def create_defaults(self): # Default backend configuration self.backend_config['hostname'] = '127.0.0.1' - self.backend_config['port'] = 40100 + self.backend_config['port'] = 40075 # Default redis configuration - self.redis_config['hostname'] = 'db' + self.redis_config['hostname'] = '127.0.0.1' self.redis_config['port'] = 6379 self.redis_config['username'] = '' self.redis_config['password'] = '' diff --git a/FlaskModule.py b/FlaskModule.py index f389f5c..33206f0 100644 --- a/FlaskModule.py +++ b/FlaskModule.py @@ -256,7 +256,6 @@ def init_views(self): flask_app.add_url_rule('/dashboards', view_func=DashboardsIndex.as_view('dashboards', *args, **kwargs)) - @flask_app.errorhandler(404) def page_not_found(e): print(e) diff --git a/tests/BaseDashboardsAPITest.py b/tests/BaseDashboardsAPITest.py new file mode 100644 index 0000000..ffff7b3 --- /dev/null +++ b/tests/BaseDashboardsAPITest.py @@ -0,0 +1,99 @@ +import unittest +from ConfigManager import ConfigManager +import Globals as Globals +from flask.testing import FlaskClient +import random +from string import digits, ascii_lowercase, ascii_uppercase +from tests.FakeDashboardsService import FakeDashboardsService + + +class BaseDashboardsAPITest(unittest.TestCase): + _config = None + _service = None + _db_man = None + test_endpoint = '' + user_token_key = ''.join(random.choice(digits + ascii_lowercase + ascii_uppercase) for _ in range(36)) + participant_token_key = ''.join(random.choice(digits + ascii_lowercase + ascii_uppercase) for _ in range(36)) + service_token_key = ''.join(random.choice(digits + ascii_lowercase + ascii_uppercase) for _ in range(36)) + device_token_key = ''.join(random.choice(digits + ascii_lowercase + ascii_uppercase) for _ in range(36)) + user_login_endpoint = '/api/user/login' + user_logout_endpoint = '/api/user/logout' + + @classmethod + def setUpClass(cls): + cls._config = BaseDashboardsAPITest.getConfig() + # Instance of Fake service API will create a new flask_app + cls._service = FakeDashboardsService() + cls._users = cls._service.users + # API Need this variable to be set + Globals.service = cls._service + from libDashboards.db.DBManager import DBManager + cls._db_man: DBManager = DBManager(app=cls._service.flask_app, test=True) + # Cheating using same db as FakeService + cls._db_man.db = cls._service.db_manager.db + + with cls._service.flask_app.app_context(): + cls._db_man.create_defaults(test=True) + + def app_context(self): + self.assertIsNotNone(self._service) + self.assertIsNotNone(self._service.flask_app) + return self._service.flask_app.app_context() + + @classmethod + def tearDownClass(cls): + with cls._service.flask_app.app_context(): + cls._db_man.db.session.remove() + + @classmethod + def getConfig(cls) -> ConfigManager: + config = ConfigManager() + config.create_defaults() + return config + + def setUp(self): + self.assertIsNotNone(self._service) + self.assertIsNotNone(self._service.flask_app) + self.test_client = self._service.flask_app.test_client() + + def tearDown(self): + with self.app_context(): + # Make sure pending queries are rollbacked. + self._db_man.db.session.rollback() + + def _get_with_service_token_auth(self, client: FlaskClient, token=None, params=None, endpoint=None): + if params is None: + params = {} + if endpoint is None: + endpoint = self.test_endpoint + if token is not None: + headers = {'Authorization': 'OpenTera ' + token} + else: + headers = {} + + return client.get(endpoint, headers=headers, query_string=params) + + def _get_with_user_token_auth(self, client: FlaskClient, token: str = '', params=None, endpoint=None): + if params is None: + params = {} + if endpoint is None: + endpoint = self.test_endpoint + headers = {'Authorization': 'OpenTera ' + token} + return client.get(endpoint, headers=headers, query_string=params) + + def _post_with_user_token_auth(self, client: FlaskClient, token: str = '', json: dict = None, params: dict = None, + endpoint=None): + if params is None: + params = {} + if endpoint is None: + endpoint = self.test_endpoint + headers = {'Authorization': 'OpenTera ' + token} + return client.post(endpoint, headers=headers, query_string=params, json=json) + + def _delete_with_user_token_auth(self, client: FlaskClient, token: str = '', params=None, endpoint=None): + if params is None: + params = {} + if endpoint is None: + endpoint = self.test_endpoint + headers = {'Authorization': 'OpenTera ' + token} + return client.delete(endpoint, headers=headers, query_string=params) diff --git a/tests/FakeDashboardsService.py b/tests/FakeDashboardsService.py new file mode 100644 index 0000000..43f9bc0 --- /dev/null +++ b/tests/FakeDashboardsService.py @@ -0,0 +1,211 @@ +from FlaskModule import CustomAPI, authorizations +import requests +from requests.auth import _basic_auth_str +from ConfigManager import ConfigManager +from libDashboards.db.DBManager import DBManager +from opentera.modules.BaseModule import BaseModule +from opentera.services.ServiceOpenTera import ServiceOpenTera +from opentera.redis.RedisVars import RedisVars +from opentera.services.ServiceAccessManager import ServiceAccessManager + +from flask import Flask +from flask_babel import Babel +import redis +import uuid + + +class FakeFlaskModule(BaseModule): + def __init__(self, config: ConfigManager, flask_app): + BaseModule.__init__(self, 'FakeFlaskModule', config) + + # Will allow for user api to work + self.config.server_config = {'hostname': '127.0.0.1', 'port': 40075} + + self.flask_app = flask_app + self.api = CustomAPI(self.flask_app, version='1.0.0', title='DashboardsService API', + description='FakeDashboardsService API Documentation', doc='/doc', prefix='/api', + authorizations=authorizations) + + self.babel = Babel(self.flask_app) + + flask_app.debug = False + flask_app.testing = True + flask_app.secret_key = str(uuid.uuid4()) # Normally service UUID + flask_app.config.update({'SESSION_TYPE': 'redis'}) + redis_url = redis.from_url('redis://%(username)s:%(password)s@%(hostname)s:%(port)s/%(db)s' + % self.config.redis_config) + + flask_app.config.update({'SESSION_REDIS': redis_url}) + flask_app.config.update({'BABEL_DEFAULT_LOCALE': 'fr'}) + flask_app.config.update({'SESSION_COOKIE_SECURE': True}) + self.recrutement_api_namespace = self.api.namespace('', description='RecrutementService API') + + self.setup_fake_recrutement_api(flask_app) + + def setup_fake_recrutement_api(self, flask_app): + from FlaskModule import FlaskModule + with flask_app.app_context(): + # Setup Fake Service API + kwargs = {'flaskModule': self, + 'test': True} + FlaskModule.init_user_api(self, None, self.recrutement_api_namespace, kwargs) + + +class FakeDashboardsService(ServiceOpenTera): + """ + The only thing we want here is a way to simulate communication with the base server. + We will simulate the service API with the database. + """ + service_token = str() + + def __init__(self, db=None): + self.flask_app = Flask('FakeDashboardsService') + + # OpenTera server informations + self.backend_hostname = '127.0.0.1' + self.backend_port = 40075 + self.server_url = 'https://' + self.backend_hostname + ':' + str(self.backend_port) + + self.config_man = ConfigManager() + self.config_man.create_defaults() + import Globals + Globals.service = self + self.db_manager = DBManager(self.flask_app, test=True) + # Cheating on db (reusing already opened from test) + if db is not None: + self.db_manager.db = db + self.db_manager.create_defaults(test=True) + else: + self.db_manager.open_local({}, echo=False) + + self.test_client = self.flask_app.test_client() + + print('Resetting OpenTera database') + self.reset_opentera_test_db() + + # Create service on OpenTera (using user API) + print('Creating service : DashboardsService') + json_data = { + 'service': { + "id_service": 0, + "service_clientendpoint": "/dashboards", + "service_enabled": True, + "service_endpoint": "/", + "service_hostname": "127.0.0.1", + "service_name": "Dashboard Services", + "service_port": 5055, + "service_key": "DashboardsService", + "service_endpoint_participant": "/participant", + "service_endpoint_user": "/user", + "service_endpoint_device": "/device" + } + } + r = self.post_to_opentera_as_user('/api/user/services', json_data, 'admin', 'admin') + if r.status_code != 200: + print('Error creating service') + exit(1) + + with self.flask_app.app_context(): + # Update redis vars and basic token + self.setup_service_access_manager() + + # Get service UUID + service_info = self.redisGet(RedisVars.RedisVar_ServicePrefixKey + + Globals.config_man.service_config['name']) + + import json + service_info = json.loads(service_info) + if 'service_uuid' not in service_info: + exit(1) + + Globals.config_man.service_config['ServiceUUID'] = service_info['service_uuid'] + + # Redis variables & db must be initialized before + ServiceOpenTera.__init__(self, self.config_man, service_info) + + # Will contain list of users by service role: super_admin, admin, manager, user and no access + self.users = {} + self.init_service() + + # Setup modules + self.flask_module = FakeFlaskModule(self.config_man, self.flask_app) + + def init_service(self): + print('Initializing service...') + # Get users tokens + response = self.get_from_opentera_as_user('/api/user/login', {}, 'admin', 'admin') + if response.status_code != 200: + print("Unable to query super admin token") + exit(1) + + self.users['superadmin'] = response.json()['user_token'] + + response = self.get_from_opentera_as_user('/api/user/login', {}, 'user4', 'user4') + if response.status_code != 200: + print("Unable to query no access user token") + exit(1) + + self.users['noaccess'] = response.json()['user_token'] + + response = self.get_from_opentera_as_user('/api/user/login', {}, 'user', 'user') + if response.status_code != 200: + print("Unable to query user token") + exit(1) + + self.users['user'] = response.json()['user_token'] + + response = self.get_from_opentera_as_user('/api/user/login', {}, 'user3', 'user3') + if response.status_code != 200: + print("Unable to query manager user token") + exit(1) + + self.users['projectadmin'] = response.json()['user_token'] + + response = self.get_from_opentera_as_user('/api/user/login', {}, 'siteadmin', 'siteadmin') + if response.status_code != 200: + print("Unable to query admin user token") + exit(1) + + self.users['siteadmin'] = response.json()['user_token'] + + def setup_service_access_manager(self): + self.redis = redis.Redis(host=self.config_man.redis_config['hostname'], + port=self.config_man.redis_config['port'], + db=self.config_man.redis_config['db'], + username=self.config_man.redis_config['username'], + password=self.config_man.redis_config['password'], + client_name=self.__class__.__name__) + + # Initialize service from redis + # User token key (dynamic) + ServiceAccessManager.api_user_token_key = 'test_api_user_token_key' + self.redis.set(RedisVars.RedisVar_UserTokenAPIKey, + ServiceAccessManager.api_user_token_key) + + # Participant token key (dynamic) + ServiceAccessManager.api_participant_token_key = 'test_api_participant_token_key' + self.redis.set(RedisVars.RedisVar_ParticipantTokenAPIKey, + ServiceAccessManager.api_participant_token_key) + + # Service Token Key (dynamic) + ServiceAccessManager.api_service_token_key = 'test_api_service_token_key' + self.redis.set(RedisVars.RedisVar_ServiceTokenAPIKey, ServiceAccessManager.api_service_token_key) + ServiceAccessManager.config_man = self.config_man + + def post_to_opentera_as_user(self, api_url: str, json_data: dict, username: str, password: str) \ + -> requests.Response: + # Synchronous call to OpenTera + headers = {'Authorization': _basic_auth_str(username, password)} + answer = requests.post(self.server_url + api_url, headers=headers, json=json_data, verify=False) + return answer + + def get_from_opentera_as_user(self, api_url: str, params: dict, username: str, password: str) \ + -> requests.Response: + # Synchronous call to OpenTera + headers = {'Authorization': _basic_auth_str(username, password)} + answer = requests.get(self.server_url + api_url, headers=headers, params=params, verify=False) + return answer + + def reset_opentera_test_db(self): + return requests.get(self.server_url + '/api/test/database/reset', verify=False) + diff --git a/tests/api/user/test_QueryDashboard.py b/tests/api/user/test_QueryDashboard.py new file mode 100644 index 0000000..d140729 --- /dev/null +++ b/tests/api/user/test_QueryDashboard.py @@ -0,0 +1,171 @@ +from tests.BaseDashboardsAPITest import BaseDashboardsAPITest +from libDashboards.db.models import DashDashboards + + +class QueryDashboardTest(BaseDashboardsAPITest): + test_endpoint = '/api/dashboards' + + def setUp(self): + super().setUp() + + # Load list of dashboards to query / test + self._dashboards = {} + with self.app_context(): + dashboards = DashDashboards.query.all() + self._dashboards["site"] = {'id': dashboards[0].id_dashboard, 'uuid': dashboards[0].dashboard_uuid} + self._dashboards["project_global"] = {'id': dashboards[1].id_dashboard, + 'uuid': dashboards[1].dashboard_uuid + } + self._dashboards["project_alert"] = {'id': dashboards[2].id_dashboard, 'uuid': dashboards[2].dashboard_uuid} + self._dashboards["global"] = {'id': dashboards[3].id_dashboard, 'uuid': dashboards[3].dashboard_uuid} + + def tearDown(self): + super().tearDown() + + def test_get_with_invalid_token(self): + with self.app_context(): + response = self._get_with_user_token_auth(self.test_client, token="invalid") + self.assertEqual(403, response.status_code) + + def test_get_no_params(self): + with self.app_context(): + for user in self._users: + response = self._get_with_user_token_auth(self.test_client, token=self._users[user]) + self.assertEqual(400, response.status_code) + + def test_get_with_bad_id(self): + with self.app_context(): + for user in self._users: + response = self._get_with_user_token_auth(self.test_client, token=self._users[user], + params={'id_dashboard': -1}) + self.assertEqual(403, response.status_code) + + def test_get_with_bad_uuid(self): + with self.app_context(): + for user in self._users: + response = self._get_with_user_token_auth(self.test_client, token=self._users[user], + params={'uuid': "0000-0000-0000-0000"}) + self.assertEqual(403, response.status_code) + + def test_get_site_dashboard_by_id(self): + with self.app_context(): + for user in self._users: + response = self._get_with_user_token_auth(self.test_client, token=self._users[user], + params={'id_dashboard': self._dashboards["site"]["id"]}) + if user == "noaccess": + self.assertEqual(403, response.status_code) + else: + self.assertEqual(200, response.status_code) + self.assertEqual(1, len(response.json)) + self._check_json(response.json[0]) + self.assertEqual(self._dashboards["site"]["id"], response.json[0]['id_dashboard']) + + def test_get_site_dashboard_by_uuid(self): + with self.app_context(): + for user in self._users: + response = self._get_with_user_token_auth(self.test_client, token=self._users[user], + params={'uuid': self._dashboards["site"]["uuid"]}) + if user == "noaccess": + self.assertEqual(403, response.status_code) + else: + self.assertEqual(200, response.status_code) + self.assertEqual(1, len(response.json)) + self._check_json(response.json[0]) + self.assertEqual(self._dashboards["site"]["id"], response.json[0]['id_dashboard']) + + def test_get_project_dashboard_by_id(self): + with self.app_context(): + for user in self._users: + response = self._get_with_user_token_auth(self.test_client, token=self._users[user], + params={'id_dashboard': + self._dashboards["project_global"]["id"]}) + if user == "noaccess" or user == "projectadmin": # Project admin has access to projet 1, but not 2. + self.assertEqual(403, response.status_code) + else: + self.assertEqual(200, response.status_code) + self.assertEqual(1, len(response.json)) + self._check_json(response.json[0]) + self.assertEqual(self._dashboards["project_global"]["id"], response.json[0]['id_dashboard']) + + # Check with enabled = false + response = self._get_with_user_token_auth(self.test_client, token=self._users[user], + params={'id_dashboard': + self._dashboards["project_global"]["id"], + 'enabled': False}) + if user == "noaccess": # Project admin has access to projet 1, but not 2. + self.assertEqual(403, response.status_code) + else: + self.assertEqual(200, response.status_code) + self.assertEqual(1, len(response.json)) + self._check_json(response.json[0]) + self.assertEqual(self._dashboards["project_global"]["id"], response.json[0]['id_dashboard']) + + def test_get_project_dashboard_by_uuid(self): + with self.app_context(): + for user in self._users: + response = self._get_with_user_token_auth(self.test_client, token=self._users[user], + params={'uuid': + self._dashboards["project_alert"]["uuid"]}) + if user == "noaccess": # Project admin has access to projet 1, but not 2. + self.assertEqual(403, response.status_code) + else: + self.assertEqual(200, response.status_code) + self.assertEqual(1, len(response.json)) + self._check_json(response.json[0]) + self.assertEqual(self._dashboards["project_alert"]["id"], response.json[0]['id_dashboard']) + + # Check with enabled = false + response = self._get_with_user_token_auth(self.test_client, token=self._users[user], + params={'uuid': + self._dashboards["project_alert"]["uuid"], + 'enabled': False}) + if user == "noaccess": # Project admin has access to projet 1, but not 2. + self.assertEqual(403, response.status_code) + else: + self.assertEqual(200, response.status_code) + self.assertEqual(1, len(response.json)) + self._check_json(response.json[0]) + self.assertEqual(self._dashboards["project_alert"]["id"], response.json[0]['id_dashboard']) + + def test_get_by_site_id(self): + for user in self._users: + response = self._get_with_user_token_auth(self.test_client, token=self._users[user], + params={'id_site': 1}) + if user == "noaccess": # Project admin has access to projet 1, but not 2. + self.assertEqual(403, response.status_code) + else: + self.assertEqual(200, response.status_code) + self.assertEqual(1, len(response.json)) + self._check_json(response.json[0]) + self.assertEqual(self._dashboards["site"]["id"], response.json[0]['id_dashboard']) + + def test_get_by_project_id(self): + for user in self._users: + response = self._get_with_user_token_auth(self.test_client, token=self._users[user], + params={'id_project': 1}) + if user == "noaccess": + self.assertEqual(403, response.status_code) + else: + self.assertEqual(200, response.status_code) + self.assertEqual(1, len(response.json)) + self._check_json(response.json[0]) + self.assertEqual(self._dashboards["project_alert"]["id"], response.json[0]['id_dashboard']) + + # Check with enabled = false + response = self._get_with_user_token_auth(self.test_client, token=self._users[user], + params={'id_project': 1, 'enabled': False}) + if user == "noaccess": + self.assertEqual(403, response.status_code) + else: + self.assertEqual(200, response.status_code) + self.assertEqual(2, len(response.json)) + + def _check_json(self, json_data: str, minimal=False): + self.assertTrue(json_data.__contains__('id_dashboard')) + self.assertTrue(json_data.__contains__('dashboard_uuid')) + self.assertTrue(json_data.__contains__('dashboard_name')) + self.assertTrue(json_data.__contains__('dashboard_description')) + if not minimal: + self.assertTrue(json_data.__contains__('versions')) + else: + self.assertFalse(json_data.__contains__('versions')) diff --git a/tests/test_docker_test_setup.py b/tests/test_docker_test_setup.py index 8011c94..c4383c8 100644 --- a/tests/test_docker_test_setup.py +++ b/tests/test_docker_test_setup.py @@ -5,6 +5,7 @@ import time import sys + class DockerTestSetupTest(unittest.TestCase): SERVER_URL = 'https://localhost:40100' @@ -24,7 +25,6 @@ def setUp(self): time.sleep(10) print('Starting tests...') - def tearDown(self): # Get directory of this file dir_path = os.path.dirname(os.path.realpath(__file__)) @@ -45,5 +45,6 @@ def test_reset_db_endpoint(self): response = requests.get(self.SERVER_URL + '/api/test/database/reset', verify=False) self.assertEqual(response.status_code, 200) + if __name__ == '__main__': unittest.main()