From 876bfde8d93ac34893c38dfb9465843944276dfb Mon Sep 17 00:00:00 2001
From: "olfa.mizen_externe@rte-france.com"
Date: Mon, 8 Jan 2024 15:30:54 +0100
Subject: [PATCH] perf(watcher): change db queries to improve Watcher scanning
 performance

---
 ...fd73601a9075_add_delete_cascade_studies.py | 69 +++++++++++++++++++
 antarest/study/model.py                       |  4 +-
 antarest/study/repository.py                  | 17 +++--
 antarest/study/service.py                     | 25 +++----
 .../storage/variantstudy/model/dbmodel.py     |  2 +-
 scripts/rollback.sh                           |  2 +-
 tests/storage/test_service.py                 | 32 ++++++---
 7 files changed, 119 insertions(+), 32 deletions(-)
 create mode 100644 alembic/versions/fd73601a9075_add_delete_cascade_studies.py

diff --git a/alembic/versions/fd73601a9075_add_delete_cascade_studies.py b/alembic/versions/fd73601a9075_add_delete_cascade_studies.py
new file mode 100644
index 0000000000..ac063fe516
--- /dev/null
+++ b/alembic/versions/fd73601a9075_add_delete_cascade_studies.py
@@ -0,0 +1,69 @@
+"""
+Add delete cascade constraint to study foreign keys
+
+Revision ID: fd73601a9075
+Revises: 3c70366b10ea
+Create Date: 2024-02-12 17:27:37.314443
+"""
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "fd73601a9075"
+down_revision = "3c70366b10ea"
+branch_labels = None
+depends_on = None
+
+# noinspection SpellCheckingInspection
+RAWSTUDY_FK = "rawstudy_id_fkey"
+
+# noinspection SpellCheckingInspection
+VARIANTSTUDY_FK = "variantstudy_id_fkey"
+
+# noinspection SpellCheckingInspection
+STUDY_ADDITIONAL_DATA_FK = "study_additional_data_study_id_fkey"
+
+
+def upgrade() -> None:
+    dialect_name: str = op.get_context().dialect.name
+    if dialect_name == "postgresql":
+        with op.batch_alter_table("rawstudy", schema=None) as batch_op:
+            batch_op.drop_constraint(RAWSTUDY_FK, type_="foreignkey")
+            batch_op.create_foreign_key(RAWSTUDY_FK, "study", ["id"], ["id"], ondelete="CASCADE")
+
+        with op.batch_alter_table("study_additional_data", schema=None) as batch_op:
+            batch_op.drop_constraint(STUDY_ADDITIONAL_DATA_FK, type_="foreignkey")
+            batch_op.create_foreign_key(STUDY_ADDITIONAL_DATA_FK, "study", ["study_id"], ["id"], ondelete="CASCADE")
+
+        with op.batch_alter_table("variantstudy", schema=None) as batch_op:
+            batch_op.drop_constraint(VARIANTSTUDY_FK, type_="foreignkey")
+            batch_op.create_foreign_key(VARIANTSTUDY_FK, "study", ["id"], ["id"], ondelete="CASCADE")
+
+    elif dialect_name == "sqlite":
+        # Adding ondelete="CASCADE" to a foreign key in sqlite is not supported
+        pass
+
+    else:
+        raise NotImplementedError(f"{dialect_name=} not implemented")
+
+
+def downgrade() -> None:
+    dialect_name: str = op.get_context().dialect.name
+    if dialect_name == "postgresql":
+        with op.batch_alter_table("rawstudy", schema=None) as batch_op:
+            batch_op.drop_constraint(RAWSTUDY_FK, type_="foreignkey")
+            batch_op.create_foreign_key(RAWSTUDY_FK, "study", ["id"], ["id"])
+
+        with op.batch_alter_table("study_additional_data", schema=None) as batch_op:
+            batch_op.drop_constraint(STUDY_ADDITIONAL_DATA_FK, type_="foreignkey")
+            batch_op.create_foreign_key(STUDY_ADDITIONAL_DATA_FK, "study", ["study_id"], ["id"])
+
+        with op.batch_alter_table("variantstudy", schema=None) as batch_op:
+            batch_op.drop_constraint(VARIANTSTUDY_FK, type_="foreignkey")
+            batch_op.create_foreign_key(VARIANTSTUDY_FK, "study", ["id"], ["id"])
+
+    elif dialect_name == "sqlite":
+        # Removing ondelete="CASCADE" from a foreign key in sqlite is not supported
+        pass
+
+    else:
+        raise NotImplementedError(f"{dialect_name=} not implemented")
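The migration only rewrites the three foreign keys on PostgreSQL; SQLite cannot alter an existing foreign key in place, so the upgrade is a no-op there. On a PostgreSQL database the resulting ON DELETE CASCADE rule can be checked with a short SQLAlchemy inspection pass. This is a minimal sketch, not part of the patch, and the connection URL is a placeholder.

from sqlalchemy import create_engine, inspect

# Placeholder URL: point it at the database that was just upgraded with "alembic upgrade head".
engine = create_engine("postgresql://user:password@localhost/antarest")
inspector = inspect(engine)
for table in ("rawstudy", "study_additional_data", "variantstudy"):
    for fk in inspector.get_foreign_keys(table):
        # Each foreign key to study.id should now report ondelete="CASCADE".
        print(table, fk["name"], fk.get("options", {}).get("ondelete"))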
diff --git a/antarest/study/model.py b/antarest/study/model.py
index fe10b4f211..df36efa856 100644
--- a/antarest/study/model.py
+++ b/antarest/study/model.py
@@ -130,7 +130,7 @@ class StudyAdditionalData(Base):  # type:ignore
 
     study_id = Column(
         String(36),
-        ForeignKey("study.id"),
+        ForeignKey("study.id", ondelete="CASCADE"),
         primary_key=True,
     )
     author = Column(String(255), default="Unknown")
@@ -230,7 +230,7 @@ class RawStudy(Study):
 
     id = Column(
         String(36),
-        ForeignKey("study.id"),
+        ForeignKey("study.id", ondelete="CASCADE"),
         primary_key=True,
     )
     content_status = Column(Enum(StudyContentStatus))
diff --git a/antarest/study/repository.py b/antarest/study/repository.py
index 3aa6e60681..1a540786e2 100644
--- a/antarest/study/repository.py
+++ b/antarest/study/repository.py
@@ -3,7 +3,7 @@
 import typing as t
 
 from pydantic import BaseModel, NonNegativeInt
-from sqlalchemy import func, not_, or_  # type: ignore
+from sqlalchemy import func, not_, or_, select  # type: ignore
 from sqlalchemy.orm import Session, joinedload, with_polymorphic  # type: ignore
 
 from antarest.core.interfaces.cache import ICache
@@ -272,10 +272,10 @@ def get_all_raw(self, exists: t.Optional[bool] = None) -> t.Sequence[RawStudy]:
         studies: t.Sequence[RawStudy] = query.all()
         return studies
 
-    def delete(self, id: str) -> None:
+    def delete(self, id_: str, *ids: str) -> None:
+        ids = (id_,) + ids
         session = self.session
-        u: Study = session.query(Study).get(id)
-        session.delete(u)
+        session.query(Study).filter(Study.id.in_(ids)).delete(synchronize_session=False)
         session.commit()
 
     def update_tags(self, study: Study, new_tags: t.Sequence[str]) -> None:
@@ -292,3 +292,12 @@ def update_tags(self, study: Study, new_tags: t.Sequence[str]) -> None:
         study.tags = [Tag(label=tag) for tag in new_labels] + existing_tags
         self.session.merge(study)
         self.session.commit()
+
+    def list_duplicates(self) -> t.List[t.Tuple[str, str]]:
+        """
+        Get list of duplicates as tuples (id, path).
+        """
+        session = self.session
+        subquery = session.query(Study.path).group_by(Study.path).having(func.count() > 1).subquery()
+        query = session.query(Study.id, Study.path).filter(Study.path.in_(subquery))
+        return t.cast(t.List[t.Tuple[str, str]], query.all())
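Taken together, the repository changes replace the per-study SELECT/DELETE round trips with one grouped SELECT (list_duplicates) and one bulk DELETE (delete with a variadic id list). A minimal usage sketch follows, for context only; cache_service stands for an ICache implementation and session for an open SQLAlchemy Session, both assumptions based on how the tests construct the repository.

repo = StudyMetadataRepository(cache_service, session)  # assumed constructor arguments

# One grouped SELECT yields every (id, path) pair whose path appears more than once.
for study_id, path in repo.list_duplicates():
    print(f"duplicate study {study_id} at {path}")

# One bulk DELETE removes several studies at once; the new signature requires at least one id.
repo.delete("id-1", "id-2", "id-3")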
+ """ + session = self.session + subquery = session.query(Study.path).group_by(Study.path).having(func.count() > 1).subquery() + query = session.query(Study.id, Study.path).filter(Study.path.in_(subquery)) + return t.cast(t.List[t.Tuple[str, str]], query.all()) diff --git a/antarest/study/service.py b/antarest/study/service.py index ae86fe62ae..3edb5e0a2d 100644 --- a/antarest/study/service.py +++ b/antarest/study/service.py @@ -1,4 +1,5 @@ import base64 +import collections import contextlib import io import json @@ -696,20 +697,16 @@ def get_input_matrix_startdate( return get_start_date(file_study, output_id, level) def remove_duplicates(self) -> None: - study_paths: t.Dict[str, t.List[str]] = {} - for study in self.repository.get_all(): - if isinstance(study, RawStudy) and not study.archived: - path = str(study.path) - if path not in study_paths: - study_paths[path] = [] - study_paths[path].append(study.id) - - for studies_with_same_path in study_paths.values(): - if len(studies_with_same_path) > 1: - logger.info(f"Found studies {studies_with_same_path} with same path, de duplicating") - for study_name in studies_with_same_path[1:]: - logger.info(f"Removing study {study_name}") - self.repository.delete(study_name) + duplicates = self.repository.list_duplicates() + ids: t.List[str] = [] + # ids with same path + duplicates_by_path = collections.defaultdict(list) + for study_id, path in duplicates: + duplicates_by_path[path].append(study_id) + for path, study_ids in duplicates_by_path.items(): + ids.extend(study_ids[1:]) + # delete list ids + self.repository.delete(*ids) def sync_studies_on_disk(self, folders: t.List[StudyFolder], directory: t.Optional[Path] = None) -> None: """ diff --git a/antarest/study/storage/variantstudy/model/dbmodel.py b/antarest/study/storage/variantstudy/model/dbmodel.py index bbe264f89f..9272eb797f 100644 --- a/antarest/study/storage/variantstudy/model/dbmodel.py +++ b/antarest/study/storage/variantstudy/model/dbmodel.py @@ -77,7 +77,7 @@ class VariantStudy(Study): id: str = Column( String(36), - ForeignKey("study.id"), + ForeignKey("study.id", ondelete="CASCADE"), primary_key=True, ) generation_task: t.Optional[str] = Column(String(), nullable=True) diff --git a/scripts/rollback.sh b/scripts/rollback.sh index bf92685dc4..46d04a7966 100755 --- a/scripts/rollback.sh +++ b/scripts/rollback.sh @@ -12,5 +12,5 @@ CUR_DIR=$(cd "$(dirname "$0")" && pwd) BASE_DIR=$(dirname "$CUR_DIR") cd "$BASE_DIR" -alembic downgrade 1f5db5dfad80 +alembic downgrade 3c70366b10ea cd - diff --git a/tests/storage/test_service.py b/tests/storage/test_service.py index e7e8662394..6374b0d8d6 100644 --- a/tests/storage/test_service.py +++ b/tests/storage/test_service.py @@ -350,18 +350,30 @@ def test_partial_sync_studies_from_disk() -> None: ) -@pytest.mark.unit_test -def test_remove_duplicate() -> None: - ma = RawStudy(id="a", path="a") - mb = RawStudy(id="b", path="a") +@with_db_context +def test_remove_duplicate(db_session: Session) -> None: + with db_session: + db_session.add(RawStudy(id="a", path="/path/to/a")) + db_session.add(RawStudy(id="b", path="/path/to/a")) + db_session.add(RawStudy(id="c", path="/path/to/c")) + db_session.commit() + study_count = db_session.query(RawStudy).filter(RawStudy.path == "/path/to/a").count() + assert study_count == 2 # there are 2 studies with same path before removing duplicates - repository = Mock() - repository.get_all.return_value = [ma, mb] - config = Config(storage=StorageConfig(workspaces={DEFAULT_WORKSPACE_NAME: WorkspaceConfig()})) - service = 
diff --git a/tests/storage/test_service.py b/tests/storage/test_service.py
index e7e8662394..6374b0d8d6 100644
--- a/tests/storage/test_service.py
+++ b/tests/storage/test_service.py
@@ -350,18 +350,30 @@ def test_partial_sync_studies_from_disk() -> None:
     )
 
 
-@pytest.mark.unit_test
-def test_remove_duplicate() -> None:
-    ma = RawStudy(id="a", path="a")
-    mb = RawStudy(id="b", path="a")
+@with_db_context
+def test_remove_duplicate(db_session: Session) -> None:
+    with db_session:
+        db_session.add(RawStudy(id="a", path="/path/to/a"))
+        db_session.add(RawStudy(id="b", path="/path/to/a"))
+        db_session.add(RawStudy(id="c", path="/path/to/c"))
+        db_session.commit()
+        study_count = db_session.query(RawStudy).filter(RawStudy.path == "/path/to/a").count()
+        assert study_count == 2  # there are 2 studies with same path before removing duplicates
 
-    repository = Mock()
-    repository.get_all.return_value = [ma, mb]
-    config = Config(storage=StorageConfig(workspaces={DEFAULT_WORKSPACE_NAME: WorkspaceConfig()}))
-    service = build_study_service(Mock(), repository, config)
+    with db_session:
+        repository = StudyMetadataRepository(Mock(), db_session)
+        config = Config(storage=StorageConfig(workspaces={DEFAULT_WORKSPACE_NAME: WorkspaceConfig()}))
+        service = build_study_service(Mock(), repository, config)
+        service.remove_duplicates()
 
-    service.remove_duplicates()
-    repository.delete.assert_called_once_with(mb.id)
+    # example with 1 duplicate with same path
+    with db_session:
+        study_count = db_session.query(RawStudy).filter(RawStudy.path == "/path/to/a").count()
+        assert study_count == 1
+    # example with no duplicates with same path
+    with db_session:
+        study_count = db_session.query(RawStudy).filter(RawStudy.path == "/path/to/c").count()
+        assert study_count == 1
 
 
 # noinspection PyArgumentList
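If reviewers also want the test to pin down the raw duplicate listing, an assertion along these lines could be added right after the repository is built in the second with db_session block, before remove_duplicates() runs; the expected pairs come from the fixture above, row ordering is not guaranteed, so the rows are normalised to tuples and compared as a set (illustration only, not part of the patch).

# Only the two studies sharing "/path/to/a" should be reported as duplicates.
assert set(map(tuple, repository.list_duplicates())) == {("a", "/path/to/a"), ("b", "/path/to/a")}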