# Reconstructed from a git-format patch whose line breaks had been lost.
#
#   Commit:  677abf34a014c5526bf8ec6c315c7a441c79c4c4
#   Author:  Tom Kazimiers
#   Date:    Thu, 7 Mar 2024 21:55:51 +0100
#   Subject: [PATCH] History tracking: add simple backend utility to
#            undelete neurons
#
#   This looks at the history table rows that were affected by the delete
#   transaction and restores them (unless they still exist). There are
#   currently not many checks and the code is only a first building block.
#
#   Files touched by the patch:
#     django/applications/catmaid/history.py
#     django/applications/catmaid/util.py
#
# Only the code added by the patch is listed here. The unchanged context lines
# of both files (fragments of fail_on_wrong_format_label,
# disable_history_tracking and str2list, plus the pre-existing imports of
# history.py) are not repeated.

# ---------------------------------------------------------------------------
# django/applications/catmaid/history.py (additions)
# ---------------------------------------------------------------------------
import collections
import logging

import dateutil.parser
from django.db import connection

# The patch also adds `from catmaid.util import ask_to_continue` to history.py.
# None of the functions below call it; the helper itself is listed in the
# util.py section at the end.

logger = logging.getLogger(__name__)


class Transaction:
    """A database transaction, identified by its ID and execution time.

    Attributes:
        id: The transaction ID as passed in.
        time: The execution time as passed in. It is handed to
            ``dateutil.parser.parse`` and must therefore be a string.
        date: The parsed form of ``time``.
    """

    def __init__(self, transaction_id, transaction_time):
        self.id = transaction_id
        self.time = transaction_time
        self.date = dateutil.parser.parse(self.time)

    def __str__(self):
        return "TX {} @ {}".format(self.id, self.time)

    def __repr__(self):
        return f"{type(self).__name__}({self.id!r}, {self.time!r})"


def _tx_query_params(tx):
    """Build the query parameters shared by all transaction look-ups.

    Raises:
        ValueError / TypeError: if ``tx.id`` cannot be converted to an int.
    """
    return {
        # The ID is spliced into a *quoted* PL/pgSQL format() template. With
        # psycopg2 a str value would be rendered with surrounding quotes and
        # terminate that literal early, so force a plain integer.
        'tx_id': int(tx.id),
        # The time, in contrast, has to stay a string: the templates wrap the
        # placeholder as '%(tx_time)s'' so that the quotes added by the
        # database adapter become doubled (escaped) quotes inside the literal.
        'tx_time': tx.time,
    }


def get_historic_row_count_affected_by_tx(tx):
    """Count how many historic rows reference the passed in transaction.

    Every table registered in ``catmaid_history_table`` is checked for rows
    whose ``exec_transaction_id`` matches ``tx.id`` and whose ``sys_period``
    ends at ``tx.time``.

    Returns:
        A list of tuples ``(table_name, count)``, limited to tables with at
        least one matching row.
    """
    # NOTE: the adjacent quoted fragments inside format() rely on PostgreSQL
    # concatenating string constants that are separated by a newline. Each
    # fragment must stay on a line of its own.
    with connection.cursor() as cursor:
        cursor.execute("""
            DO $$
            DECLARE

                row record;

            BEGIN

                CREATE TEMPORARY TABLE tx_history_matches (
                    history_table text,
                    n_matches int
                );

                FOR row in SELECT format(
                        'INSERT INTO tx_history_matches '
                        'SELECT ''%%2$s'', COUNT(*) FROM ONLY %%1$s ht '
                        'WHERE exec_transaction_id = %(tx_id)s '
                        'AND upper(sys_period) = '%(tx_time)s'',
                        cht.history_table, cht.history_table::text) as query
                    FROM catmaid_history_table cht
                LOOP
                    EXECUTE row.query;
                END LOOP;

            END
            $$;

            SELECT * FROM tx_history_matches
            WHERE n_matches > 0;
        """, _tx_query_params(tx))

        tx_matches = cursor.fetchall()

        cursor.execute('DROP TABLE tx_history_matches')

    return tx_matches


def _get_direct_dependent_tx(tx):
    """Return the transactions that directly depend on ``tx``.

    These are transactions other than ``tx`` that, at or after ``tx.time``,
    wrote history rows for primary keys that ``tx`` touched. The result is a
    list of ``(tx_id, execution_time, user_id, label)`` rows. The same
    transaction can show up several times, because the look-up is run once per
    history table.
    """
    # NOTE: as above, every quoted format() fragment must stay on its own line.
    with connection.cursor() as cursor:
        cursor.execute("""
            DO $$
            DECLARE

                row record;

            BEGIN

                CREATE TEMPORARY TABLE dependent_tx (
                    tx_id bigint,
                    execution_time text,
                    user_id int,
                    label text
                );

                FOR row in SELECT format(
                        'INSERT INTO dependent_tx '
                        'SELECT DISTINCT ht2.exec_transaction_id, upper(ht2.sys_period), '
                        ' cti.user_id, cti.label '
                        'FROM ( '
                        ' SELECT DISTINCT %%3$s as id'
                        ' FROM ONLY %%1$s ht '
                        ' WHERE ht.exec_transaction_id = %(tx_id)s '
                        ' AND upper(ht.sys_period) = '%(tx_time)s''
                        ') touched_data(id) '
                        'JOIN %%1$s ht2 ON ht2.%%3$s = touched_data.id '
                        'LEFT JOIN catmaid_transaction_info cti '
                        'ON cti.transaction_id = ht2.exec_transaction_id '
                        'AND cti.execution_time = upper(ht2.sys_period) '
                        'WHERE ht2.exec_transaction_id <> %(tx_id)s '
                        'AND ht2.sys_period IS NOT NULL '
                        'AND upper(ht2.sys_period) >= '%(tx_time)s'',
                        cht.history_table, cht.history_table::text,
                        cht.live_table_pkey_column) as query
                    FROM catmaid_history_table cht
                LOOP
                    EXECUTE row.query;
                END LOOP;

            END
            $$;

            SELECT * FROM dependent_tx;
        """, _tx_query_params(tx))

        rows = list(cursor.fetchall())

        cursor.execute('DROP TABLE dependent_tx')

    return rows


def get_dependent_historic_tx(tx, target_list=None):
    """Find all historic transactions that depend on the passed in transaction.

    A transaction depends on ``tx`` if it happened at or after ``tx`` and
    affected rows touched by ``tx``. These transactions can't be guaranteed to
    be valid after ``tx`` has been undone. Therefore, they need to be rolled
    back as well, including their own dependent transactions, which is why the
    search continues from every transaction that is found.

    Each transaction, identified by ``(tx_id, execution_time)``, is reported
    and expanded only once. This keeps the result free of duplicates and
    guarantees termination even if transactions reference each other.

    Args:
        tx: The ``Transaction`` to start from.
        target_list: Optional list to append results to. Entries that are
            already in it are treated as known and are not expanded again.

    Returns:
        ``target_list`` (or a new list), extended in breadth-first discovery
        order by ``(tx_id, execution_time, user_id, label)`` rows.
    """
    if target_list is None:
        target_list = []

    seen = {(row[0], row[1]) for row in target_list}
    # Never report the starting transaction as its own dependency.
    seen.add((int(tx.id), tx.time))

    pending = collections.deque([tx])
    while pending:
        current = pending.popleft()
        for row in _get_direct_dependent_tx(current):
            key = (row[0], row[1])
            if key in seen:
                continue
            seen.add(key)
            target_list.append(row)
            pending.append(Transaction(row[0], row[1]))

    return target_list


def undelete_neuron(tx, interactive=False):
    """Recreate a neuron and its connections.

    This simply restores everything from a delete.neuron transaction: for each
    table registered in ``catmaid_history_table``, the history rows written by
    ``tx`` are inserted back into the live table, skipping rows that conflict
    with existing ones. Some materialized views such as treenode_connector_edge
    or treenode_edge need to be recreated selectively for the resurrected
    neuron. Therefore, an update of these views is done for all skeleton IDs
    encountered.

    Args:
        tx: The ``Transaction`` that deleted the neuron.
        interactive: If true, log how many history rows per table were found
            for ``tx`` before restoring them.

    Returns:
        The list of skeleton IDs found in ``treenode__history`` for ``tx``.
    """
    tx_matches = get_historic_row_count_affected_by_tx(tx)

    if interactive:
        if tx_matches:
            logger.info(
                'The following historic entries have been found for '
                'transaction %s', tx)
            for table_name, n_rows in tx_matches:
                logger.info(' table %s: %s rows', table_name, n_rows)
        else:
            logger.info('No historic entries found for transaction: %s', tx)

    # NOTE: every quoted format() fragment must stay on its own line, see
    # get_historic_row_count_affected_by_tx().
    with connection.cursor() as cursor:
        # Remember how many notices the connection already holds, so that only
        # the ones raised by the statement below are logged afterwards.
        nr_notices = len(cursor.connection.notices)
        cursor.execute("""
            DO $$
            DECLARE

                row record;

            BEGIN

                CREATE TEMPORARY TABLE seen_skeleton (
                    id bigint
                );

                INSERT INTO seen_skeleton
                SELECT DISTINCT skeleton_id
                FROM treenode__history th
                WHERE th.exec_transaction_id = %(tx_id)s
                AND upper(th.sys_period) >= %(tx_time)s;

                FOR row IN SELECT format('INSERT INTO %%1$s (', cht.live_table) ||
                        array_to_string(array_agg(column_name::text order by pos), ',') ||
                        ') SELECT ' ||
                        array_to_string(array_agg(column_name::text order by pos), ',') ||
                        format(
                        ' FROM ONLY %%1$s ht '
                        ' WHERE ht.exec_transaction_id = %(tx_id)s '
                        ' AND upper(ht.sys_period) >= '%(tx_time)s''
                        ' ON CONFLICT DO NOTHING',
                        cht.history_table, cht.history_table::text,
                        cht.live_table_pkey_column, cht.live_table,
                        cht.live_table::text) as query
                    FROM catmaid_history_table cht
                    JOIN catmaid_table_info cti
                    ON cti.rel_oid = cht.live_table
                    WHERE column_name::text NOT IN ('txid', 'edition_time')
                    GROUP BY cht.history_table, cht.live_table
                LOOP
                    RAISE NOTICE '%%', row.query;
                    EXECUTE row.query;
                END LOOP;

            END
            $$;

            SELECT id FROM seen_skeleton;
        """, _tx_query_params(tx))

        skeleton_ids = [r[0] for r in cursor.fetchall()]

        cursor.execute('DROP TABLE seen_skeleton')

        # NOTE(review): psycopg2 documents that only the most recent notices
        # are kept on the connection. If that limit was already reached before
        # this call, the slice can miss some of the new notices.
        new_notices = list(cursor.connection.notices[nr_notices:])

    for notice in new_notices:
        logger.debug(f'NOTICE: {notice}')

    # Imported here rather than at module level, as in the original patch.
    from catmaid.control.edge import rebuild_edges_selectively
    logger.info(f'Rebuilding edges for skeletons {skeleton_ids}')
    rebuild_edges_selectively(skeleton_ids, log=logger.info)

    return skeleton_ids


# ---------------------------------------------------------------------------
# django/applications/catmaid/util.py (additions)
# ---------------------------------------------------------------------------
def ask_to_continue():
    """Ask on the console whether to continue, until 'y' or 'n' is entered.

    Returns:
        True if the user answered 'y', False if the user answered 'n'.
    """
    answers = {'y': True, 'n': False}
    while True:
        reply = input("Continue? [y/n] ").strip()
        if reply in answers:
            return answers[reply]
        print("Only 'y' and 'n' are allowed")