From 61350402a50771b33612be72d13d045f993a3504 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Brunner?=
Date: Thu, 4 Jul 2024 14:13:03 +0200
Subject: [PATCH] Stricter access
---
 tilecloud_chain/views/admin.py | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)
diff --git a/tilecloud_chain/views/admin.py b/tilecloud_chain/views/admin.py
index 228b02be2..ead9cbb5e 100644
--- a/tilecloud_chain/views/admin.py
+++ b/tilecloud_chain/views/admin.py
@@ -71,32 +71,40 @@ def __init__(self, request: pyramid.request.Request): else None ) + def _check_access(self, rase_on_no_access: bool = True) -> tuple[bool, tilecloud_chain.DatedConfig]: + assert self.gene + config = self.gene.get_host_config(self.request.host) + has_access = self.request.has_permission("admin", config.config.get("authentication", {})) + if not has_access and rase_on_no_access: + raise pyramid.httpexceptions.HTTPForbidden() + return has_access, config + @view_config(route_name="admin", renderer="tilecloud_chain:templates/admin_index.html") # type: ignore @view_config(route_name="admin_slash", renderer="tilecloud_chain:templates/admin_index.html") # type: ignore def index(self) -> dict[str, Any]: """Get the admin index page.""" assert self.gene - config = self.gene.get_host_config(self.request.host) + has_access, config = self._check_access(False) server_config = config.config.get("server", {}) main_config = self.gene.get_main_config() main_server_config = main_config.config.get("server", {}) jobs_status = None queue_store = main_config.config.get("queue_store", configuration.QUEUE_STORE_DEFAULT) - if queue_store == "postgresql": + if queue_store == "postgresql" and has_access: assert self.postgresql_queue_store is not None config_filename = self.gene.get_host_config_file(self.request.host) assert config_filename is not None jobs_status = self.postgresql_queue_store.get_status(config_filename) return { "auth_type": auth_type(self.request.registry.settings), - "has_access": self.request.has_permission("admin", config.config.get("authentication", {})), + "has_access": has_access, "commands": server_config.get("predefined_commands", []), "status": get_status(self.gene) if queue_store != "postgresql" else None, "admin_path": main_server_config.get("admin_path", "admin"), "AuthenticationType": AuthenticationType, "jobs_status": jobs_status, - "footer": main_server_config.get("admin_footer"), + "footer": main_server_config.get("admin_footer") if 
has_access else None, "footer_classes": main_server_config.get("admin_footer_classes", ""), } @@ -107,6 +115,7 @@ def run(self) -> pyramid.response.Response: if "TEST_USER" not in os.environ: auth_view(self.request) + self._check_access() if "command" not in self.request.POST: self.request.response.status_code = 400 @@ -210,6 +219,7 @@ def create_job(self) -> dict[str, Any]: if "TEST_USER" not in os.environ: auth_view(self.request) + self._check_access() store = self.postgresql_queue_store assert store is not None @@ -241,6 +251,7 @@ def cancel_job(self) -> dict[str, Any]: if "TEST_USER" not in os.environ: auth_view(self.request) + self._check_access() store = self.postgresql_queue_store assert store is not None @@ -269,6 +280,7 @@ def retry_job(self) -> dict[str, Any]: if "TEST_USER" not in os.environ: auth_view(self.request) + self._check_access() store = self.postgresql_queue_store assert store is not None
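Review bot: The new `_check_access` parameter is misspelled as `rase_on_no_access`; callers may pass it by keyword, so a later rename becomes a compatibility change, and it is cheaper to fix now: `def _check_access(self, raise_on_no_access: bool = True) -> tuple[bool, tilecloud_chain.DatedConfig]:` and `if not has_access and raise_on_no_access:`. The helper also has no docstring; a short one should state that it resolves the host config from `self.request.host`, evaluates the `admin` permission against that config's `authentication` section, and either raises `HTTPForbidden` or, when asked not to raise, returns `(has_access, config)`. In `index`, `jobs_status` and `footer` are now withheld when `has_access` is false, but `commands` (`predefined_commands`) and `status` (`get_status(self.gene)`) are still computed and handed to the template for anonymous requests; unless the template itself gates on `has_access`, the same `if has_access else None` treatment seems warranted for them. The hunk opens on the tail of `__init__`, whose body begins outside the hunk.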
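Review bot: The access check is opt-in per view: `self._check_access()` is inserted by hand after the `auth_view` call in `run`, and the identical two-line pattern is repeated in `create_job`, `cancel_job` and `retry_job`, so a future POST view that omits the call is silently unprotected. Consider folding the `if "TEST_USER" not in os.environ: auth_view(self.request)` guard into `_check_access` itself, or into a small view decorator, so that one call covers both steps and the four views share a single definition of "authenticated admin". The call is placed outside the `TEST_USER` guard, so it also runs in test mode; that appears intended, but it means test fixtures must grant the `admin` permission for these views. The enclosing functions all start and end outside their hunks.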