diff --git a/ezidapp/management/commands/opensearch-delete.py b/ezidapp/management/commands/opensearch-delete.py
new file mode 100644
index 00000000..c11a34a4
--- /dev/null
+++ b/ezidapp/management/commands/opensearch-delete.py
@@ -0,0 +1,112 @@
+from django.core.management.base import BaseCommand
+from django.conf import settings
+from ezidapp.models.identifier import SearchIdentifier
+from impl.open_search_doc import OpenSearchDoc
+import json
+from django.db import connection
+import time
+
+SPLIT_SIZE = 100
+
+# run: python manage.py opensearch-delete
+
+class Command(BaseCommand):
+    def handle(self, *args, **options):
+        # iterate through all items in the OpenSearch index and check them against the database
+        # SearchIdentifier table to find removed items, then remove them from the index
+
+        # Initialize the OpenSearch client
+        client = OpenSearchDoc.CLIENT
+        index_name = settings.OPENSEARCH_INDEX
+
+        # Start the scroll
+        response = client.search(
+            index=index_name,
+            body={
+                "query": {
+                    "match_all": {}
+                }
+            },
+            scroll='2m',  # Keep the scroll context alive for 2 minutes
+            size=SPLIT_SIZE  # Number of results per batch
+        )
+
+        # Extract the scroll ID and the initial batch of results
+        scroll_id = response['_scroll_id']
+        hits = response['hits']['hits']
+
+        checked_count = len(hits)
+
+        # Continue scrolling until no more results are returned
+        while len(hits) > 0:
+            ids = [hit['_id'] for hit in hits]
+
+            # Convert the list of identifiers to a string format suitable for SQL. This UNION ALL is janky,
+            # but MySQL doesn't support FROM VALUES. The other option was to create a temporary table every
+            # time, but that seemed like overkill.
+            ids_union = ' UNION ALL '.join("SELECT %s AS identifier" for _ in ids)
+
+            # Raw SQL query to find identifiers in the list that are not in the database
+            query = f"""
+                SELECT id_list.identifier
+                FROM ({ids_union}) AS id_list
+                LEFT JOIN ezidapp_searchidentifier AS si ON id_list.identifier = si.identifier
+                WHERE si.identifier IS NULL;
+            """
+
+            # Execute the query
+            with connection.cursor() as cursor:
+                cursor.execute(query, ids)
+                missing_identifiers = [row[0] for row in cursor.fetchall()]
+
+            missing_identifiers_list = list(missing_identifiers)
+
+            if len(missing_identifiers_list) > 0:
+                # Create the bulk delete request payload
+                bulk_delete_payload = ""
+                for identifier in missing_identifiers_list:
+                    bulk_delete_payload += json.dumps(
+                        {"delete": {"_index": index_name, "_id": identifier}}) + "\n"
+
+                # Send the bulk delete request to OpenSearch
+                response = client.bulk(body=bulk_delete_payload)
+
+                # Check the response
+                if response['errors']:
+                    print(f" Errors occurred during bulk delete of {', '.join(missing_identifiers_list)}")
+                else:
+                    print(f" Bulk delete succeeded for {', '.join(missing_identifiers_list)}")
+
+            print("checked:", checked_count)
+
+            try:
+                response = self.scroll_with_retry(client, scroll_id)
+            except Exception as e:
+                print(e)
+                break
+
+            scroll_id = response['_scroll_id']
+            hits = response['hits']['hits']
+            checked_count += len(hits)
+
+        # Clear the scroll context
+        client.clear_scroll(scroll_id=scroll_id)
+        print("Done removing deleted IDs")
+
+    @staticmethod
+    def scroll_with_retry(client, scroll_id, max_retries=5, sleep_time=5):
+        for attempt in range(max_retries):
+            try:
+                response = client.scroll(
+                    scroll_id=scroll_id,
+                    scroll='2m'
+                )
+                return response
+            except Exception as e:
+                if attempt < max_retries - 1:
+                    print(f" Scroll attempt {attempt + 1} failed, retrying in {sleep_time} seconds...")
+                    time.sleep(sleep_time)
+                else:
+                    print(f" Scroll attempt {attempt + 1} failed, no more retries.")
+                    raise e
\ No newline at end of file
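The raw UNION ALL query above is a workaround for MySQL's lack of FROM VALUES. For comparison, a roughly equivalent check can be written with the Django ORM against the SearchIdentifier model that is already imported: fetch the identifiers that do exist and take the difference. This is only a sketch of an alternative, not what the command does; `ids` stands for one scroll batch of `_id` values as in the loop above.

```python
from ezidapp.models.identifier import SearchIdentifier

def missing_from_database(ids):
    # Identifiers present in the OpenSearch batch but absent from ezidapp_searchidentifier.
    # Issues a single indexed IN query per batch.
    existing = set(
        SearchIdentifier.objects.filter(identifier__in=ids).values_list('identifier', flat=True)
    )
    return [i for i in ids if i not in existing]
```

Either form runs one query per batch of 100; the raw SQL mainly avoids pulling the existing identifiers back into Python.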
print(f" Scroll attempt {attempt + 1} failed, no more retries.") + raise e \ No newline at end of file diff --git a/ezidapp/management/commands/proc-cleanup-async-queues_v2.py b/ezidapp/management/commands/proc-cleanup-async-queues_v2.py new file mode 100644 index 00000000..27f3fa91 --- /dev/null +++ b/ezidapp/management/commands/proc-cleanup-async-queues_v2.py @@ -0,0 +1,245 @@ +#! /usr/bin/env python + +# Copyright©2021, Regents of the University of California +# http://creativecommons.org/licenses/BSD + +""" + +Clean up entries that are successfully completed or are a 'no-op' + +Identifier operation entries are retrieved by querying the database; +operations that successfully completed or are a no-op are deleted based on +pre-set interval. + +""" + +import logging +import time +from datetime import datetime +from dateutil.parser import parse + +import django.conf +import django.db +from django.db import transaction +from django.db.models import Q + +import ezidapp.management.commands.proc_base +import ezidapp.models.identifier +import ezidapp.models.shoulder + + +log = logging.getLogger(__name__) + + +class Command(ezidapp.management.commands.proc_base.AsyncProcessingCommand): + help = __doc__ + name = __name__ + + setting = 'DAEMONS_QUEUE_CLEANUP_ENABLED' + + queueType = { + 'crossref': ezidapp.models.async_queue.CrossrefQueue, + 'datacite': ezidapp.models.async_queue.DataciteQueue, + 'search': ezidapp.models.async_queue.SearchIndexerQueue + } + + refIdentifier = ezidapp.models.identifier.RefIdentifier + + def __init__(self): + super().__init__() + + def add_arguments(self, parser): + super().add_arguments(parser) + parser.add_argument( + '--pagesize', help='Rows in each batch select.', type=int) + + parser.add_argument( + '--updated_range_from', type=str, + help = ( + 'Updated date range from - local date/time in ISO 8601 format without timezone \n' + 'YYYYMMDD, YYYYMMDDTHHMMSS, YYYY-MM-DD, YYYY-MM-DDTHH:MM:SS. \n' + 'Examples: 20241001, 20241001T131001, 2024-10-01, 2024-10-01T13:10:01 or 2024-10-01' + ) + ) + + parser.add_argument( + '--updated_range_to', type=str, + help = ( + 'Updated date range to - local date/time in ISO 8601 format without timezone \n' + 'YYYYMMDD, YYYYMMDDTHHMMSS, YYYY-MM-DD, YYYY-MM-DDTHH:MM:SS. 
+
+    def run(self):
+        """
+        Check the status of identifiers processed by the async queues and
+        clean up entries that are done.
+        """
+        ASYNC_CLEANUP_SLEEP = 60 * 10
+
+        BATCH_SIZE = self.opt.pagesize
+        if BATCH_SIZE is None:
+            BATCH_SIZE = 10000
+
+        updated_from = None
+        updated_to = None
+        updated_from_str = self.opt.updated_range_from
+        updated_to_str = self.opt.updated_range_to
+        if updated_from_str is not None:
+            try:
+                updated_from = self.date_to_seconds(updated_from_str)
+            except Exception as ex:
+                log.error(f"Input date/time error: {ex}")
+                exit()
+        if updated_to_str is not None:
+            try:
+                updated_to = self.date_to_seconds(updated_to_str)
+            except Exception as ex:
+                log.error(f"Input date/time error: {ex}")
+                exit()
+
+        if updated_from is not None and updated_to is not None:
+            time_range = Q(updateTime__gte=updated_from) & Q(updateTime__lte=updated_to)
+            time_range_str = f"updated between: {updated_from_str} and {updated_to_str}"
+        elif updated_to is not None:
+            time_range = Q(updateTime__lte=updated_to)
+            time_range_str = f"updated before: {updated_to_str}"
+        elif updated_from is not None:
+            time_range = Q(updateTime__gte=updated_from)
+            time_range_str = f"updated after: {updated_from_str}"
+        else:
+            max_age_ts = int(time.time()) - django.conf.settings.DAEMONS_EXPUNGE_MAX_AGE_SEC
+            min_age_ts = max_age_ts - django.conf.settings.DAEMONS_EXPUNGE_MAX_AGE_SEC
+            time_range = Q(updateTime__gte=min_age_ts) & Q(updateTime__lte=max_age_ts)
+            time_range_str = f"updated between: {self.seconds_to_date(min_age_ts)} and {self.seconds_to_date(max_age_ts)}"
+
+        last_id = 0
+        # keep running until terminated
+        while not self.terminated():
+            # retrieve identifiers with an update timestamp within the date range
+            batch_filter = time_range & Q(id__gt=last_id)
+            refIdsQS = self.refIdentifier.objects.filter(batch_filter).order_by("pk")[:BATCH_SIZE]
+
+            log.info(f"Checking ref Ids: {time_range_str}")
+            log.info(f"Checking ref Ids returned: {len(refIdsQS)} records")
+
+            # iterate over the query set to check each identifier's status
+            for refId in refIdsQS:
+
+                # processing status for each handle system
+                identifierStatus = {
+                    'crossref': False,
+                    'datacite': False,
+                    'search': False,
+                }
+
+                # check whether the identifier has been processed by each background job
+                for key, queue in self.queueType.items():
+                    qs = queue.objects.filter(
+                        Q(refIdentifier_id=refId.pk)
+                    )
+
+                    # if the identifier does not exist in the queue table,
+                    # it is OK to delete it from the refIdentifier table
+                    if not qs:
+                        identifierStatus[key] = True
+                        continue
+
+                    for task_model in qs:
+                        log.info('-' * 10)
+                        log.info(f"Running job for identifier: {refId.identifier} in {key} queue")
+
+                        # delete the queue entry if its status is successfully synced or
+                        # not applicable for this handle system
+                        if task_model.status in (queue.SUCCESS, queue.IGNORED):
+                            log.info(f"Delete identifier: {refId.identifier} in {key} queue")
+                            identifierStatus[key] = True
+                            self.deleteRecord(queue, task_model.pk, record_type=key, identifier=refId.identifier)
+
+                # if the identifier has been successfully processed for all handle systems,
+                # delete it from the refIdentifier table
+                if all(identifierStatus.values()):
+                    log.info(f"Delete identifier: {refId.identifier} from refIdentifier table.")
+                    self.deleteRecord(self.refIdentifier, refId.pk, record_type='refId', identifier=refId.identifier)
+
+                last_id = refId.pk
+
+            if len(refIdsQS) < BATCH_SIZE:
+                if updated_from is not None or updated_to is not None:
+                    log.info(f"Finished - Checking ref Ids: {time_range_str}")
+                    exit()
+                else:
+                    log.info(f"Sleep {ASYNC_CLEANUP_SLEEP} seconds before processing the next batch")
+                    self.sleep(ASYNC_CLEANUP_SLEEP)
+                    min_age_ts = max_age_ts
+                    max_age_ts = min_age_ts + ASYNC_CLEANUP_SLEEP
+                    time_range = Q(updateTime__gte=min_age_ts) & Q(updateTime__lte=max_age_ts)
+                    time_range_str = f"updated between: {self.seconds_to_date(min_age_ts)} and {self.seconds_to_date(max_age_ts)}"
+            else:
+                self.sleep(django.conf.settings.DAEMONS_BATCH_SLEEP)
+
+    def deleteRecord(self, queue, primary_key, record_type=None, identifier=None):
+        """
+        Delete a successfully completed record, identified by its primary key.
+
+        Args:
+            queue: async queue model (or the RefIdentifier model) to delete from
+            primary_key (int): primary key of the record to be deleted
+            record_type (str): queue name, or 'refId' for the RefIdentifier table. Defaults to None.
+            identifier (str): identifier string, used for logging. Defaults to None.
+        """
+        try:
+            # check if the record to be deleted is a refIdentifier record
+            if record_type is not None and record_type == 'refId':
+                log.info(type(queue))
+                log.info("Delete refId: " + str(primary_key))
+                with transaction.atomic():
+                    obj = queue.objects.select_for_update().get(id=primary_key)
+                    obj.delete()
+            else:
+                log.info(f"Delete async queue {queue.__name__} entry: " + str(primary_key))
+                with transaction.atomic():
+                    obj = queue.objects.select_for_update().get(seq=primary_key)
+                    obj.delete()
+        except Exception as e:
+            log.error("Exception occurred while processing identifier '" + identifier + "' for '"
+                      + record_type + "' table")
+            log.error(e)
+
+    def date_to_seconds(self, date_time_str: str) -> int:
+        """
+        Convert a date/time string to seconds since the Epoch.
+        For example:
+        2024-01-01 00:00:00 => 1704096000
+        2024-10-10 00:00:00 => 1728543600
+
+        Parameter:
+        date_time_str: A date/time string in ISO 8601 format without timezone.
+        For example: YYYYMMDD, YYYYMMDDTHHMMSS, YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS.
+
+        Returns:
+        int: seconds since the Epoch
+        """
+
+        # Parse the date and time string to a datetime object
+        dt_object = parse(date_time_str)
+
+        # Convert the datetime object to seconds since the Epoch
+        seconds_since_epoch = int(dt_object.timestamp())
+
+        return seconds_since_epoch
+
+    def seconds_to_date(self, seconds_since_epoch: int) -> str:
+        dt_object = datetime.fromtimestamp(seconds_since_epoch)
+
+        # Format the datetime object to a string in the desired format
+        formatted_time = dt_object.strftime("%Y-%m-%dT%H:%M:%S")
+        return formatted_time
\ No newline at end of file
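A note on the conversion helpers above: `dateutil.parser.parse` accepts all of the formats listed in the `--updated_range_*` help text, and `datetime.timestamp()` interprets the resulting naive datetime in the server's local timezone, which the epoch examples in the `date_to_seconds` docstring appear to assume is US/Pacific. A minimal sketch of the round trip, outside the command class:

```python
from datetime import datetime
from dateutil.parser import parse

for s in ("20241001", "20241001T131001", "2024-10-01", "2024-10-01T13:10:01"):
    dt = parse(s)                # naive datetime, treated as local time
    epoch = int(dt.timestamp())  # local-timezone dependent, as in date_to_seconds
    print(s, epoch, datetime.fromtimestamp(epoch).strftime("%Y-%m-%dT%H:%M:%S"))
```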
diff --git a/ezidapp/management/commands/proc-link-checker-update.py b/ezidapp/management/commands/proc-link-checker-update.py
index 2cefc3eb..42d51ccd 100644
--- a/ezidapp/management/commands/proc-link-checker-update.py
+++ b/ezidapp/management/commands/proc-link-checker-update.py
@@ -18,6 +18,7 @@
 import impl.log
 from impl.open_search_doc import OpenSearchDoc
 import opensearchpy.exceptions
+import time
 
 log = logging.getLogger(__name__)
 
@@ -98,9 +99,13 @@ def run(self):
                     )
                     si2.linkIsBroken = newValue
                     si2.computeHasIssues()
-                    si2.save(update_fields=["linkIsBroken", "hasIssues"])
+
+                    si2.updateTime = int(time.time())
+                    si2.save(update_fields=["updateTime", "linkIsBroken", "hasIssues"])
                     open_s = OpenSearchDoc(identifier=si2)
-                    open_s.update_link_issues(link_is_broken=si2.linkIsBroken, has_issues=si2.hasIssues)
+                    open_s.update_link_issues(link_is_broken=si2.linkIsBroken,
+                                              has_issues=si2.hasIssues,
+                                              update_time=si2.updateTime)
                 except ezidapp.models.identifier.SearchIdentifier.DoesNotExist:
                     log.exception('SearchIdentifier.DoesNotExist')
                 except opensearchpy.exceptions.OpenSearchException as e:
diff --git a/impl/open_search_doc.py b/impl/open_search_doc.py
index 27fb8b1d..cd320c2d 100644
--- a/impl/open_search_doc.py
+++ b/impl/open_search_doc.py
@@ -18,6 +18,7 @@
 from opensearchpy.exceptions import NotFoundError
 from django.conf import settings
 import urllib
+import time
 
 # the functools allows memoizing the results of functions, so they're not recalculated every time (ie cached
 # results if called more than once on the same instance)
@@ -138,10 +139,13 @@ def remove_from_index(self):
             return True
         return False
 
-    def update_link_issues(self, link_is_broken=False, has_issues=False):
+    # Note: update_time is passed in as integer epoch seconds and converted to an ISO datetime for OpenSearch
+    def update_link_issues(self, link_is_broken=False, has_issues=False, update_time=None):
+        if update_time is None:
+            update_time = int(time.time())
         dict_to_update = {
             'open_search_updated': datetime.datetime.now().isoformat(),
-            'update_time': datetime.datetime.now().isoformat(),
+            'update_time': datetime.datetime.utcfromtimestamp(update_time).isoformat(),
             'link_is_broken': link_is_broken,
             'has_issues': has_issues
         }
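With this change, `update_time` arrives as integer epoch seconds (the same value written to `SearchIdentifier.updateTime`) and is serialized for OpenSearch as a UTC ISO 8601 string. For illustration, the conversion applied to the timestamp used by the updated test below:

```python
import datetime

# 1727984570 is the epoch value passed in test_update_link_issues below
print(datetime.datetime.utcfromtimestamp(1727984570).isoformat())
# -> '2024-10-03T19:42:50'
```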
diff --git a/tests/test_open_search_doc.py b/tests/test_open_search_doc.py
index 28a9a5df..e021a888 100644
--- a/tests/test_open_search_doc.py
+++ b/tests/test_open_search_doc.py
@@ -272,7 +272,7 @@ def test_update_link_issues(mock_client, open_search_doc):
     mock_client.update.return_value = mock_response
 
     # Act
-    result = open_search_doc.update_link_issues(link_is_broken=True, has_issues=True)
+    result = open_search_doc.update_link_issues(link_is_broken=True, has_issues=True, update_time=1727984570)
 
     # Assert
     mock_client.update.assert_called_once_with(
@@ -282,7 +282,7 @@
             'open_search_updated': ANY,  # Use unittest.mock.ANY if the exact value doesn't matter
-            'update_time': ANY,
+            'update_time': '2024-10-03T19:42:50',
             'link_is_broken': True,
             'has_issues': True
         }}
     )
     assert result is True
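For completeness, the new commands are invoked like the existing management commands. Based on the `# run:` comment in opensearch-delete.py and the arguments defined in add_arguments above, invocations would look roughly like the following; the flag values are purely illustrative.

```
python manage.py opensearch-delete
python manage.py proc-cleanup-async-queues_v2 --pagesize 5000 --updated_range_from 2024-10-01 --updated_range_to 2024-10-02T00:00:00
```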