From e1497598612fe8a178013bf4e441184c33b6fb32 Mon Sep 17 00:00:00 2001 From: Ivan Golubykh Date: Thu, 19 Oct 2023 14:24:12 +0400 Subject: [PATCH 1/2] M2-3148 Outdated data is excluded from the report and is deleted --- src/apps/answers/crud/answers.py | 54 ++++++++++++++++++++++++++++-- src/apps/answers/errors.py | 4 +++ src/apps/answers/tasks.py | 30 +++++++++++++++++ src/apps/applets/crud/applets.py | 17 ++++++++++ src/apps/applets/service/applet.py | 5 +++ 5 files changed, 108 insertions(+), 2 deletions(-) diff --git a/src/apps/answers/crud/answers.py b/src/apps/answers/crud/answers.py index fea5d1cdfa5..88ad30f115a 100644 --- a/src/apps/answers/crud/answers.py +++ b/src/apps/answers/crud/answers.py @@ -3,7 +3,7 @@ import uuid from typing import Collection -from pydantic import parse_obj_as +from pydantic import PositiveInt, parse_obj_as from sqlalchemy import ( Text, and_, @@ -37,10 +37,11 @@ UserAnswerItemData, Version, ) -from apps.answers.errors import AnswerNotFoundError +from apps.answers.errors import AnswerNotFoundError, AnswerRetentionType from apps.applets.db.schemas import AppletHistorySchema from apps.shared.filtering import Comparisons, FilterField, Filtering from apps.shared.paging import paging +from apps.workspaces.domain.constants import DataRetention from infrastructure.database.crud import BaseCRUD @@ -206,6 +207,8 @@ async def get_applet_answers( ) .where( AnswerSchema.applet_id == applet_id, + AnswerSchema.soft_exists(), + AnswerItemSchema.soft_exists(), *filter_clauses, ) ) @@ -567,6 +570,53 @@ async def get_applet_user_answer_items( return parse_obj_as(list[UserAnswerItemData], db_result.all()) + async def removing_outdated_answers( + self, + applet_id: uuid.UUID, + retention_period: PositiveInt, + retention_type: DataRetention, + ): + hours_in_day = 24 + hours_in_week = hours_in_day * 7 + hours_in_month = hours_in_day * 30 + hours_in_year = hours_in_day * 365 + + if retention_type == DataRetention.DAYS: + retention_time = datetime.timedelta( + hours=retention_period * hours_in_day + ) + elif retention_type == DataRetention.WEEKS: + retention_time = datetime.timedelta( + hours=retention_period * hours_in_week + ) + elif retention_type == DataRetention.MONTHS: + retention_time = datetime.timedelta( + hours=retention_period * hours_in_month + ) + elif retention_type == DataRetention.YEARS: + retention_time = datetime.timedelta( + hours=retention_period * hours_in_year + ) + else: + raise AnswerRetentionType() + border_datetime = datetime.datetime.utcnow() - retention_time + + query = update(AnswerSchema) + query = query.where(AnswerSchema.applet_id == applet_id) + query = query.where(AnswerSchema.created_at < border_datetime) + query = query.where(AnswerSchema.soft_exists()) + query = query.values(is_deleted=True) + query = query.returning(column("id")) + deleted_answer_ids: list[uuid.UUID] = [ + x[0] for x in await self._execute(query) + ] + + query = update(AnswerItemSchema) + query = query.where(AnswerItemSchema.answer_id.in_(deleted_answer_ids)) + query = query.where(AnswerSchema.soft_exists()) + query = query.values(is_deleted=True) + await self._execute(query) + async def update_encrypted_fields( self, user_public_key: str, data: list[AnswerItemDataEncrypted] ): diff --git a/src/apps/answers/errors.py b/src/apps/answers/errors.py index 918c24334d1..d35cdb4f575 100644 --- a/src/apps/answers/errors.py +++ b/src/apps/answers/errors.py @@ -27,6 +27,10 @@ class AnswerNoteAccessDeniedError(AccessDeniedError): message = _("Note access denied.") +class AnswerRetentionType(ValidationError): + message = _("Incorrect answer retention type.") + + class UserDoesNotHavePermissionError(AccessDeniedError): message = _("User does not have permission.") diff --git a/src/apps/answers/tasks.py b/src/apps/answers/tasks.py index 9d52f982a6a..128420a1e5e 100644 --- a/src/apps/answers/tasks.py +++ b/src/apps/answers/tasks.py @@ -5,12 +5,16 @@ import sentry_sdk from fastapi import UploadFile +from sqlalchemy.engine import Result from sqlalchemy.ext.asyncio import AsyncSession +from apps.answers.crud.answers import AnswersCRUD from apps.answers.deps.preprocess_arbitrary import get_arbitrary_info from apps.answers.domain import ReportServerResponse +from apps.applets.crud import AppletsCRUD from apps.mailing.domain import MessageSchema from apps.mailing.services import MailingService +from apps.workspaces.domain.constants import DataRetention from broker import broker from infrastructure.database import session_manager @@ -75,3 +79,29 @@ async def create_report( finally: if not isinstance(session_maker, AsyncSession): await session_maker.remove() + + +@broker.task( + task_name="apps.answers.tasks:removing_outdated_answers", + schedule=[{"cron": "*/30 * * * *"}], +) +async def removing_outdated_answers(): + session_maker = session_manager.get_session() + try: + async with session_maker() as session: + applets_data: Result = await AppletsCRUD( + session + ).get_every_non_indefinitely_applet_retentions() + for applet_data in applets_data: + applet_id, retention_period, retention_type = applet_data + retention_type = DataRetention(retention_type) + await AnswersCRUD(session).removing_outdated_answers( + applet_id, retention_period, retention_type + ) + await session.commit() + except Exception as e: + traceback.print_exception(e) + sentry_sdk.capture_exception(e) + finally: + if not isinstance(session_maker, AsyncSession): + await session_maker.remove() diff --git a/src/apps/applets/crud/applets.py b/src/apps/applets/crud/applets.py index 234d1a979b3..50efd65b450 100644 --- a/src/apps/applets/crud/applets.py +++ b/src/apps/applets/crud/applets.py @@ -37,6 +37,7 @@ from apps.users import UserSchema from apps.users.db.schemas import UserDeviceSchema from apps.workspaces.db.schemas import UserAppletAccessSchema +from apps.workspaces.domain.constants import DataRetention from infrastructure.database.crud import BaseCRUD __all__ = ["AppletsCRUD"] @@ -953,6 +954,22 @@ async def get_workspace_applets_flat_list_count( db_result = await self._execute(select(func.count(query.c.id))) return db_result.scalars().first() or 0 + async def get_every_non_indefinitely_applet_retentions(self) -> Result: + """returned Result[Row[uuid.UUID, int, str]] + Result[Row[applet_id, applet_retention_period, applet_retention_type]] + """ + query: Query = select( + AppletSchema.id.label("id"), + AppletSchema.retention_period.label("retention_period"), + AppletSchema.retention_type.label("retention_type"), + ) + query = query.where( + AppletSchema.retention_type != DataRetention.INDEFINITELY + ) + result: Result = await self._execute(query) + + return result + async def clear_report_settings(self, applet_id: uuid.UUID): query: Query = update(AppletSchema) query = query.where(AppletSchema.id == applet_id) diff --git a/src/apps/applets/service/applet.py b/src/apps/applets/service/applet.py index b28bef05510..b802c4ad13b 100644 --- a/src/apps/applets/service/applet.py +++ b/src/apps/applets/service/applet.py @@ -54,6 +54,7 @@ ) from apps.themes.service import ThemeService from apps.users.services.user import UserService +from apps.workspaces.domain.constants import DataRetention from apps.workspaces.errors import AppletEncryptionUpdateDenied from apps.workspaces.service.user_applet_access import UserAppletAccessService from config import settings @@ -825,6 +826,10 @@ async def set_data_retention( await AppletsCRUD(self.session).set_data_retention( applet_id, data_retention ) + if data_retention.retention != DataRetention.INDEFINITELY: + await AnswersCRUD(self.session).removing_outdated_answers( + applet_id, data_retention.period or 1, data_retention.retention + ) async def get_full_applet(self, applet_id: uuid.UUID) -> AppletFull: schema = await AppletsCRUD(self.session).get_by_id(applet_id) From 3363a62b97dbc6f455e8779a5cc4667e653a9678 Mon Sep 17 00:00:00 2001 From: Ivan Golubykh Date: Fri, 27 Oct 2023 14:24:40 +0400 Subject: [PATCH 2/2] M2-3148 Work with arbitrary server is taken into consideration --- src/apps/answers/crud/answers.py | 14 ++++++-------- src/apps/answers/tasks.py | 20 +++++++++++++++++--- src/apps/applets/service/applet.py | 25 ++++++++++++++++++++++--- 3 files changed, 45 insertions(+), 14 deletions(-) diff --git a/src/apps/answers/crud/answers.py b/src/apps/answers/crud/answers.py index 88ad30f115a..fafaa3edeaf 100644 --- a/src/apps/answers/crud/answers.py +++ b/src/apps/answers/crud/answers.py @@ -601,21 +601,19 @@ async def removing_outdated_answers( raise AnswerRetentionType() border_datetime = datetime.datetime.utcnow() - retention_time - query = update(AnswerSchema) + query: Query = delete(AnswerSchema) query = query.where(AnswerSchema.applet_id == applet_id) query = query.where(AnswerSchema.created_at < border_datetime) - query = query.where(AnswerSchema.soft_exists()) - query = query.values(is_deleted=True) query = query.returning(column("id")) deleted_answer_ids: list[uuid.UUID] = [ x[0] for x in await self._execute(query) ] - query = update(AnswerItemSchema) - query = query.where(AnswerItemSchema.answer_id.in_(deleted_answer_ids)) - query = query.where(AnswerSchema.soft_exists()) - query = query.values(is_deleted=True) - await self._execute(query) + item_query: Query = delete(AnswerItemSchema) + item_query = item_query.where( + AnswerItemSchema.answer_id.in_(deleted_answer_ids) + ) + await self._execute(item_query) async def update_encrypted_fields( self, user_public_key: str, data: list[AnswerItemDataEncrypted] diff --git a/src/apps/answers/tasks.py b/src/apps/answers/tasks.py index 128420a1e5e..0e1d6fe4611 100644 --- a/src/apps/answers/tasks.py +++ b/src/apps/answers/tasks.py @@ -95,9 +95,23 @@ async def removing_outdated_answers(): for applet_data in applets_data: applet_id, retention_period, retention_type = applet_data retention_type = DataRetention(retention_type) - await AnswersCRUD(session).removing_outdated_answers( - applet_id, retention_period, retention_type - ) + + arb_uri = await get_arbitrary_info(applet_id, session) + if arb_uri: + arb_session_maker = session_manager.get_session(arb_uri) + try: + async with arb_session_maker() as arb_session: + await AnswersCRUD( + arb_session + ).removing_outdated_answers( + applet_id, retention_period, retention_type + ) + finally: + await arb_session_maker.remove() + else: + await AnswersCRUD(session).removing_outdated_answers( + applet_id, retention_period, retention_type + ) await session.commit() except Exception as e: traceback.print_exception(e) diff --git a/src/apps/applets/service/applet.py b/src/apps/applets/service/applet.py index b802c4ad13b..16ecaa91896 100644 --- a/src/apps/applets/service/applet.py +++ b/src/apps/applets/service/applet.py @@ -12,6 +12,7 @@ from apps.activity_flows.domain.flow_create import FlowCreate, FlowItemCreate from apps.activity_flows.service.flow import FlowService from apps.answers.crud.answers import AnswersCRUD +from apps.answers.deps.preprocess_arbitrary import get_arbitrary_info from apps.applets.crud import ( AppletHistoriesCRUD, AppletsCRUD, @@ -58,6 +59,7 @@ from apps.workspaces.errors import AppletEncryptionUpdateDenied from apps.workspaces.service.user_applet_access import UserAppletAccessService from config import settings +from infrastructure.database import session_manager __all__ = [ "AppletService", @@ -827,9 +829,26 @@ async def set_data_retention( applet_id, data_retention ) if data_retention.retention != DataRetention.INDEFINITELY: - await AnswersCRUD(self.session).removing_outdated_answers( - applet_id, data_retention.period or 1, data_retention.retention - ) + arb_uri = await get_arbitrary_info(applet_id, self.session) + if arb_uri: + arb_session_maker = session_manager.get_session(arb_uri) + try: + async with arb_session_maker() as arb_session: + await AnswersCRUD( + arb_session + ).removing_outdated_answers( + applet_id, + data_retention.period or 10**10, + data_retention.retention, + ) + finally: + await arb_session_maker.remove() + else: + await AnswersCRUD(self.session).removing_outdated_answers( + applet_id, + data_retention.period or 10**10, + data_retention.retention, + ) async def get_full_applet(self, applet_id: uuid.UUID) -> AppletFull: schema = await AppletsCRUD(self.session).get_by_id(applet_id)