Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

M2-3148 Outdated data is excluded from the report and is deleted #763

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions src/apps/answers/crud/answers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import uuid
from typing import Collection

from pydantic import parse_obj_as
from pydantic import PositiveInt, parse_obj_as
from sqlalchemy import (
Text,
and_,
Expand Down Expand Up @@ -37,10 +37,11 @@
UserAnswerItemData,
Version,
)
from apps.answers.errors import AnswerNotFoundError
from apps.answers.errors import AnswerNotFoundError, AnswerRetentionType
from apps.applets.db.schemas import AppletHistorySchema
from apps.shared.filtering import Comparisons, FilterField, Filtering
from apps.shared.paging import paging
from apps.workspaces.domain.constants import DataRetention
from infrastructure.database.crud import BaseCRUD


Expand Down Expand Up @@ -206,6 +207,8 @@ async def get_applet_answers(
)
.where(
AnswerSchema.applet_id == applet_id,
AnswerSchema.soft_exists(),
AnswerItemSchema.soft_exists(),
*filter_clauses,
)
)
Expand Down Expand Up @@ -567,6 +570,51 @@ async def get_applet_user_answer_items(

return parse_obj_as(list[UserAnswerItemData], db_result.all())

async def removing_outdated_answers(
    self,
    applet_id: uuid.UUID,
    retention_period: PositiveInt,
    retention_type: DataRetention,
):
    """Delete the applet's answers that fell out of its retention window.

    The window is ``retention_period`` units of ``retention_type``; a
    month counts as 30 days and a year as 365 days. Answers of the applet
    whose ``created_at`` is earlier than "current UTC time minus the
    window" are deleted, then the answer items referencing them.

    Args:
        applet_id: applet whose answers are cleaned up.
        retention_period: number of retention units to keep.
        retention_type: unit of the period (DAYS, WEEKS, MONTHS, YEARS).

    Raises:
        AnswerRetentionType: ``retention_type`` is none of the units above.
    """
    days_per_unit = (
        (DataRetention.DAYS, 1),
        (DataRetention.WEEKS, 7),
        (DataRetention.MONTHS, 30),
        (DataRetention.YEARS, 365),
    )
    # Compare with ``==`` (not a dict lookup) so values that merely compare
    # equal to an enum member keep working exactly as before.
    for unit, unit_days in days_per_unit:
        if retention_type == unit:
            days_in_unit = unit_days
            break
    else:
        raise AnswerRetentionType()

    try:
        retention_time = datetime.timedelta(
            days=retention_period * days_in_unit
        )
        border_datetime = datetime.datetime.utcnow() - retention_time
    except OverflowError:
        # The window is larger than ``timedelta``/``datetime`` can express
        # (e.g. a huge "keep practically forever" period), so no answer
        # can be older than the cutoff: nothing to delete.
        return

    query: Query = delete(AnswerSchema)
    query = query.where(AnswerSchema.applet_id == applet_id)
    query = query.where(AnswerSchema.created_at < border_datetime)
    query = query.returning(column("id"))
    deleted_answer_ids: list[uuid.UUID] = [
        row[0] for row in await self._execute(query)
    ]
    if not deleted_answer_ids:
        # No answers were removed, so there are no items to clean up;
        # avoid sending a DELETE with an empty IN () list.
        return

    item_query: Query = delete(AnswerItemSchema)
    item_query = item_query.where(
        AnswerItemSchema.answer_id.in_(deleted_answer_ids)
    )
    await self._execute(item_query)

async def update_encrypted_fields(
self, user_public_key: str, data: list[AnswerItemDataEncrypted]
):
Expand Down
4 changes: 4 additions & 0 deletions src/apps/answers/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ class AnswerNoteAccessDeniedError(AccessDeniedError):
message = _("Note access denied.")


class AnswerRetentionType(ValidationError):
    """Validation error for an incorrect answer retention type."""

    message = _("Incorrect answer retention type.")


class UserDoesNotHavePermissionError(AccessDeniedError):
message = _("User does not have permission.")

Expand Down
44 changes: 44 additions & 0 deletions src/apps/answers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@

import sentry_sdk
from fastapi import UploadFile
from sqlalchemy.engine import Result
from sqlalchemy.ext.asyncio import AsyncSession

from apps.answers.crud.answers import AnswersCRUD
from apps.answers.deps.preprocess_arbitrary import get_arbitrary_info
from apps.answers.domain import ReportServerResponse
from apps.applets.crud import AppletsCRUD
from apps.mailing.domain import MessageSchema
from apps.mailing.services import MailingService
from apps.workspaces.domain.constants import DataRetention
from broker import broker
from infrastructure.database import session_manager

Expand Down Expand Up @@ -75,3 +79,43 @@ async def create_report(
finally:
if not isinstance(session_maker, AsyncSession):
await session_maker.remove()


@broker.task(
    task_name="apps.answers.tasks:removing_outdated_answers",
    schedule=[{"cron": "*/30 * * * *"}],
)
async def removing_outdated_answers():
    """Scheduled task: purge answers that exceeded their applet's retention.

    Runs on the ``*/30 * * * *`` cron schedule. For every applet returned by
    ``get_every_non_indefinitely_applet_retentions`` the outdated answers
    are removed either from the applet's arbitrary database (when
    ``get_arbitrary_info`` yields a URI) or from the default database.

    Any exception is printed and reported to Sentry instead of being
    re-raised.
    """
    session_maker = session_manager.get_session()
    try:
        async with session_maker() as session:
            applets_data: Result = await AppletsCRUD(
                session
            ).get_every_non_indefinitely_applet_retentions()
            for applet_id, retention_period, retention_type in applets_data:
                if retention_period is None:
                    # Without a period no cutoff can be computed; skip the
                    # applet rather than fail the run for all the others.
                    continue
                retention_type = DataRetention(retention_type)

                arb_uri = await get_arbitrary_info(applet_id, session)
                if not arb_uri:
                    await AnswersCRUD(session).removing_outdated_answers(
                        applet_id, retention_period, retention_type
                    )
                    continue

                arb_session_maker = session_manager.get_session(arb_uri)
                try:
                    async with arb_session_maker() as arb_session:
                        await AnswersCRUD(
                            arb_session
                        ).removing_outdated_answers(
                            applet_id, retention_period, retention_type
                        )
                        # The arbitrary session is separate from
                        # ``session``; commit it explicitly so the
                        # deletions are not lost when it is closed.
                        await arb_session.commit()
                finally:
                    await arb_session_maker.remove()
            await session.commit()
    except Exception as e:
        traceback.print_exception(e)
        sentry_sdk.capture_exception(e)
    finally:
        if not isinstance(session_maker, AsyncSession):
            await session_maker.remove()
17 changes: 17 additions & 0 deletions src/apps/applets/crud/applets.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from apps.users import UserSchema
from apps.users.db.schemas import UserDeviceSchema
from apps.workspaces.db.schemas import UserAppletAccessSchema
from apps.workspaces.domain.constants import DataRetention
from infrastructure.database.crud import BaseCRUD

__all__ = ["AppletsCRUD"]
Expand Down Expand Up @@ -953,6 +954,22 @@ async def get_workspace_applets_flat_list_count(
db_result = await self._execute(select(func.count(query.c.id)))
return db_result.scalars().first() or 0

async def get_every_non_indefinitely_applet_retentions(self) -> Result:
    """Select retention settings of applets not kept indefinitely.

    Returns:
        Result whose rows are
        ``(id, retention_period, retention_type)`` for every applet whose
        retention type differs from ``DataRetention.INDEFINITELY``.
    """
    retention_columns = (
        AppletSchema.id.label("id"),
        AppletSchema.retention_period.label("retention_period"),
        AppletSchema.retention_type.label("retention_type"),
    )
    has_limited_retention = (
        AppletSchema.retention_type != DataRetention.INDEFINITELY
    )
    query: Query = select(*retention_columns).where(has_limited_retention)
    return await self._execute(query)

async def clear_report_settings(self, applet_id: uuid.UUID):
query: Query = update(AppletSchema)
query = query.where(AppletSchema.id == applet_id)
Expand Down
24 changes: 24 additions & 0 deletions src/apps/applets/service/applet.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from apps.activity_flows.domain.flow_create import FlowCreate, FlowItemCreate
from apps.activity_flows.service.flow import FlowService
from apps.answers.crud.answers import AnswersCRUD
from apps.answers.deps.preprocess_arbitrary import get_arbitrary_info
from apps.applets.crud import (
AppletHistoriesCRUD,
AppletsCRUD,
Expand Down Expand Up @@ -54,9 +55,11 @@
)
from apps.themes.service import ThemeService
from apps.users.services.user import UserService
from apps.workspaces.domain.constants import DataRetention
from apps.workspaces.errors import AppletEncryptionUpdateDenied
from apps.workspaces.service.user_applet_access import UserAppletAccessService
from config import settings
from infrastructure.database import session_manager

__all__ = [
"AppletService",
Expand Down Expand Up @@ -825,6 +828,27 @@ async def set_data_retention(
await AppletsCRUD(self.session).set_data_retention(
applet_id, data_retention
)
if data_retention.retention != DataRetention.INDEFINITELY:
arb_uri = await get_arbitrary_info(applet_id, self.session)
if arb_uri:
arb_session_maker = session_manager.get_session(arb_uri)
try:
async with arb_session_maker() as arb_session:
await AnswersCRUD(
arb_session
).removing_outdated_answers(
applet_id,
data_retention.period or 10**10,
data_retention.retention,
)
finally:
await arb_session_maker.remove()
else:
await AnswersCRUD(self.session).removing_outdated_answers(
applet_id,
data_retention.period or 10**10,
data_retention.retention,
)

async def get_full_applet(self, applet_id: uuid.UUID) -> AppletFull:
schema = await AppletsCRUD(self.session).get_by_id(applet_id)
Expand Down