Skip to content

Commit

Permalink
Optionally process transactions in background (#593)
Browse files Browse the repository at this point in the history
* Refactor transaction endpoint into standalone function

* Move process transaction to tasks

* Optionally process in background
  • Loading branch information
DavidMStraub authored Dec 29, 2024
1 parent a378a43 commit 5d337d3
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 139 deletions.
143 changes: 23 additions & 120 deletions gramps_webapi/api/resources/transactions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# Gramps Web API - A RESTful API for the Gramps genealogy program
#
# Copyright (C) 2021-2023 David Straub
# Copyright (C) 2021-2024 David Straub
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
Expand All @@ -20,35 +20,19 @@
"""Raw database transaction API resource."""

import json
from typing import Dict

from flask import Response, request
from gramps.gen.db import DbTxn
from gramps.gen.db.base import DbReadBase
from flask_jwt_extended import get_jwt_identity
from gramps.gen.db.dbconst import TXNADD, TXNDEL, TXNUPD
from gramps.gen.errors import HandleError
from gramps.gen.lib.serialize import from_json, to_json
from gramps.gen.merge.diff import diff_items
from webargs import fields

from ...auth.const import PERM_ADD_OBJ, PERM_DEL_OBJ, PERM_EDIT_OBJ
from ...types import ResponseReturnValue
from ..auth import require_permissions
from ..search import (
SearchIndexer,
get_search_indexer,
SemanticSearchIndexer,
get_semantic_search_indexer,
)
from ..util import (
abort_with_message,
check_quota_people,
get_db_handle,
get_tree_from_jwt_or_fail,
update_usage_people,
use_args,
)
from ..tasks import AsyncResult, make_task_response, process_transactions, run_task
from ..util import abort_with_message, use_args, get_tree_from_jwt_or_fail
from . import ProtectedResource
from .util import app_has_semantic_search, reverse_transaction, transaction_to_json
from .util import reverse_transaction

trans_code = {"delete": TXNDEL, "add": TXNADD, "update": TXNUPD}

Expand All @@ -60,10 +44,11 @@ class TransactionsResource(ProtectedResource):
{
"undo": fields.Boolean(load_default=False),
"force": fields.Boolean(load_default=False),
"background": fields.Boolean(load_default=False),
},
location="query",
)
def post(self, args) -> Response:
def post(self, args) -> ResponseReturnValue:
"""Post the transaction."""
require_permissions([PERM_ADD_OBJ, PERM_EDIT_OBJ, PERM_DEL_OBJ])
payload = request.json
Expand All @@ -72,108 +57,26 @@ def post(self, args) -> Response:
is_undo = args["undo"]
if is_undo:
payload = reverse_transaction(payload)
db_handle = get_db_handle(readonly=False)
num_people_deleted = sum(
item["type"] == "delete" and item["_class"] == "Person" for item in payload
)
num_people_added = sum(
item["type"] == "add" and item["_class"] == "Person" for item in payload
)
num_people_new = num_people_added - num_people_deleted
check_quota_people(to_add=num_people_new)
with DbTxn("Raw transaction", db_handle) as trans:
for item in payload:
try:
class_name = item["_class"]
trans_type = item["type"]
handle = item["handle"]
old_data = item["old"]
if not args["force"] and not self.old_unchanged(
db_handle, class_name, handle, old_data
):
if num_people_added or num_people_deleted:
update_usage_people()
abort_with_message(409, "Object has changed")
new_data = item["new"]
if new_data:
new_obj = from_json(json.dumps(new_data))
if trans_type == "delete":
self.handle_delete(trans, class_name, handle)
if (
class_name == "Person"
and handle == db_handle.get_default_handle()
):
db_handle.set_default_person_handle(None)
elif trans_type == "add":
self.handle_add(trans, class_name, new_obj)
elif trans_type == "update":
self.handle_commit(trans, class_name, new_obj)
else:
if num_people_added or num_people_deleted:
update_usage_people()
abort_with_message(400, "Unexpected transaction type")
except (KeyError, UnicodeDecodeError, json.JSONDecodeError, TypeError):
if num_people_added or num_people_deleted:
update_usage_people()
abort_with_message(400, "Error while processing transaction")
trans_dict = transaction_to_json(trans)
if num_people_new:
update_usage_people()
# update search index
tree = get_tree_from_jwt_or_fail()
indexer: SearchIndexer = get_search_indexer(tree)
for _trans_dict in trans_dict:
handle = _trans_dict["handle"]
class_name = _trans_dict["_class"]
if _trans_dict["type"] == "delete":
indexer.delete_object(handle, class_name)
else:
indexer.add_or_update_object(handle, db_handle, class_name)
if app_has_semantic_search():
semantic_indexer: SemanticSearchIndexer = get_semantic_search_indexer(tree)
for _trans_dict in trans_dict:
handle = _trans_dict["handle"]
class_name = _trans_dict["_class"]
if _trans_dict["type"] == "delete":
semantic_indexer.delete_object(handle, class_name)
else:
semantic_indexer.add_or_update_object(handle, db_handle, class_name)
user_id = get_jwt_identity()
if args["background"]:
task = run_task(
process_transactions,
tree=tree,
user_id=user_id,
payload=payload,
force=args["force"],
)
if isinstance(task, AsyncResult):
return make_task_response(task)
return task, 200
trans_dict = process_transactions(
tree=tree, user_id=user_id, payload=payload, force=args["force"]
)
res = Response(
response=json.dumps(trans_dict),
status=200,
mimetype="application/json",
)
res.headers.add("X-Total-Count", str(len(trans_dict)))
return res

def handle_delete(self, trans: DbTxn, class_name: str, handle: str) -> None:
"""Handle a delete action."""
del_func = trans.db.method("remove_%s", class_name)
del_func(handle, trans)

def handle_commit(self, trans: DbTxn, class_name: str, obj) -> None:
"""Handle an update action."""
com_func = trans.db.method("commit_%s", class_name)
com_func(obj, trans)

def handle_add(self, trans: DbTxn, class_name: str, obj) -> None:
"""Handle an add action."""
if class_name != "Tag" and not obj.gramps_id:
abort_with_message(400, "Gramps ID missing")
self.handle_commit(trans, class_name, obj)

def old_unchanged(
self, db: DbReadBase, class_name: str, handle: str, old_data: Dict
) -> bool:
"""Check if the "old" object is still unchanged."""
handle_func = db.method("get_%s_from_handle", class_name)
try:
obj = handle_func(handle)
except HandleError:
if old_data is None:
return True
return False
obj_dict = json.loads(to_json(obj))
if diff_items(class_name, old_data, obj_dict):
return False
return True
128 changes: 125 additions & 3 deletions gramps_webapi/api/tasks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#
# Gramps Web API - A RESTful API for the Gramps genealogy program
#
# Copyright (C) 2021-2023 David Straub
# Copyright (C) 2021-2024 David Straub
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
Expand All @@ -20,6 +19,7 @@

from __future__ import annotations

import json
import os
import uuid
from gettext import gettext as _
Expand All @@ -29,6 +29,11 @@
from celery import shared_task, Task
from celery.result import AsyncResult
from flask import current_app
from gramps.gen.db import DbTxn
from gramps.gen.lib.serialize import from_json, to_json
from gramps.gen.db.base import DbReadBase
from gramps.gen.errors import HandleError
from gramps.gen.merge.diff import diff_items

from gramps_webapi.api.search.indexer import SearchIndexer, SemanticSearchIndexer

Expand All @@ -40,9 +45,15 @@
from .media_importer import MediaImporter
from .report import run_report
from .resources.delete import delete_all_objects
from .resources.util import dry_run_import, run_import
from .resources.util import (
app_has_semantic_search,
dry_run_import,
run_import,
transaction_to_json,
)
from .search import get_search_indexer, get_semantic_search_indexer
from .util import (
abort_with_message,
check_quota_people,
close_db,
get_config,
Expand Down Expand Up @@ -410,3 +421,114 @@ def delete_objects(
self, title="Updating semantic search index..."
),
)


@shared_task(bind=True)
def process_transactions(
self, tree: str, user_id: str, payload: list[dict], force: bool
):
"""Process a set of database transactions, updating search indices as needed."""
num_people_deleted = sum(
item["type"] == "delete" and item["_class"] == "Person" for item in payload
)
num_people_added = sum(
item["type"] == "add" and item["_class"] == "Person" for item in payload
)
num_people_new = num_people_added - num_people_deleted
check_quota_people(to_add=num_people_new, tree=tree, user_id=user_id)
db_handle = get_db_outside_request(
tree=tree, view_private=True, readonly=False, user_id=user_id
)
with DbTxn("Raw transaction", db_handle) as trans:
for item in payload:
try:
class_name = item["_class"]
trans_type = item["type"]
handle = item["handle"]
old_data = item["old"]
if not force and not old_unchanged(
db_handle, class_name, handle, old_data
):
if num_people_added or num_people_deleted:
update_usage_people(tree=tree, user_id=user_id)
abort_with_message(409, "Object has changed")
new_data = item["new"]
if new_data:
new_obj = from_json(json.dumps(new_data))
if trans_type == "delete":
handle_delete(trans, class_name, handle)
if (
class_name == "Person"
and handle == db_handle.get_default_handle()
):
db_handle.set_default_person_handle(None)
elif trans_type == "add":
handle_add(trans, class_name, new_obj)
elif trans_type == "update":
handle_commit(trans, class_name, new_obj)
else:
if num_people_added or num_people_deleted:
update_usage_people(tree=tree, user_id=user_id)
abort_with_message(400, "Unexpected transaction type")
except (KeyError, UnicodeDecodeError, json.JSONDecodeError, TypeError):
if num_people_added or num_people_deleted:
update_usage_people(tree=tree, user_id=user_id)
abort_with_message(400, "Error while processing transaction")
trans_dict = transaction_to_json(trans)
if num_people_new:
update_usage_people(tree=tree, user_id=user_id)
# update search index
indexer: SearchIndexer = get_search_indexer(tree)
for _trans_dict in trans_dict:
handle = _trans_dict["handle"]
class_name = _trans_dict["_class"]
if _trans_dict["type"] == "delete":
indexer.delete_object(handle, class_name)
else:
indexer.add_or_update_object(handle, db_handle, class_name)
# update semantic search index
if app_has_semantic_search():
semantic_indexer: SemanticSearchIndexer = get_semantic_search_indexer(tree)
for _trans_dict in trans_dict:
handle = _trans_dict["handle"]
class_name = _trans_dict["_class"]
if _trans_dict["type"] == "delete":
semantic_indexer.delete_object(handle, class_name)
else:
semantic_indexer.add_or_update_object(handle, db_handle, class_name)
return trans_dict


def handle_delete(trans: DbTxn, class_name: str, handle: str) -> None:
"""Handle a delete action."""
del_func = trans.db.method("remove_%s", class_name)
del_func(handle, trans)


def handle_commit(trans: DbTxn, class_name: str, obj) -> None:
"""Handle an update action."""
com_func = trans.db.method("commit_%s", class_name)
com_func(obj, trans)


def handle_add(trans: DbTxn, class_name: str, obj) -> None:
"""Handle an add action."""
if class_name != "Tag" and not obj.gramps_id:
abort_with_message(400, "Gramps ID missing")
handle_commit(trans, class_name, obj)


def old_unchanged(db: DbReadBase, class_name: str, handle: str, old_data: Dict) -> bool:
"""Check if the "old" object is still unchanged."""
handle_func = db.method("get_%s_from_handle", class_name)
assert handle_func is not None, "No handle function found"
try:
obj = handle_func(handle)
except HandleError:
if old_data is None:
return True
return False
obj_dict = json.loads(to_json(obj))
if diff_items(class_name, old_data, obj_dict):
return False
return True
Loading

0 comments on commit 5d337d3

Please sign in to comment.