diff --git a/geoportal/tests/functional/__init__.py b/geoportal/tests/functional/__init__.py index 1c20f4a195..855e0dbf1e 100644 --- a/geoportal/tests/functional/__init__.py +++ b/geoportal/tests/functional/__init__.py @@ -32,22 +32,25 @@ import logging from configparser import ConfigParser -from typing import Any +from typing import TYPE_CHECKING, Any import pyramid.registry import pyramid.request -import sqlalchemy.exc import tests import transaction import webob.acceptparse from c2c.template.config import config as configuration from pyramid import testing +from sqlalchemy.orm.session import Session import c2cgeoportal_geoportal import c2cgeoportal_geoportal.lib from c2cgeoportal_commons import models from c2cgeoportal_geoportal.lib import caching +if TYPE_CHECKING: + from c2cgeoportal_commons.models import main, static + _LOG = logging.getLogger(__name__) mapserv_url = "http://mapserver:8080/" config = None @@ -76,27 +79,27 @@ def cleanup_db() -> None: ) from c2cgeoportal_commons.models.static import OAuth2Client, Shorturl, User - try: - transaction.commit() - except sqlalchemy.exc.InvalidRequestError: - models.DBSession.rollback() - - for ra in models.DBSession.query(RestrictionArea).all(): - ra.roles = [] - models.DBSession.delete(ra) - for ti in models.DBSession.query(TreeItem).all(): - models.DBSession.delete(ti) - models.DBSession.query(OGCServer).delete() - models.DBSession.query(Interface).delete() - for r in models.DBSession.query(Role).all(): - r.functionalities = [] - models.DBSession.delete(r) - models.DBSession.query(User).delete() - models.DBSession.query(Functionality).delete() - models.DBSession.query(FullTextSearch).delete() - models.DBSession.query(Shorturl).delete() - models.DBSession.query(OAuth2Client).delete() - transaction.commit() + with models.DBSession() as session: + for ra in session.query(RestrictionArea).all(): + ra.roles = [] + session.delete(ra) + for ti in session.query(TreeItem).all(): + session.delete(ti) + session.query(OGCServer).delete() + session.query(Interface).delete() + for r in session.query(Role).all(): + r.functionalities = [] + session.delete(r) + session.query(User).delete() + session.query(Functionality).delete() + session.query(FullTextSearch).delete() + session.query(Shorturl).delete() + session.query(OAuth2Client).delete() + try: + transaction.commit() + except Exception as e: + _LOG.error(e) + transaction.abort() def setup_db() -> None: @@ -108,9 +111,13 @@ def setup_db() -> None: from c2cgeoportal_commons.models import DBSession from c2cgeoportal_commons.models.main import Role - DBSession.add_all([Role(name) for name in ("anonymous", "registered", "intranet")]) + with DBSession() as session: + for role_name in ("anonymous", "registered", "intranet"): + role = session.query(Role).filter(Role.name == role_name).one_or_none() + if role is None: + session.add(Role(name=role_name)) - transaction.commit() + transaction.commit() c2cgeoportal_geoportal.lib.ogc_server_wms_url_ids = None c2cgeoportal_geoportal.lib.ogc_server_wfs_url_ids = None @@ -170,21 +177,21 @@ def teardown_common() -> None: models.DBSessions = {} -def create_default_ogcserver(): - from c2cgeoportal_commons.models import DBSession +def create_default_ogcserver(session: Session) -> "main.OGCServer": from c2cgeoportal_commons.models.main import OGCServer - transaction.commit() - ogcserver = OGCServer(name="__test_ogc_server") + ogcserver = session.query(OGCServer).filter(OGCServer.name == "__test_ogc_server").one_or_none() + if ogcserver is None: + ogcserver = OGCServer(name="__test_ogc_server") ogcserver.url = mapserv_url - DBSession.add(ogcserver) - transaction.commit() + session.add(ogcserver) caching.invalidate_region() + session.flush() return ogcserver -def _get_user(username: str): +def _get_user(username: str) -> "static.User": from c2cgeoportal_commons.models import DBSession from c2cgeoportal_commons.models.static import User @@ -285,12 +292,10 @@ def create_dummy_request( return request -def fill_tech_user_functionality(name, functionalities): - from c2cgeoportal_commons.models import DBSession +def fill_tech_user_functionality(name, functionalities, session: Session) -> None: from c2cgeoportal_commons.models.main import Functionality, Role - role = DBSession.query(Role).filter_by(name=name).one() + role = session.query(Role).filter_by(name=name).one() role.functionalities = [Functionality(name, value) for name, value in functionalities] - DBSession.add(role) - transaction.commit() + session.add(role) caching.invalidate_region() diff --git a/geoportal/tests/functional/conftest.py b/geoportal/tests/functional/conftest.py index 38cf36f378..049cbdc8cc 100644 --- a/geoportal/tests/functional/conftest.py +++ b/geoportal/tests/functional/conftest.py @@ -25,19 +25,29 @@ # of the authors and should not be interpreted as representing official policies, # either expressed or implied, of the FreeBSD Project. +import logging +from typing import TYPE_CHECKING, Any from unittest.mock import patch +import pyramid.request import pytest import sqlalchemy.orm import transaction from c2c.template.config import config as configuration from pyramid.testing import DummyRequest +from sqlalchemy.orm.session import Session, SessionTransaction from tests.functional import setup_common as setup_module +from c2cgeoportal_commons.testing import generate_mappers, get_engine, get_session_factory, get_tm_session +from c2cgeoportal_commons.testing.initializedb import truncate_tables from c2cgeoportal_geoportal.lib import caching +_LOG = logging.getLogger(__name__) mapserv_url = "http://mapserver:8080/" +if TYPE_CHECKING: + from c2cgeoportal_commons.models import main, static + @pytest.fixture def settings(): @@ -46,23 +56,51 @@ def settings(): @pytest.fixture -def dbsession(settings): +def dbsession_old(settings: dict[str, Any]) -> Session: from c2cgeoportal_commons.models import DBSession + truncate_tables(DBSession) with patch("c2cgeoportal_geoportal.views.vector_tiles.DBSession", new=DBSession): yield DBSession + truncate_tables(DBSession) + + +@pytest.fixture +def dbsession(settings: dict[str, Any]) -> Session: + generate_mappers() + engine = get_engine(settings) + session_factory = get_session_factory(engine) + session = get_tm_session(session_factory, transaction.manager) + truncate_tables(session) + with patch("c2cgeoportal_commons.models.DBSession", new=session): + yield session + truncate_tables(session) + + +@pytest.fixture +def transact_old(dbsession_old) -> SessionTransaction: + t = dbsession_old.begin_nested() + yield t + try: + t.rollback() + except sqlalchemy.exc.ResourceClosedError: # pragma: no cover + _LOG.warning("Transaction already closed") + dbsession_old.expire_all() @pytest.fixture -def transact(dbsession): +def transact(dbsession) -> SessionTransaction: t = dbsession.begin_nested() yield t - t.rollback() + try: + t.rollback() + except sqlalchemy.exc.ResourceClosedError: # pragma: no cover + _LOG.warning("Transaction already closed") dbsession.expire_all() @pytest.fixture -def dummy_request(dbsession): +def dummy_request(dbsession_old) -> pyramid.request.Request: """ A lightweight dummy request. @@ -76,48 +114,67 @@ def dummy_request(dbsession): """ request = DummyRequest() request.host = "example.com" - request.dbsession = dbsession + request.dbsession = dbsession_old return request @pytest.fixture -def default_ogcserver(dbsession, transact): +def default_ogcserver(dbsession_old, transact_old) -> "main.OGCServer": from c2cgeoportal_commons.models.main import OGCServer - del transact + del transact_old - ogcserver = OGCServer(name="__test_ogc_server") + dbsession_old.flush() + ogcserver = dbsession_old.query(OGCServer).filter(OGCServer.name == "__test_ogc_server").one_or_none() + if ogcserver is None: + ogcserver = OGCServer(name="__test_ogc_server") ogcserver.url = mapserv_url - dbsession.add(ogcserver) - dbsession.flush() + dbsession_old.add(ogcserver) + dbsession_old.flush() yield ogcserver @pytest.fixture -def some_user(dbsession, transact): +def some_user(dbsession_old, transact_old) -> "static.User": from c2cgeoportal_commons.models.static import User - del transact + del transact_old user = User(username="__test_user", password="__test_user") - dbsession.add(user) - dbsession.flush() + dbsession_old.add(user) + dbsession_old.flush() yield user @pytest.fixture -def admin_user(dbsession, transact): +def admin_user(dbsession_old, transact_old) -> "static.User": from c2cgeoportal_commons.models.main import Role from c2cgeoportal_commons.models.static import User - del transact + del transact_old role = Role(name="role_admin") user = User(username="__test_user", password="__test_user", settings_role=role, roles=[role]) - dbsession.add_all([role, user]) - dbsession.flush() + dbsession_old.add_all([role, user]) + dbsession_old.flush() yield user + + +@pytest.fixture() +@pytest.mark.usefixtures("dbsession") +def default_ogcserver(dbsession: Session) -> "main.OGCServer": + from c2cgeoportal_commons.models.main import OGCServer + + dbsession.flush() + ogcserver = dbsession.query(OGCServer).filter(OGCServer.name == "__test_ogc_server").one_or_none() + if ogcserver is None: + ogcserver = OGCServer(name="__test_ogc_server") + ogcserver.url = mapserv_url + dbsession.add(ogcserver) + caching.invalidate_region() + + return ogcserver diff --git a/geoportal/tests/functional/test_entry.py b/geoportal/tests/functional/test_entry.py index 8f33eb729f..ab84ca9fc8 100644 --- a/geoportal/tests/functional/test_entry.py +++ b/geoportal/tests/functional/test_entry.py @@ -96,7 +96,7 @@ def setup_method(self, _): a_geo_table.drop(checkfirst=True, bind=engine) a_geo_table.create(bind=engine) - ogcserver = create_default_ogcserver() + ogcserver = create_default_ogcserver(DBSession) private_layer_edit = LayerWMS(name="__test_private_layer_edit", public=False) private_layer_edit.layer = "__test_private_layer" diff --git a/geoportal/tests/functional/test_functionalities.py b/geoportal/tests/functional/test_functionalities.py index 78f19a76f7..9aea693bce 100644 --- a/geoportal/tests/functional/test_functionalities.py +++ b/geoportal/tests/functional/test_functionalities.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2023, Camptocamp SA +# Copyright (c) 2013-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -43,7 +43,7 @@ def setup_method(self, _): from c2cgeoportal_commons.models.main import Functionality, Role from c2cgeoportal_commons.models.static import User - create_default_ogcserver() + create_default_ogcserver(DBSession) role1 = Role(name="__test_role1") user1 = User(username="__test_user1", password="__test_user1", settings_role=role1, roles=[role1]) role2 = Role(name="__test_role2") @@ -128,7 +128,7 @@ def test_functionalities(self): self.assertEqual(get_functionality("__test_b", request3, False), []) fill_tech_user_functionality( - "registered", (("__test_s", "registered"), ("__test_a", "r1"), ("__test_a", "r2")) + "registered", (("__test_s", "registered"), ("__test_a", "r1"), ("__test_a", "r2")), DBSession ) settings = { "admin_interface": {"available_functionalities": [{"name": "__test_a"}, {"name": "__test_s"}]} @@ -146,9 +146,9 @@ def test_functionalities(self): self.assertEqual(get_functionality("__test_s", request2, False), ["db"]) self.assertEqual(set(get_functionality("__test_a", request2, False)), {"db1", "db2"}) - fill_tech_user_functionality("registered", []) + fill_tech_user_functionality("registered", [], DBSession) fill_tech_user_functionality( - "anonymous", (("__test_s", "anonymous"), ("__test_a", "a1"), ("__test_a", "a2")) + "anonymous", (("__test_s", "anonymous"), ("__test_a", "a1"), ("__test_a", "a2")), DBSession ) settings = { "admin_interface": {"available_functionalities": [{"name": "__test_a"}, {"name": "__test_s"}]} @@ -167,10 +167,10 @@ def test_functionalities(self): self.assertEqual(set(get_functionality("__test_a", request2, False)), {"db1", "db2"}) fill_tech_user_functionality( - "registered", (("__test_s", "registered"), ("__test_a", "r1"), ("__test_a", "r2")) + "registered", (("__test_s", "registered"), ("__test_a", "r1"), ("__test_a", "r2")), DBSession ) fill_tech_user_functionality( - "anonymous", (("__test_s", "anonymous"), ("__test_a", "a1"), ("__test_a", "a2")) + "anonymous", (("__test_s", "anonymous"), ("__test_a", "a1"), ("__test_a", "a2")), DBSession ) settings = { "admin_interface": {"available_functionalities": [{"name": "__test_a"}, {"name": "__test_s"}]} @@ -204,10 +204,10 @@ def test_web_client_functionalities(self): request2.user = DBSession.query(User).filter(User.username == "__test_user2").one() fill_tech_user_functionality( - "registered", (("__test_s", "registered"), ("__test_a", "r1"), ("__test_a", "r2")) + "registered", (("__test_s", "registered"), ("__test_a", "r1"), ("__test_a", "r2")), DBSession ) fill_tech_user_functionality( - "anonymous", (("__test_s", "anonymous"), ("__test_a", "a1"), ("__test_a", "a2")) + "anonymous", (("__test_s", "anonymous"), ("__test_a", "a1"), ("__test_a", "a2")), DBSession ) settings = { "functionalities": {"available_in_templates": ["__test_s", "__test_a"]}, diff --git a/geoportal/tests/functional/test_layers.py b/geoportal/tests/functional/test_layers.py index 3e2ed6febc..0c06fd87c6 100644 --- a/geoportal/tests/functional/test_layers.py +++ b/geoportal/tests/functional/test_layers.py @@ -71,7 +71,7 @@ def setup_method(self, _: Any) -> None: DBSession.add(self.role) DBSession.add(self.main) - create_default_ogcserver() + create_default_ogcserver(DBSession) transaction.commit() diff --git a/geoportal/tests/functional/test_lingva_extractor_config.py b/geoportal/tests/functional/test_lingva_extractor_config.py index b8abe805fe..8c3c75b528 100644 --- a/geoportal/tests/functional/test_lingva_extractor_config.py +++ b/geoportal/tests/functional/test_lingva_extractor_config.py @@ -81,8 +81,8 @@ def settings(): yield settings -@pytest.fixture(scope="module") -def dbsession(settings): +@pytest.fixture() +def dbsession_db1(settings, dbsession_old): from c2cgeoportal_commons.models import DBSession, DBSessions DBSessions["db1"] = DBSession @@ -92,20 +92,12 @@ def dbsession(settings): @pytest.fixture(scope="function") -def transact(dbsession): - t = dbsession.begin_nested() - yield t - t.rollback() - dbsession.expire_all() - - -@pytest.fixture(scope="function") -def test_data(dbsession, transact): +def test_data(dbsession_db1, transact_old): from sqlalchemy import text from c2cgeoportal_commons.models import main - dbsession.execute( + dbsession_db1.execute( text( """ INSERT INTO geodata.testpoint (name) @@ -121,8 +113,8 @@ def test_data(dbsession, transact): value="metadata1_value", ) ] - dbsession.add(theme) - dbsession.flush() + dbsession_db1.add(theme) + dbsession_db1.flush() @pytest.mark.usefixtures("test_data") @@ -131,7 +123,7 @@ class TestGeomapfishConfigExtractor: "c2cgeoportal_geoportal.lib.lingva_extractor.open", mock_open(read_data="vars:"), ) - def test_extract_config(self, settings, dbsession): + def test_extract_config(self, settings, dbsession_db1): extractor = GeomapfishConfigExtractor() options = Mock() diff --git a/geoportal/tests/functional/test_lingva_extractor_themes.py b/geoportal/tests/functional/test_lingva_extractor_themes.py index fb13535bb1..55f2556f62 100644 --- a/geoportal/tests/functional/test_lingva_extractor_themes.py +++ b/geoportal/tests/functional/test_lingva_extractor_themes.py @@ -44,43 +44,43 @@ def extract(self): return list(extractor("development.ini", options)) - def test_extract_theme(self, dbsession, transact): + def test_extract_theme(self, dbsession_old, transact_old): from c2cgeoportal_commons.models import main - del transact + del transact_old theme = main.Theme(name="theme") - dbsession.add(theme) - dbsession.flush() + dbsession_old.add(theme) + dbsession_old.flush() messages = self.extract() - assert {m.msgid for m in messages} == {"theme"} + assert {"theme"} == {m.msgid for m in messages} - def test_extract_group(self, dbsession, transact): + def test_extract_group(self, dbsession_old, transact_old): from c2cgeoportal_commons.models import main - del transact + del transact_old group = main.LayerGroup(name="group") - dbsession.add(group) - dbsession.flush() + dbsession_old.add(group) + dbsession_old.flush() messages = self.extract() - assert {m.msgid for m in messages} == {"group"} + assert {"group"} == {m.msgid for m in messages} - def test_extract_layer_wms(self, dbsession, transact): + def test_extract_layer_wms(self, dbsession_old, transact_old): from c2cgeoportal_commons.models import main - del transact + del transact_old ogc_server = main.OGCServer(name="mapserver", url="http://mapserver:8080") layer_wms = main.LayerWMS(name="layer_wms") layer_wms.ogc_server = ogc_server layer_wms.layer = "testpoint_unprotected" - dbsession.add(layer_wms) - dbsession.flush() + dbsession_old.add(layer_wms) + dbsession_old.flush() messages = self.extract() @@ -92,10 +92,10 @@ def test_extract_layer_wms(self, dbsession, transact): "name", } - def test_extract_layer_wmts(self, dbsession, transact): + def test_extract_layer_wmts(self, dbsession_old, transact_old): from c2cgeoportal_commons.models import main - del transact + del transact_old ogc_server = main.OGCServer(name="mapserver", url="http://mapserver:8080") layer_wmts = main.LayerWMTS(name="layer_wmts") @@ -105,8 +105,8 @@ def test_extract_layer_wmts(self, dbsession, transact): main.Metadata("ogcServer", "mapserver"), main.Metadata("queryLayers", "testpoint_protected"), ] - dbsession.add_all([ogc_server, layer_wmts]) - dbsession.flush() + dbsession_old.add_all([ogc_server, layer_wmts]) + dbsession_old.flush() messages = self.extract() @@ -118,17 +118,17 @@ def test_extract_layer_wmts(self, dbsession, transact): "name", } - def test_extract_full_text_search(self, dbsession, transact): + def test_extract_full_text_search(self, dbsession_old, transact_old): from c2cgeoportal_commons.models import main - del transact + del transact_old fts = main.FullTextSearch() fts.label = "label" fts.layer_name = "some_layer_name" fts.actions = [{"action": "add_layer", "data": "another_layer_name"}] - dbsession.add(fts) - dbsession.flush() + dbsession_old.add(fts) + dbsession_old.flush() messages = self.extract() diff --git a/geoportal/tests/functional/test_login.py b/geoportal/tests/functional/test_login.py index 597cfbd7ae..80d8e67148 100644 --- a/geoportal/tests/functional/test_login.py +++ b/geoportal/tests/functional/test_login.py @@ -273,6 +273,7 @@ def test_change_password_good_is_password_changed(self): assert user._password == crypt.crypt("1234", user._password) def test_login_0(self): + from c2cgeoportal_commons.models import DBSession from c2cgeoportal_geoportal.views.login import Login request = self._create_request_obj() @@ -304,8 +305,8 @@ def test_login_0(self): "functionalities": {"available_in_templates": ["func"]}, "admin_interface": {"available_functionalities": [{"name": "func"}]}, } - fill_tech_user_functionality("anonymous", (("func", "anon"), ("toto", "anon_value2"))) - fill_tech_user_functionality("registered", (("func", "reg"),)) + fill_tech_user_functionality("anonymous", (("func", "anon"), ("toto", "anon_value2")), DBSession) + fill_tech_user_functionality("registered", (("func", "reg"),), DBSession) login = Login(request) assert login.loginuser()["functionalities"] == {"func": ["anon"]} diff --git a/geoportal/tests/functional/test_mapserverproxy.py b/geoportal/tests/functional/test_mapserverproxy.py index 4b3e536f5a..9250dc5cb8 100644 --- a/geoportal/tests/functional/test_mapserverproxy.py +++ b/geoportal/tests/functional/test_mapserverproxy.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2023, Camptocamp SA +# Copyright (c) 2013-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -136,7 +136,7 @@ def setup_method(self, _): setup_db() - ogc_server_internal = create_default_ogcserver() + ogc_server_internal = create_default_ogcserver(DBSession) ogcserver_geoserver = OGCServer(name="__test_ogc_server_geoserver") ogcserver_geoserver.url = mapserv_url ogcserver_geoserver.type = OGCSERVER_TYPE_GEOSERVER @@ -904,6 +904,7 @@ def test_get_feature_wfs_url(self): ) def test_substitution(self): + from c2cgeoportal_commons.models import DBSession from c2cgeoportal_geoportal.views.mapserverproxy import MapservProxy request = self._create_dummy_request() @@ -940,7 +941,7 @@ def test_substitution(self): request = self._create_dummy_request() request.method = "POST" request.body = SUBSTITUTION_GETFEATURE_REQUEST - fill_tech_user_functionality("anonymous", (("mapserver_substitution", "name=bar"),)) + fill_tech_user_functionality("anonymous", (("mapserver_substitution", "name=bar"),), DBSession) response = MapservProxy(request).proxy() self.assertTrue(response.status_int, 200) @@ -951,7 +952,9 @@ def test_substitution(self): request.body = COLUMN_RESTRICTION_GETFEATURE_REQUEST fill_tech_user_functionality( - "anonymous", [("mapserver_substitution", e) for e in ["cols=name", "cols=city", "cols=country"]] + "anonymous", + [("mapserver_substitution", e) for e in ["cols=name", "cols=city", "cols=country"]], + DBSession, ) response = MapservProxy(request).proxy() self.assertTrue(response.status_int, 200) @@ -959,7 +962,7 @@ def test_substitution(self): assert "Swiss" in response.body.decode("utf-8") fill_tech_user_functionality( - "anonymous", [("mapserver_substitution", e) for e in ["cols=name", "cols=city"]] + "anonymous", [("mapserver_substitution", e) for e in ["cols=name", "cols=city"]], DBSession ) response = MapservProxy(request).proxy() self.assertTrue(response.status_int, 200) @@ -967,14 +970,16 @@ def test_substitution(self): assert "Swiss" not in response.body.decode("utf-8") fill_tech_user_functionality( - "anonymous", [("mapserver_substitution", e) for e in ["cols=name", "cols=country"]] + "anonymous", [("mapserver_substitution", e) for e in ["cols=name", "cols=country"]], DBSession ) response = MapservProxy(request).proxy() self.assertTrue(response.status_int, 200) assert "Lausanne" not in response.body.decode("utf-8") assert "Swiss" in response.body.decode("utf-8") - fill_tech_user_functionality("anonymous", [("mapserver_substitution", e) for e in ["cols=name"]]) + fill_tech_user_functionality( + "anonymous", [("mapserver_substitution", e) for e in ["cols=name"]], DBSession + ) response = MapservProxy(request).proxy() self.assertTrue(response.status_int, 200) assert "Lausanne" not in response.body.decode("utf-8") @@ -986,7 +991,7 @@ def test_substitution(self): } request.method = "POST" request.body = SUBSTITUTION_GETFEATURE_REQUEST - fill_tech_user_functionality("anonymous", (("mapserver_substitution", "foo_bar"),)) + fill_tech_user_functionality("anonymous", (("mapserver_substitution", "foo_bar"),), DBSession) request.params.update(dict(s_test1="to be removed", S_TEST2="to be removed")) # just pass in the log message diff --git a/geoportal/tests/functional/test_mapserverproxy_capabilities.py b/geoportal/tests/functional/test_mapserverproxy_capabilities.py index dc29fc973a..f3c824d892 100644 --- a/geoportal/tests/functional/test_mapserverproxy_capabilities.py +++ b/geoportal/tests/functional/test_mapserverproxy_capabilities.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2023, Camptocamp SA +# Copyright (c) 2013-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -54,7 +54,7 @@ def setup_method(self, _): from c2cgeoportal_commons.models.static import User setup_db() - create_default_ogcserver() + create_default_ogcserver(DBSession) ogcserver_jpeg = OGCServer(name="__test_ogc_server_jpeg") ogcserver_jpeg.url = mapserv_url diff --git a/geoportal/tests/functional/test_mapserverproxy_group.py b/geoportal/tests/functional/test_mapserverproxy_group.py index 80cfbe374f..55cdba395d 100644 --- a/geoportal/tests/functional/test_mapserverproxy_group.py +++ b/geoportal/tests/functional/test_mapserverproxy_group.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2023, Camptocamp SA +# Copyright (c) 2013-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -43,7 +43,7 @@ def setup_method(self, _): from c2cgeoportal_commons.models.main import Interface, LayerWMS, RestrictionArea, Role from c2cgeoportal_commons.models.static import User - ogc_server_internal = create_default_ogcserver() + ogc_server_internal = create_default_ogcserver(DBSession) role1 = Role(name="__test_role1", description="__test_role1") user1 = User(username="__test_user1", password="__test_user1", settings_role=role1, roles=[role1]) diff --git a/geoportal/tests/functional/test_mobile_desktop.py b/geoportal/tests/functional/test_mobile_desktop.py index 2d479ff1ce..c64c1cad26 100644 --- a/geoportal/tests/functional/test_mobile_desktop.py +++ b/geoportal/tests/functional/test_mobile_desktop.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2023, Camptocamp SA +# Copyright (c) 2013-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -46,7 +46,7 @@ def setup_method(self, _): from c2cgeoportal_commons.models import DBSession from c2cgeoportal_commons.models.main import Interface, LayerGroup, LayerWMS, Theme - create_default_ogcserver() + create_default_ogcserver(DBSession) main = Interface(name="main") mobile = Interface(name="mobile") diff --git a/geoportal/tests/functional/test_theme2fts.py b/geoportal/tests/functional/test_theme2fts.py index 0729bb63c0..302fd93cc7 100644 --- a/geoportal/tests/functional/test_theme2fts.py +++ b/geoportal/tests/functional/test_theme2fts.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, Camptocamp SA +# Copyright (c) 2020-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -53,42 +53,27 @@ def settings(): } -@pytest.fixture(scope="module") -def dbsession(settings): - from c2cgeoportal_commons.models import DBSession - - yield DBSession - - -@pytest.fixture -def transact(dbsession): - t = dbsession.begin_nested() - yield t - t.rollback() - dbsession.expire_all() - - -def add_parent(dbsession, item, group): +def add_parent(dbsession_old, item, group): """ Utility function to add a TreeItem in a TreeGroup. """ from c2cgeoportal_commons.models import main - dbsession.add(main.LayergroupTreeitem(group=group, item=item, ordering=len(group.children_relation))) + dbsession_old.add(main.LayergroupTreeitem(group=group, item=item, ordering=len(group.children_relation))) @pytest.fixture(scope="function") -def test_data(dbsession, transact): +def test_data(dbsession_old, transact_old): from c2cgeoportal_commons.models import main ogc_server = main.OGCServer(name="ogc_server") - dbsession.add(ogc_server) + dbsession_old.add(ogc_server) interfaces = {i.name: i for i in [main.Interface(name=name) for name in ["desktop", "mobile", "api"]]} - dbsession.add_all(interfaces.values()) + dbsession_old.add_all(interfaces.values()) role = main.Role(name="role") - dbsession.add(role) + dbsession_old.add(role) public_theme = main.Theme(name="public_theme") public_theme.interfaces = list(interfaces.values()) @@ -99,37 +84,37 @@ def test_data(dbsession, transact): private_theme.restricted_roles = [role] themes = {t.name: t for t in [public_theme, private_theme]} - dbsession.add_all(themes.values()) + dbsession_old.add_all(themes.values()) first_level_group = main.LayerGroup(name="first_level_group") - add_parent(dbsession, first_level_group, public_theme) - add_parent(dbsession, first_level_group, private_theme) + add_parent(dbsession_old, first_level_group, public_theme) + add_parent(dbsession_old, first_level_group, private_theme) second_level_group = main.LayerGroup(name="second_level_group") - add_parent(dbsession, second_level_group, first_level_group) + add_parent(dbsession_old, second_level_group, first_level_group) groups = {g.name: g for g in [first_level_group, second_level_group]} - dbsession.add_all(groups.values()) + dbsession_old.add_all(groups.values()) public_layer = main.LayerWMS(name="public_layer") public_layer.ogc_server = ogc_server public_layer.interfaces = list(interfaces.values()) - add_parent(dbsession, public_layer, first_level_group) + add_parent(dbsession_old, public_layer, first_level_group) private_layer = main.LayerWMS(name="private_layer", public=False) private_layer.ogc_server = ogc_server private_layer.interfaces = list(interfaces.values()) - add_parent(dbsession, private_layer, second_level_group) + add_parent(dbsession_old, private_layer, second_level_group) layers = {layer.name: layer for layer in [public_layer, private_layer]} - dbsession.add_all(layers.values()) + dbsession_old.add_all(layers.values()) ra = main.RestrictionArea(name="ra") ra.roles = [role] ra.layers = [private_layer] - dbsession.add(ra) + dbsession_old.add(ra) - dbsession.flush() # Flush here to detect integrity errors now. + dbsession_old.flush() # Flush here to detect integrity errors now. yield { "ogc_server": ogc_server, @@ -185,11 +170,11 @@ def translation(domain, localedir=None, languages=None): @pytest.mark.usefixtures("test_data", "dummy_translation") class TestImport: - def assert_fts(self, dbsession, attrs): + def assert_fts(self, dbsession_old, attrs): from c2cgeoportal_commons.models import main fts = ( - dbsession.query(main.FullTextSearch) + dbsession_old.query(main.FullTextSearch) .filter(main.FullTextSearch.label == attrs["label"]) .filter(main.FullTextSearch.interface == attrs["interface"]) .filter(main.FullTextSearch.role == attrs["role"]) @@ -210,15 +195,15 @@ def assert_fts(self, dbsession, attrs): assert fts.actions == attrs["actions"] assert fts.from_theme is True - def test_import(self, dbsession, settings, test_data): + def test_import(self, dbsession_old, settings, test_data): from c2cgeoportal_commons.models import main from c2cgeoportal_geoportal.scripts.theme2fts import Import - Import(dbsession, settings, options()) + Import(dbsession_old, settings, options()) # languages * interfaces * (themes + groups + layers) total = 4 * 2 * (2 + 2 + 2) - assert dbsession.query(main.FullTextSearch).count() == total + assert dbsession_old.query(main.FullTextSearch).count() == total for lang in settings["available_locale_names"]: for interface in test_data["interfaces"].values(): @@ -311,24 +296,24 @@ def test_import(self, dbsession, settings, test_data): }, ] for e in expected: - self.assert_fts(dbsession, e) + self.assert_fts(dbsession_old, e) - def test_search_alias(self, dbsession, settings, test_data): + def test_search_alias(self, dbsession_old, settings, test_data): from c2cgeoportal_commons.models import main from c2cgeoportal_geoportal.scripts.theme2fts import Import alias_layer = main.LayerWMS(name="alias_layer") alias_layer.ogc_server = test_data["ogc_server"] alias_layer.interfaces = list(test_data["interfaces"].values()) - add_parent(dbsession, alias_layer, test_data["groups"]["first_level_group"]) + add_parent(dbsession_old, alias_layer, test_data["groups"]["first_level_group"]) alias_layer.metadatas = [ main.Metadata(name="searchAlias", value="myalias,mykeyword"), main.Metadata(name="searchAlias", value="myotheralias,myotherkeyword"), ] - dbsession.add(alias_layer) - dbsession.flush() + dbsession_old.add(alias_layer) + dbsession_old.flush() - Import(dbsession, settings, options()) + Import(dbsession_old, settings, options()) for lang in settings["available_locale_names"]: for interface in test_data["interfaces"].values(): @@ -351,9 +336,9 @@ def test_search_alias(self, dbsession, settings, test_data): }, ] for e in expected: - self.assert_fts(dbsession, e) + self.assert_fts(dbsession_old, e) - def test_search_label_pattern(self, dbsession, settings, test_data): + def test_search_label_pattern(self, dbsession_old, settings, test_data): from c2cgeoportal_commons.models import main from c2cgeoportal_geoportal.scripts.theme2fts import Import @@ -362,34 +347,34 @@ def test_search_label_pattern(self, dbsession, settings, test_data): label_theme.metadatas = [ main.Metadata(name="searchLabelPattern", value="{name}, {theme}"), ] - dbsession.add(label_theme) + dbsession_old.add(label_theme) label_block = main.LayerGroup(name="label_block") - add_parent(dbsession, label_block, label_theme) + add_parent(dbsession_old, label_block, label_theme) label_block.metadatas = [ main.Metadata(name="searchLabelPattern", value="{name} ({theme}, {parent})"), ] - dbsession.add(label_block) + dbsession_old.add(label_block) label_group = main.LayerGroup(name="label_group") - add_parent(dbsession, label_group, label_block) + add_parent(dbsession_old, label_group, label_block) label_group.metadatas = [ main.Metadata(name="searchLabelPattern", value="{name} ({theme}, {block}, {parent})"), ] - dbsession.add(label_group) + dbsession_old.add(label_group) label_layer = main.LayerWMS(name="label_layer") label_layer.ogc_server = test_data["ogc_server"] label_layer.interfaces = list(test_data["interfaces"].values()) - add_parent(dbsession, label_layer, label_group) + add_parent(dbsession_old, label_layer, label_group) label_layer.metadatas = [ main.Metadata(name="searchLabelPattern", value="{name} ({theme}, {block}, {parent})"), ] - dbsession.add(label_layer) - dbsession.flush() + dbsession_old.add(label_layer) + dbsession_old.flush() - Import(dbsession, settings, options()) + Import(dbsession_old, settings, options()) for lang in settings["available_locale_names"]: for interface in test_data["interfaces"].values(): @@ -454,4 +439,4 @@ def test_search_label_pattern(self, dbsession, settings, test_data): }, ] for e in expected: - self.assert_fts(dbsession, e) + self.assert_fts(dbsession_old, e) diff --git a/geoportal/tests/functional/test_themes.py b/geoportal/tests/functional/test_themes.py index eb7244589d..9261669204 100644 --- a/geoportal/tests/functional/test_themes.py +++ b/geoportal/tests/functional/test_themes.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2023, Camptocamp SA +# Copyright (c) 2013-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -28,387 +28,373 @@ # pylint: disable=missing-docstring,attribute-defined-outside-init,protected-access -from unittest import TestCase - +import pytest import transaction -from pyramid import testing -from tests.functional import create_default_ogcserver, create_dummy_request -from tests.functional import setup_common as setup_module # noqa -from tests.functional import teardown_common as teardown_module # noqa - - -class TestThemesView(TestCase): - def setup_method(self, _): - # Always see the diff - # https://docs.python.org/2/library/unittest.html#unittest.TestCase.maxDiff - self.maxDiff = None - - from c2cgeoportal_commons.models import DBSession - from c2cgeoportal_commons.models.main import ( - OGCSERVER_AUTH_NOAUTH, - Dimension, - Functionality, - Interface, - LayerGroup, - LayerWMS, - LayerWMTS, - Metadata, - OGCServer, - Theme, - ) - - main = Interface(name="desktop") - mobile = Interface(name="mobile") - min_levels = Interface(name="min_levels") - - ogc_server_internal = create_default_ogcserver() - ogc_server_external = OGCServer( - name="__test_ogc_server_chtopo", - url="http://wms.geo.admin.ch/", - image_type="image/jpeg", - auth=OGCSERVER_AUTH_NOAUTH, - ) - ogc_server_external.wfs_support = False - - layer_internal_wms = LayerWMS(name="__test_layer_internal_wms", public=True) - layer_internal_wms.layer = "__test_layer_internal_wms" - layer_internal_wms.interfaces = [main, min_levels] - layer_internal_wms.metadatas = [Metadata("test", "internal_wms")] - layer_internal_wms.ogc_server = ogc_server_internal - - layer_external_wms = LayerWMS( - name="__test_layer_external_wms", layer="ch.swisstopo.dreiecksvermaschung", public=True - ) - layer_external_wms.interfaces = [main] - layer_external_wms.metadatas = [Metadata("test", "external_wms")] - layer_external_wms.ogc_server = ogc_server_external - - layer_wmts = LayerWMTS(name="__test_layer_wmts", public=True) - layer_wmts.url = "http://example.com/1.0.0/WMTSCapabilities.xml" - layer_wmts.layer = "map" - layer_wmts.interfaces = [main, mobile] - layer_wmts.metadatas = [Metadata("test", "wmts")] - layer_wmts.dimensions = [Dimension("year", "2015")] - - layer_group_1 = LayerGroup(name="__test_layer_group_1") - layer_group_1.children = [layer_internal_wms, layer_external_wms, layer_wmts] - layer_group_1.metadatas = [Metadata("test", "group_1")] - - layer_group_2 = LayerGroup(name="__test_layer_group_2") - layer_group_2.children = [layer_wmts, layer_internal_wms, layer_external_wms] - - layer_group_3 = LayerGroup(name="__test_layer_group_3") - layer_group_3.children = [layer_wmts, layer_internal_wms, layer_external_wms] - - layer_group_4 = LayerGroup(name="__test_layer_group_4") - layer_group_4.children = [layer_group_2] - - theme = Theme(name="__test_theme") - theme.interfaces = [main, mobile] - theme.metadatas = [Metadata("test", "theme")] - theme.children = [layer_group_1, layer_group_2] - theme_layer = Theme(name="__test_theme_layer") - theme_layer.interfaces = [min_levels] - theme_layer.children = [layer_internal_wms] - - functionality1 = Functionality(name="test_name", value="test_value_1") - functionality2 = Functionality(name="test_name", value="test_value_2") - theme.functionalities = [functionality1, functionality2] - - DBSession.add_all([theme, theme_layer]) - - transaction.commit() - - def teardown_method(self, _): - testing.tearDown() - - from c2cgeoportal_commons.models import DBSession - from c2cgeoportal_commons.models.main import Dimension, Interface, Metadata, OGCServer, TreeItem - - DBSession.query(Metadata).delete() - DBSession.query(Dimension).delete() - for item in DBSession.query(TreeItem).all(): - DBSession.delete(item) - DBSession.query(Interface).filter(Interface.name == "main").delete() - DBSession.query(OGCServer).delete() - - transaction.commit() - - # - # viewer view tests - # - - @staticmethod - def _create_request_obj(params=None, **kwargs): - if params is None: - params = {} - request = create_dummy_request(**kwargs) - - def route_url(name, _query=None, **kwargs): - del name # Unused - del kwargs # Unused - if _query is None: - return "http://localhost/ci/mapserv" - else: - return "http://localhost/ci/mapserv?" + "&".join(["=".join(i) for i in list(_query.items())]) - - request.route_url = route_url - request.params = params - - return request - - def _create_theme_obj(self, **kwargs): - from c2cgeoportal_geoportal.views.theme import Theme - - kwargs["additional_settings"] = {"admin_interface": {"available_metadata": [{"name": "test"}]}} - return Theme(self._create_request_obj(**kwargs)) - - def _only_name(self, item, attribute="name"): - result = {} - - if attribute in item: - result[attribute] = item[attribute] - - if "children" in item: - result["children"] = [self._only_name(i, attribute) for i in item["children"]] - - return result - - @staticmethod - def _get_filtered_errors(themes): - print(themes["errors"]) - return { - e - for e in themes["errors"] - if e != "The layer '' (__test_layer_external_wms) is not defined in WMS capabilities" - and not e.startswith("Unable to get DescribeFeatureType from URL ") - } - - def test_group(self): - theme_view = self._create_theme_obj(params={"group": "__test_layer_group_3"}) - themes = theme_view.themes() - assert self._get_filtered_errors(themes) == set() - self.assertEqual( - self._only_name(themes["group"]), +from tests.functional import create_dummy_request + + +@pytest.fixture() +@pytest.mark.usefixtures("dbsession", "transact", "default_ogcserver") +def themes_setup(dbsession, transact, default_ogcserver): + from c2cgeoportal_commons.models.main import ( + OGCSERVER_AUTH_NOAUTH, + Dimension, + Functionality, + Interface, + LayerGroup, + LayerWMS, + LayerWMTS, + Metadata, + OGCServer, + Theme, + TreeItem, + ) + + main = Interface(name="desktop") + mobile = Interface(name="mobile") + min_levels = Interface(name="min_levels") + + ogc_server_external = OGCServer( + name="__test_ogc_server_chtopo", + url="http://wms.geo.admin.ch/", + image_type="image/jpeg", + auth=OGCSERVER_AUTH_NOAUTH, + ) + ogc_server_external.wfs_support = False + + layer_internal_wms = LayerWMS(name="__test_layer_internal_wms", public=True) + layer_internal_wms.layer = "__test_layer_internal_wms" + layer_internal_wms.interfaces = [main, min_levels] + layer_internal_wms.metadatas = [Metadata("test", "internal_wms")] + layer_internal_wms.ogc_server = default_ogcserver + + layer_external_wms = LayerWMS( + name="__test_layer_external_wms", layer="ch.swisstopo.dreiecksvermaschung", public=True + ) + layer_external_wms.interfaces = [main] + layer_external_wms.metadatas = [Metadata("test", "external_wms")] + layer_external_wms.ogc_server = ogc_server_external + + layer_wmts = LayerWMTS(name="__test_layer_wmts", public=True) + layer_wmts.url = "http://example.com/1.0.0/WMTSCapabilities.xml" + layer_wmts.layer = "map" + layer_wmts.interfaces = [main, mobile] + layer_wmts.metadatas = [Metadata("test", "wmts")] + layer_wmts.dimensions = [Dimension("year", "2015")] + + layer_group_1 = LayerGroup(name="__test_layer_group_1") + layer_group_1.children = [layer_internal_wms, layer_external_wms, layer_wmts] + layer_group_1.metadatas = [Metadata("test", "group_1")] + + layer_group_2 = LayerGroup(name="__test_layer_group_2") + layer_group_2.children = [layer_wmts, layer_internal_wms, layer_external_wms] + + layer_group_3 = LayerGroup(name="__test_layer_group_3") + layer_group_3.children = [layer_wmts, layer_internal_wms, layer_external_wms] + + layer_group_4 = LayerGroup(name="__test_layer_group_4") + layer_group_4.children = [layer_group_2] + + theme = Theme(name="__test_theme") + theme.interfaces = [main, mobile] + theme.metadatas = [Metadata("test", "theme")] + theme.children = [layer_group_1, layer_group_2] + theme_layer = Theme(name="__test_theme_layer") + theme_layer.interfaces = [min_levels] + theme_layer.children = [layer_internal_wms] + + functionality1 = Functionality(name="test_name", value="test_value_1") + functionality2 = Functionality(name="test_name", value="test_value_2") + theme.functionalities = [functionality1, functionality2] + + dbsession.add_all( + [ + main, + mobile, + min_levels, + ogc_server_external, + layer_internal_wms, + layer_external_wms, + layer_wmts, + layer_group_1, + layer_group_2, + layer_group_3, + layer_group_4, + theme, + theme_layer, + functionality1, + functionality2, + ] + ) + + yield None + + dbsession.query(Metadata).delete() + dbsession.query(Dimension).delete() + for item in dbsession.query(TreeItem).all(): + dbsession.delete(item) + dbsession.query(Interface).filter(Interface.name == "main").delete() + dbsession.query(OGCServer).delete() + + +# +# viewer view tests +# + + +def _create_request_obj(params=None, **kwargs): + if params is None: + params = {} + request = create_dummy_request(**kwargs) + + def route_url(name, _query=None, **kwargs): + del name # Unused + del kwargs # Unused + if _query is None: + return "http://localhost/ci/mapserv" + else: + return "http://localhost/ci/mapserv?" + "&".join(["=".join(i) for i in list(_query.items())]) + + request.route_url = route_url + request.params = params + + return request + + +def _create_theme_obj(**kwargs): + from c2cgeoportal_geoportal.views.theme import Theme + + kwargs["additional_settings"] = {"admin_interface": {"available_metadata": [{"name": "test"}]}} + return Theme(_create_request_obj(**kwargs)) + + +def _only_name(item, attribute="name"): + result = {} + + if attribute in item: + result[attribute] = item[attribute] + + if "children" in item: + result["children"] = [_only_name(i, attribute) for i in item["children"]] + + return result + + +def _get_filtered_errors(themes): + print(themes["errors"]) + return { + e + for e in themes["errors"] + if e != "The layer '' (__test_layer_external_wms) is not defined in WMS capabilities" + and not e.startswith("Unable to get DescribeFeatureType from URL ") + } + + +@pytest.mark.usefixtures("dbsession", "transact", "themes_setup") +def test_group(dbsession, transact, themes_setup): + theme_view = _create_theme_obj(params={"group": "__test_layer_group_3"}) + themes = theme_view.themes() + assert _get_filtered_errors(themes) == set() + assert _only_name(themes["group"]) == { + "name": "__test_layer_group_3", + # order is important + "children": [ + {"name": "__test_layer_wmts"}, + {"name": "__test_layer_internal_wms"}, + {"name": "__test_layer_external_wms"}, + ], + } + + theme_view = _create_theme_obj(params={"group": "__test_layer_group_4"}) + themes = theme_view.themes() + assert _get_filtered_errors(themes) == set() + assert _only_name(themes["group"]) == { + "name": "__test_layer_group_4", + "children": [ { - "name": "__test_layer_group_3", + "name": "__test_layer_group_2", # order is important "children": [ {"name": "__test_layer_wmts"}, {"name": "__test_layer_internal_wms"}, {"name": "__test_layer_external_wms"}, ], - }, - ) - - theme_view = self._create_theme_obj(params={"group": "__test_layer_group_4"}) - themes = theme_view.themes() - assert self._get_filtered_errors(themes) == set() - self.assertEqual( - self._only_name(themes["group"]), - { - "name": "__test_layer_group_4", - "children": [ - { - "name": "__test_layer_group_2", - # order is important - "children": [ - {"name": "__test_layer_wmts"}, - {"name": "__test_layer_internal_wms"}, - {"name": "__test_layer_external_wms"}, - ], - } - ], - }, - ) - - def test_group_update(self): - from c2cgeoportal_commons.models import DBSession - from c2cgeoportal_commons.models.main import LayerGroup - - layer_group_3 = DBSession.query(LayerGroup).filter(LayerGroup.name == "__test_layer_group_3").one() - layer_group_3.children = layer_group_3.children[:-1] - transaction.commit() - - theme_view = self._create_theme_obj(params={"group": "__test_layer_group_3"}) - themes = theme_view.themes() - assert self._get_filtered_errors(themes) == set() - self.assertEqual( - self._only_name(themes["group"]), - { - "name": "__test_layer_group_3", - # order is important - "children": [{"name": "__test_layer_wmts"}, {"name": "__test_layer_internal_wms"}], - }, - ) - - def test_min_levels(self): - theme_view = self._create_theme_obj(params={"interface": "min_levels"}) - themes = theme_view.themes() - self.assertEqual( - self._get_filtered_errors(themes), - { - "The Layer '__test_layer_internal_wms' cannot be directly in the theme '__test_theme_layer' (0/1)." - }, - ) - - theme_view = self._create_theme_obj(params={"min_levels": "2"}) - themes = theme_view.themes() - self.assertEqual( - self._get_filtered_errors(themes), - { - "The Layer '__test_theme/__test_layer_group_1/__test_layer_internal_wms' is under indented (1/2).", - "The Layer '__test_theme/__test_layer_group_1/__test_layer_wmts' is under indented (1/2).", - "The Layer '__test_theme/__test_layer_group_2/__test_layer_external_wms' is under indented (1/2).", - "The Layer '__test_theme/__test_layer_group_2/__test_layer_internal_wms' is under indented (1/2).", - "The Layer '__test_theme/__test_layer_group_1/__test_layer_external_wms' is under indented (1/2).", - "The Layer '__test_theme/__test_layer_group_2/__test_layer_wmts' is under indented (1/2).", - }, - ) - - def test_theme_layer(self): - theme_view = self._create_theme_obj(params={"interface": "min_levels", "min_levels": "0"}) - themes = theme_view.themes() - assert self._get_filtered_errors(themes) == set() - self.assertEqual( - [self._only_name(t) for t in themes["themes"]], - [{"name": "__test_theme_layer", "children": [{"name": "__test_layer_internal_wms"}]}], - ) - - def test_interface(self): - theme_view = self._create_theme_obj(params={"interface": "mobile"}) - themes = theme_view.themes() - assert self._get_filtered_errors(themes) == set() - self.assertEqual( - [self._only_name(t) for t in themes["themes"]], - [ + } + ], + } + + +@pytest.mark.usefixtures("dbsession", "transact", "themes_setup") +def test_group_update(dbsession, transact, themes_setup): + from c2cgeoportal_commons.models.main import LayerGroup + + layer_group_3 = dbsession.query(LayerGroup).filter(LayerGroup.name == "__test_layer_group_3").one() + layer_group_3.children = layer_group_3.children[:-1] + transact.commit() + + theme_view = _create_theme_obj(params={"group": "__test_layer_group_3"}) + themes = theme_view.themes() + assert _get_filtered_errors(themes) == set() + assert _only_name(themes["group"]) == { + "name": "__test_layer_group_3", + # order is important + "children": [{"name": "__test_layer_wmts"}, {"name": "__test_layer_internal_wms"}], + } + + +@pytest.mark.usefixtures("dbsession", "transact", "themes_setup") +def test_min_levels(dbsession, transact, themes_setup): + theme_view = _create_theme_obj(params={"interface": "min_levels"}) + themes = theme_view.themes() + assert _get_filtered_errors(themes) == { + "The Layer '__test_layer_internal_wms' cannot be directly in the theme '__test_theme_layer' (0/1)." + } + + theme_view = _create_theme_obj(params={"min_levels": "2"}) + themes = theme_view.themes() + assert _get_filtered_errors(themes) == { + "The Layer '__test_theme/__test_layer_group_1/__test_layer_internal_wms' is under indented (1/2).", + "The Layer '__test_theme/__test_layer_group_1/__test_layer_wmts' is under indented (1/2).", + "The Layer '__test_theme/__test_layer_group_2/__test_layer_external_wms' is under indented (1/2).", + "The Layer '__test_theme/__test_layer_group_2/__test_layer_internal_wms' is under indented (1/2).", + "The Layer '__test_theme/__test_layer_group_1/__test_layer_external_wms' is under indented (1/2).", + "The Layer '__test_theme/__test_layer_group_2/__test_layer_wmts' is under indented (1/2).", + } + + +@pytest.mark.usefixtures("dbsession", "transact", "themes_setup") +def test_theme_layer(dbsession, transact, themes_setup): + theme_view = _create_theme_obj(params={"interface": "min_levels", "min_levels": "0"}) + themes = theme_view.themes() + assert _get_filtered_errors(themes) == set() + assert [_only_name(t) for t in themes["themes"]] == [ + {"name": "__test_theme_layer", "children": [{"name": "__test_layer_internal_wms"}]} + ] + + +@pytest.mark.usefixtures("dbsession", "transact", "themes_setup") +def test_interface(dbsession, transact, themes_setup): + theme_view = _create_theme_obj(params={"interface": "mobile"}) + themes = theme_view.themes() + assert _get_filtered_errors(themes) == set() + assert [_only_name(t) for t in themes["themes"]] == [ + { + "name": "__test_theme", + "children": [ + {"name": "__test_layer_group_1", "children": [{"name": "__test_layer_wmts"}]}, + {"name": "__test_layer_group_2", "children": [{"name": "__test_layer_wmts"}]}, + ], + } + ] + + +@pytest.mark.usefixtures("dbsession", "transact", "themes_setup") +def test_metadata(dbsession, transact, themes_setup): + theme_view = _create_theme_obj() + themes = theme_view.themes() + assert _get_filtered_errors(themes) == set() + assert [_only_name(t, "metadata") for t in themes["themes"]] == [ + { + "metadata": {"test": "theme"}, + "children": [ { - "name": "__test_theme", + "metadata": {"test": "group_1"}, + # order is important "children": [ - {"name": "__test_layer_group_1", "children": [{"name": "__test_layer_wmts"}]}, - {"name": "__test_layer_group_2", "children": [{"name": "__test_layer_wmts"}]}, + {"metadata": {"test": "internal_wms"}}, + {"metadata": {"test": "external_wms"}}, + {"metadata": {"test": "wmts"}}, ], - } - ], - ) - - def test_metadata(self): - theme_view = self._create_theme_obj() - themes = theme_view.themes() - assert self._get_filtered_errors(themes) == set() - self.assertEqual( - [self._only_name(t, "metadata") for t in themes["themes"]], - [ + }, { - "metadata": {"test": "theme"}, + "metadata": {}, + # order is important "children": [ - { - "metadata": {"test": "group_1"}, - # order is important - "children": [ - {"metadata": {"test": "internal_wms"}}, - {"metadata": {"test": "external_wms"}}, - {"metadata": {"test": "wmts"}}, - ], - }, - { - "metadata": {}, - # order is important - "children": [ - {"metadata": {"test": "wmts"}}, - {"metadata": {"test": "internal_wms"}}, - {"metadata": {"test": "external_wms"}}, - ], - }, + {"metadata": {"test": "wmts"}}, + {"metadata": {"test": "internal_wms"}}, + {"metadata": {"test": "external_wms"}}, ], - } - ], - ) - - def test_ogc_server(self): - theme_view = self._create_theme_obj() - - themes = theme_view.themes() - themes["ogcServers"]["__test_ogc_server"]["attributes"] = {} - themes["ogcServers"]["__test_ogc_server_chtopo"]["attributes"] = {} - assert self._get_filtered_errors(themes) == set() - self.assertEqual( - themes["ogcServers"], - { - "__test_ogc_server": { - "attributes": {}, - "credential": True, - "imageType": "image/png", - "isSingleTile": False, - "namespace": "http://mapserver.gis.umn.edu/mapserver", - "type": "mapserver", - "url": "http://localhost/ci/mapserv?ogcserver=__test_ogc_server", - "urlWfs": "http://localhost/ci/mapserv?ogcserver=__test_ogc_server", - "wfsSupport": True, }, - "__test_ogc_server_chtopo": { - "attributes": {}, - "credential": False, - "imageType": "image/jpeg", - "isSingleTile": False, - "namespace": None, - "type": "mapserver", - "url": "http://wms.geo.admin.ch/", - "urlWfs": "http://wms.geo.admin.ch/", - "wfsSupport": False, + ], + } + ] + + +@pytest.mark.usefixtures("dbsession", "transact", "themes_setup") +def test_ogc_server(dbsession, transact, themes_setup): + theme_view = _create_theme_obj() + + themes = theme_view.themes() + themes["ogcServers"]["__test_ogc_server"]["attributes"] = {} + themes["ogcServers"]["__test_ogc_server_chtopo"]["attributes"] = {} + assert _get_filtered_errors(themes) == set() + assert themes["ogcServers"] == { + "__test_ogc_server": { + "attributes": {}, + "credential": True, + "imageType": "image/png", + "isSingleTile": False, + "namespace": "http://mapserver.gis.umn.edu/mapserver", + "type": "mapserver", + "url": "http://localhost/ci/mapserv?ogcserver=__test_ogc_server", + "urlWfs": "http://localhost/ci/mapserv?ogcserver=__test_ogc_server", + "wfsSupport": True, + }, + "__test_ogc_server_chtopo": { + "attributes": {}, + "credential": False, + "imageType": "image/jpeg", + "isSingleTile": False, + "namespace": None, + "type": "mapserver", + "url": "http://wms.geo.admin.ch/", + "urlWfs": "http://wms.geo.admin.ch/", + "wfsSupport": False, + }, + } + assert [_only_name(t, "ogcServer") for t in themes["themes"]] == [ + { + "children": [ + { + # order is important + "children": [ + {"ogcServer": "__test_ogc_server"}, + {"ogcServer": "__test_ogc_server_chtopo"}, + {}, + ] }, - }, - ) - self.assertEqual( - [self._only_name(t, "ogcServer") for t in themes["themes"]], - [ { + # order is important "children": [ - { - # order is important - "children": [ - {"ogcServer": "__test_ogc_server"}, - {"ogcServer": "__test_ogc_server_chtopo"}, - {}, - ] - }, - { - # order is important - "children": [ - {}, - {"ogcServer": "__test_ogc_server"}, - {"ogcServer": "__test_ogc_server_chtopo"}, - ] - }, + {}, + {"ogcServer": "__test_ogc_server"}, + {"ogcServer": "__test_ogc_server_chtopo"}, ] - } - ], - ) - - def test_dimensions(self): - theme_view = self._create_theme_obj(params={"group": "__test_layer_group_3"}) - themes = theme_view.themes() - assert self._get_filtered_errors(themes) == set() - self.assertEqual( - self._only_name(themes["group"], "dimensions"), - { - # order is important - "children": [{"dimensions": {"year": "2015"}}, {"dimensions": {}}, {"dimensions": {}}] - }, - ) - - def test_background(self): - theme_view = self._create_theme_obj( - params={"background": "__test_layer_group_3", "set": "background"} - ) - themes = theme_view.themes() - assert self._get_filtered_errors(themes) == set() - self.assertEqual( - [self._only_name(e) for e in themes["background_layers"]], - # order is important - [ - {"name": "__test_layer_wmts"}, - {"name": "__test_layer_internal_wms"}, - {"name": "__test_layer_external_wms"}, - ], - ) + }, + ] + } + ] + + +@pytest.mark.usefixtures("dbsession", "transact", "themes_setup") +def test_dimensions(dbsession, transact, themes_setup): + theme_view = _create_theme_obj(params={"group": "__test_layer_group_3"}) + themes = theme_view.themes() + assert _get_filtered_errors(themes) == set() + assert _only_name(themes["group"], "dimensions") == { + # order is important + "children": [{"dimensions": {"year": "2015"}}, {"dimensions": {}}, {"dimensions": {}}] + } + + +@pytest.mark.usefixtures("dbsession", "transact", "themes_setup") +def test_background(dbsession, transact, themes_setup): + theme_view = _create_theme_obj(params={"background": "__test_layer_group_3", "set": "background"}) + themes = theme_view.themes() + assert _get_filtered_errors(themes) == set() + # order is important + assert [_only_name(e) for e in themes["background_layers"]] == [ + {"name": "__test_layer_wmts"}, + {"name": "__test_layer_internal_wms"}, + {"name": "__test_layer_external_wms"}, + ] diff --git a/geoportal/tests/functional/test_themes_dimensions.py b/geoportal/tests/functional/test_themes_dimensions.py index 4e77a55f86..4fca063143 100644 --- a/geoportal/tests/functional/test_themes_dimensions.py +++ b/geoportal/tests/functional/test_themes_dimensions.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2023, Camptocamp SA +# Copyright (c) 2016-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -54,7 +54,7 @@ def setup_method(self, _): Theme, ) - ogc_server = create_default_ogcserver() + ogc_server = create_default_ogcserver(DBSession) main = Interface(name="main") layer_wms_1 = LayerWMS(name="__test_layer_wms_1", public=True) diff --git a/geoportal/tests/functional/test_themes_edit_columns.py b/geoportal/tests/functional/test_themes_edit_columns.py index e900a11a88..d4ee1a6e4a 100644 --- a/geoportal/tests/functional/test_themes_edit_columns.py +++ b/geoportal/tests/functional/test_themes_edit_columns.py @@ -75,7 +75,7 @@ def setup_method(self, _: Any) -> None: ) self.main = Interface(name="main") - self.ogc_server = create_default_ogcserver() + self.ogc_server = create_default_ogcserver(DBSession) self.ogc_server.auth = OGCSERVER_AUTH_NOAUTH self.metadata = None diff --git a/geoportal/tests/functional/test_themes_editing.py b/geoportal/tests/functional/test_themes_editing.py index 564c7741ee..7a3bb496fe 100644 --- a/geoportal/tests/functional/test_themes_editing.py +++ b/geoportal/tests/functional/test_themes_editing.py @@ -63,7 +63,7 @@ def setup_method(self, _): setup_db() - ogcserver = create_default_ogcserver() + ogcserver = create_default_ogcserver(DBSession) role1 = Role(name="__test_role1") user1 = User(username="__test_user1", password="__test_user1", settings_role=role1, roles=[role1]) diff --git a/geoportal/tests/functional/test_themes_entry.py b/geoportal/tests/functional/test_themes_entry.py index 207edd01d3..cd702bd676 100644 --- a/geoportal/tests/functional/test_themes_entry.py +++ b/geoportal/tests/functional/test_themes_entry.py @@ -97,7 +97,7 @@ def setup_method(self, _): a_geo_table.drop(checkfirst=True, bind=engine) a_geo_table.create(bind=engine) - ogcserver = create_default_ogcserver() + ogcserver = create_default_ogcserver(DBSession) private_layer_edit = LayerWMS(name="__test_private_layer_edit", public=False) private_layer_edit.layer = "__test_private_layer" diff --git a/geoportal/tests/functional/test_themes_layermultinameerror.py b/geoportal/tests/functional/test_themes_layermultinameerror.py index ad43f43733..c9c4f7f553 100644 --- a/geoportal/tests/functional/test_themes_layermultinameerror.py +++ b/geoportal/tests/functional/test_themes_layermultinameerror.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2023, Camptocamp SA +# Copyright (c) 2013-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -48,7 +48,7 @@ def setup_method(self, _): main = Interface(name="desktop") - ogc_server = create_default_ogcserver() + ogc_server = create_default_ogcserver(DBSession) layer_wms_1 = LayerWMS(name="__test_layer_wms_1", public=True) layer_wms_1.layer = "testpoint_unprotected" diff --git a/geoportal/tests/functional/test_themes_loop.py b/geoportal/tests/functional/test_themes_loop.py index 8657a5a8e2..2731c10edc 100644 --- a/geoportal/tests/functional/test_themes_loop.py +++ b/geoportal/tests/functional/test_themes_loop.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2023, Camptocamp SA +# Copyright (c) 2013-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -47,7 +47,7 @@ def setup_method(self, _): from c2cgeoportal_commons.models import DBSession from c2cgeoportal_commons.models.main import Interface, LayerGroup, LayerWMS, Theme - ogc_server = create_default_ogcserver() + ogc_server = create_default_ogcserver(DBSession) main = Interface(name="desktop2") layer = LayerWMS(name="__test_layer", public=True) diff --git a/geoportal/tests/functional/test_themes_metadata.py b/geoportal/tests/functional/test_themes_metadata.py index e942c570c2..d1e262b98a 100644 --- a/geoportal/tests/functional/test_themes_metadata.py +++ b/geoportal/tests/functional/test_themes_metadata.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2023, Camptocamp SA +# Copyright (c) 2013-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -49,7 +49,7 @@ def setup_method(self, _): desktop = Interface(name="desktop") - ogc_server_internal = create_default_ogcserver() + ogc_server_internal = create_default_ogcserver(DBSession) layer_wms = LayerWMS(name="__test_layer_internal_wms", public=True) layer_wms.layer = "testpoint_unprotected" diff --git a/geoportal/tests/functional/test_themes_mixed.py b/geoportal/tests/functional/test_themes_mixed.py index fec589f5da..fb920569d3 100644 --- a/geoportal/tests/functional/test_themes_mixed.py +++ b/geoportal/tests/functional/test_themes_mixed.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2023, Camptocamp SA +# Copyright (c) 2016-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -56,7 +56,7 @@ def setup_method(self, _): main = Interface(name="main") - ogc_server_internal = create_default_ogcserver() + ogc_server_internal = create_default_ogcserver(DBSession) ogc_server_external = OGCServer( name="__test_ogc_server_external", url="https://wms.geo.admin.ch/", diff --git a/geoportal/tests/functional/test_themes_nameerror.py b/geoportal/tests/functional/test_themes_nameerror.py index d9db1bc16e..e309fc7c09 100644 --- a/geoportal/tests/functional/test_themes_nameerror.py +++ b/geoportal/tests/functional/test_themes_nameerror.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2023, Camptocamp SA +# Copyright (c) 2013-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -48,7 +48,7 @@ def setup_method(self, _): main = Interface(name="desktop") - ogc_server = create_default_ogcserver() + ogc_server = create_default_ogcserver(DBSession) layer_wms = LayerWMS(name="__test_layer_wms", public=True) layer_wms.layer = "testpoint_unprotected" diff --git a/geoportal/tests/functional/test_themes_ogc_server_cache_clean.py b/geoportal/tests/functional/test_themes_ogc_server_cache_clean.py index b93520a5cb..9ad8151e65 100644 --- a/geoportal/tests/functional/test_themes_ogc_server_cache_clean.py +++ b/geoportal/tests/functional/test_themes_ogc_server_cache_clean.py @@ -182,7 +182,7 @@ def setup_method(self, _): main = Interface(name="desktop") - ogc_server_internal = create_default_ogcserver() + ogc_server_internal = create_default_ogcserver(DBSession) layer_internal_wms = LayerWMS(name="__test_layer_internal_wms", public=True) layer_internal_wms.layer = "__test_layer_internal_wms" diff --git a/geoportal/tests/functional/test_themes_private.py b/geoportal/tests/functional/test_themes_private.py index 36f4a21db2..6a58695601 100644 --- a/geoportal/tests/functional/test_themes_private.py +++ b/geoportal/tests/functional/test_themes_private.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2023, Camptocamp SA +# Copyright (c) 2013-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -63,7 +63,7 @@ def setUp(self): # noqa role2 = Role(name="__test_role2") user2 = User(username="__test_user2", password="__test_user", settings_role=role, roles=[role, role2]) user2.email = "__test_user@example.com" - ogc_server_internal = create_default_ogcserver() + ogc_server_internal = create_default_ogcserver(DBSession) layer_wms = LayerWMS(name="__test_layer_wms", public=True) layer_wms.layer = "__test_public_layer" diff --git a/geoportal/tests/functional/test_themes_scale.py b/geoportal/tests/functional/test_themes_scale.py index 934c4a77d2..87658ae1dd 100644 --- a/geoportal/tests/functional/test_themes_scale.py +++ b/geoportal/tests/functional/test_themes_scale.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2023, Camptocamp SA +# Copyright (c) 2013-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -47,7 +47,7 @@ def setup_method(self, _): main = Interface(name="desktop") - ogc_server = create_default_ogcserver() + ogc_server = create_default_ogcserver(DBSession) layer_noscale = LayerWMS(name="__test_layer_noscale", public=True) layer_noscale.layer = "test_noscale" diff --git a/geoportal/tests/functional/test_themes_time.py b/geoportal/tests/functional/test_themes_time.py index e62cb87296..1d8c3ce765 100644 --- a/geoportal/tests/functional/test_themes_time.py +++ b/geoportal/tests/functional/test_themes_time.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2023, Camptocamp SA +# Copyright (c) 2013-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -66,7 +66,7 @@ def setup_method(self, _): DBSession.query(PointTest).delete() main = Interface(name="desktop") - ogc_server = create_default_ogcserver() + ogc_server = create_default_ogcserver(DBSession) layer_wms_1 = LayerWMS(name="__test_layer_time_1", public=True) layer_wms_1.layer = "test_wmstime" diff --git a/geoportal/tests/functional/test_tinyowsproxy.py b/geoportal/tests/functional/test_tinyowsproxy.py index c0bd6f6157..defaacd638 100644 --- a/geoportal/tests/functional/test_tinyowsproxy.py +++ b/geoportal/tests/functional/test_tinyowsproxy.py @@ -1,4 +1,4 @@ -# Copyright (c) 2015-2023, Camptocamp SA +# Copyright (c) 2015-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -89,7 +89,7 @@ def setup_method(self, _): user2.roles = [role2] user2.email = "Tarenpion" - ogc_server_internal = create_default_ogcserver() + ogc_server_internal = create_default_ogcserver(DBSession) main = Interface(name="main") @@ -304,12 +304,14 @@ class TestTinyOWSProxyViewNoDb(TestCase): transaction_insert_request_file = data_base + "tinyows_transaction_insert_request.xml" def setup_method(self, _): + from c2cgeoportal_commons.models import DBSession + # Always see the diff # https://docs.python.org/2/library/unittest.html#unittest.TestCase.maxDiff self.maxDiff = None cleanup_db() - create_default_ogcserver() + create_default_ogcserver(DBSession) def teardown_method(self, _): cleanup_db() diff --git a/geoportal/tests/functional/test_vector_tiles.py b/geoportal/tests/functional/test_vector_tiles.py index 79585aa359..ddd038abc4 100644 --- a/geoportal/tests/functional/test_vector_tiles.py +++ b/geoportal/tests/functional/test_vector_tiles.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, Camptocamp SA +# Copyright (c) 2021-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -35,10 +35,10 @@ @pytest.fixture -def test_data(dbsession, transact): +def test_data(dbsession_old, transact_old): from c2cgeoportal_commons.models.main import LayerVectorTiles - del transact + del transact_old points = { "p1": PointTest( @@ -54,7 +54,7 @@ def test_data(dbsession, transact): geom=WKTElement("POINT(600090 199955)", srid=21781), name="123", city="Londre", country="UK" ), } - dbsession.add_all(points.values()) + dbsession_old.add_all(points.values()) layers = { "layer_vector_tiles": LayerVectorTiles( @@ -73,9 +73,9 @@ def test_data(dbsession, transact): """, ) } - dbsession.add_all(layers.values()) + dbsession_old.add_all(layers.values()) - dbsession.flush() + dbsession_old.flush() yield { "layers": layers,