
Commit

Merge pull request #775 from CDLUC3/develop
Merge develop to main
jsjiang authored Oct 30, 2024
2 parents d48e634 + e071c5d commit 1d9f7b8
Showing 5 changed files with 371 additions and 6 deletions.
112 changes: 112 additions & 0 deletions ezidapp/management/commands/opensearch-delete.py
@@ -0,0 +1,112 @@
from django.core.management.base import BaseCommand
from django.conf import settings
from ezidapp.models.identifier import SearchIdentifier
from impl.open_search_doc import OpenSearchDoc
import json
from django.db import connection
import time

SPLIT_SIZE = 100

# run: python manage.py opensearch-delete

class Command(BaseCommand):
    def handle(self, *args, **options):
        # iterate through all items in the OpenSearch index and check against the database
        # SearchIdentifier table to find removed items and remove them from the index

        # Initialize the OpenSearch client
        client = OpenSearchDoc.CLIENT
        index_name = settings.OPENSEARCH_INDEX

        # Start the scroll
        response = client.search(
            index=index_name,
            body={
                "query": {
                    "match_all": {}
                }
            },
            scroll='2m',  # Keep the scroll context alive for 2 minutes
            size=100  # Number of results per batch
        )

        # Extract the scroll ID and the initial batch of results
        scroll_id = response['_scroll_id']
        hits = response['hits']['hits']

        checked_count = 100
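        # the initial search above already returned the first batch (size=100), so the count starts there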

        # Continue scrolling until no more results are returned
        while len(hits) > 0:
            ids = [hit['_id'] for hit in hits]

            # Convert the list of identifiers to a string format suitable for SQL. This UNION ALL is janky
            # but MySQL doesn't support FROM VALUES. The other option was to create a temporary table every time, but
            # that seemed like overkill.
            ids_union = ' UNION ALL '.join("SELECT %s AS identifier" for _ in ids)
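            # For two ids, ids_union expands to:
            #   SELECT %s AS identifier UNION ALL SELECT %s AS identifier
            # The query below left-joins that derived table against ezidapp_searchidentifier and
            # keeps only identifiers with no matching row, i.e. ones deleted from the database.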

            # Raw SQL query to find identifiers in the list that are not in the database
            query = f"""
                SELECT id_list.identifier
                FROM ({ids_union}) AS id_list
                LEFT JOIN ezidapp_searchidentifier AS si ON id_list.identifier = si.identifier
                WHERE si.identifier IS NULL;
            """

            # Execute the query
            with connection.cursor() as cursor:
                cursor.execute(query, ids)
                missing_identifiers = [row[0] for row in cursor.fetchall()]

            missing_identifiers_list = list(missing_identifiers)

            if len(missing_identifiers_list) > 0:
                # Create the bulk delete request payload
                bulk_delete_payload = ""
                for identifier in missing_identifiers_list:
                    bulk_delete_payload += json.dumps(
                        {"delete": {"_index": index_name, "_id": identifier}}) + "\n"

                # Send the bulk delete request to OpenSearch
                response = client.bulk(body=bulk_delete_payload)

                # Check the response
                if response['errors']:
                    print(f" Errors occurred during bulk delete of {', '.join(missing_identifiers_list)}")
                else:
                    print(f" Bulk delete successful deleting {', '.join(missing_identifiers_list)}")

            print("checked:", checked_count)

            try:
                response = self.scroll_with_retry(client, scroll_id)
            except Exception as e:
                print(e)
                break


            scroll_id = response['_scroll_id']
            hits = response['hits']['hits']
            checked_count += len(hits)

        # Clear the scroll context
        client.clear_scroll(scroll_id=scroll_id)
        print("Done removing deleted IDs")

    @staticmethod
    def scroll_with_retry(client, scroll_id, max_retries=5, sleep_time=5):
        for attempt in range(max_retries):
            try:
                response = client.scroll(
                    scroll_id=scroll_id,
                    scroll='2m'
                )
                return response
            except Exception as e:
                if attempt < max_retries - 1:
                    print(f" Scroll attempt {attempt + 1} failed, retrying in {sleep_time} seconds...")
                    time.sleep(sleep_time)
                else:
                    print(f" Scroll attempt {attempt + 1} failed, no more retries.")
                    raise e
245 changes: 245 additions & 0 deletions ezidapp/management/commands/proc-cleanup-async-queues_v2.py
@@ -0,0 +1,245 @@
#! /usr/bin/env python

# Copyright©2021, Regents of the University of California
# http://creativecommons.org/licenses/BSD

"""
Clean up entries that are successfully completed or are a 'no-op'
Identifier operation entries are retrieved by querying the database;
operations that successfully completed or are a no-op are deleted based on
pre-set interval.
"""

import logging
import time
from datetime import datetime
from dateutil.parser import parse

import django.conf
import django.db
from django.db import transaction
from django.db.models import Q

import ezidapp.management.commands.proc_base
import ezidapp.models.async_queue
import ezidapp.models.identifier
import ezidapp.models.shoulder


log = logging.getLogger(__name__)


class Command(ezidapp.management.commands.proc_base.AsyncProcessingCommand):
    help = __doc__
    name = __name__

    setting = 'DAEMONS_QUEUE_CLEANUP_ENABLED'

    queueType = {
        'crossref': ezidapp.models.async_queue.CrossrefQueue,
        'datacite': ezidapp.models.async_queue.DataciteQueue,
        'search': ezidapp.models.async_queue.SearchIndexerQueue
    }
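    # a RefIdentifier row can be purged once every queue above has either no entry
    # for it or an entry in SUCCESS or IGNORED status (see run() below)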

    refIdentifier = ezidapp.models.identifier.RefIdentifier

    def __init__(self):
        super().__init__()

    def add_arguments(self, parser):
        super().add_arguments(parser)
        parser.add_argument(
            '--pagesize', help='Rows in each batch select.', type=int)

        parser.add_argument(
            '--updated_range_from', type=str,
            help=(
                'Updated date range from - local date/time in ISO 8601 format without timezone \n'
                'YYYYMMDD, YYYYMMDDTHHMMSS, YYYY-MM-DD, YYYY-MM-DDTHH:MM:SS. \n'
                'Examples: 20241001, 20241001T131001, 2024-10-01, 2024-10-01T13:10:01 or 2024-10-01'
            )
        )

        parser.add_argument(
            '--updated_range_to', type=str,
            help=(
                'Updated date range to - local date/time in ISO 8601 format without timezone \n'
                'YYYYMMDD, YYYYMMDDTHHMMSS, YYYY-MM-DD, YYYY-MM-DDTHH:MM:SS. \n'
                'Examples: 20241001, 20241001T131001, 2024-10-01, 2024-10-01T13:10:01 or 2024-10-01'
            )
        )


    def run(self):
        """
        Check each RefIdentifier against the async queues; delete queue entries that
        completed successfully (or were ignored) and, once all queues are clear for an
        identifier, delete the RefIdentifier record itself.
        """
        ASYNC_CLEANUP_SLEEP = 60 * 10
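        # 10 minutes; pause between passes when running without an explicit date range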

        BATCH_SIZE = self.opt.pagesize
        if BATCH_SIZE is None:
            BATCH_SIZE = 10000

        updated_from = None
        updated_to = None
        updated_from_str = self.opt.updated_range_from
        updated_to_str = self.opt.updated_range_to
        if updated_from_str is not None:
            try:
                updated_from = self.date_to_seconds(updated_from_str)
            except Exception as ex:
                log.error(f"Input date/time error: {ex}")
                exit()
        if updated_to_str is not None:
            try:
                updated_to = self.date_to_seconds(updated_to_str)
            except Exception as ex:
                log.error(f"Input date/time error: {ex}")
                exit()

        if updated_from is not None and updated_to is not None:
            time_range = Q(updateTime__gte=updated_from) & Q(updateTime__lte=updated_to)
            time_range_str = f"updated between: {updated_from_str} and {updated_to_str}"
        elif updated_to is not None:
            time_range = Q(updateTime__lte=updated_to)
            time_range_str = f"updated before: {updated_to_str}"
        else:
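            # default window: records updated between (now - 2 * DAEMONS_EXPUNGE_MAX_AGE_SEC)
            # and (now - DAEMONS_EXPUNGE_MAX_AGE_SEC)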
            max_age_ts = int(time.time()) - django.conf.settings.DAEMONS_EXPUNGE_MAX_AGE_SEC
            min_age_ts = max_age_ts - django.conf.settings.DAEMONS_EXPUNGE_MAX_AGE_SEC
            time_range = Q(updateTime__gte=min_age_ts) & Q(updateTime__lte=max_age_ts)
            time_range_str = f"updated between: {self.seconds_to_date(min_age_ts)} and {self.seconds_to_date(max_age_ts)}"

        last_id = 0
        # keep running until terminated
        while not self.terminated():
            # retrieve identifiers with update timestamp within a date range
            filter = time_range & Q(id__gt=last_id)
            refIdsQS = self.refIdentifier.objects.filter(filter).order_by("pk")[:BATCH_SIZE]

            log.info(f"Checking ref Ids: {time_range_str}")
            log.info(f"Checking ref Ids returned: {len(refIdsQS)} records")

            # iterate over query set to check each identifier status
            for refId in refIdsQS:

                # set status for each handle system
                identifierStatus = {
                    'crossref': False,
                    'datacite': False,
                    'search': False
                }

                # check if the identifier is processed for each background job
                for key, value in self.queueType.items():
                    queue = value

                    qs = queue.objects.filter(
                        Q(refIdentifier_id=refId.pk)
                    )

                    # if the identifier does not exist in the table
                    # mark as 'OK' to delete from the refIdentifier
                    if not qs:
                        identifierStatus[key] = True
                        continue

                    for task_model in qs:
                        log.info('-' * 10)
                        log.info("Running job for identifier: " + refId.identifier + " in " + key + " queue")

                        # delete identifier if the status is successfully synced or
                        # not applicable for this handle system
                        if task_model.status == queue.SUCCESS or task_model.status == queue.IGNORED:
                            log.info(
                                "Delete identifier: " + refId.identifier + " in " + key + " queue")
                            identifierStatus[key] = True
                            self.deleteRecord(queue, task_model.pk, record_type=key, identifier=refId.identifier)

                # if the identifier is successfully processed for all the handle systems
                # delete it from the refIdentifier table
                if all(i for i in identifierStatus.values()):
                    log.info(
                        "Delete identifier: " + refId.identifier + " from refIdentifier table.")
                    self.deleteRecord(self.refIdentifier, refId.pk, record_type='refId', identifier=refId.identifier)

                last_id = refId.pk
            if len(refIdsQS) < BATCH_SIZE:
                if updated_from is not None or updated_to is not None:
                    log.info(f"Finished - Checking ref Ids: {time_range_str}")
                    exit()
                else:
                    log.info(f"Sleep {ASYNC_CLEANUP_SLEEP} seconds before processing next batch")
                    self.sleep(ASYNC_CLEANUP_SLEEP)
                    min_age_ts = max_age_ts
                    max_age_ts = min_age_ts + ASYNC_CLEANUP_SLEEP
                    time_range = Q(updateTime__gte=min_age_ts) & Q(updateTime__lte=max_age_ts)
                    time_range_str = f"updated between: {self.seconds_to_date(min_age_ts)} and {self.seconds_to_date(max_age_ts)}"
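                    # slide the time window forward so the next pass picks up records
                    # updated during the sleep interval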
            else:
                self.sleep(django.conf.settings.DAEMONS_BATCH_SLEEP)

    def deleteRecord(self, queue, primary_key, record_type=None, identifier=None):
        """
        Delete a successfully completed record identified by its primary key.
        Args:
            queue: model class to delete from (an async queue or RefIdentifier)
            primary_key: primary key of the record to be deleted
            record_type (str): 'refId' for a RefIdentifier record, otherwise the queue name. Defaults to None.
            identifier (str): identifier string, used for logging. Defaults to None.
        """
        try:
            # check if the record to be deleted is a refIdentifier record
            if record_type is not None and record_type == 'refId':
                log.info(type(queue))
                log.info("Delete refId: " + str(primary_key))
                with transaction.atomic():
                    obj = queue.objects.select_for_update().get(id=primary_key)
                    obj.delete()
            else:
                log.info(f"Delete async queue {queue.__name__} entry: " + str(primary_key))
                with transaction.atomic():
                    obj = queue.objects.select_for_update().get(seq=primary_key)
                    obj.delete()
        except Exception as e:
            log.error("Exception occurred while processing identifier '" + identifier + "' for '" +
                      record_type + "' table")
            log.error(e)


    def date_to_seconds(self, date_time_str: str) -> int:
        """
        Convert a date/time string to seconds since the Epoch.
        For example (local time):
            2024-01-01 00:00:00 => 1704096000
            2024-10-10 00:00:00 => 1728543600
        Parameter:
            date_time_str: A date/time string in ISO 8601 format without timezone.
                For example: YYYYMMDD, YYYYMMDDTHHMMSS, YYYY-MM-DD, YYYY-MM-DDTHH:MM:SS.
        Returns:
            int: seconds since the Epoch
        """

        # Parse the date and time string to a datetime object
        dt_object = parse(date_time_str)

        # Convert the datetime object to seconds since the Epoch
        seconds_since_epoch = int(dt_object.timestamp())

        return seconds_since_epoch


    def seconds_to_date(self, seconds_since_epoch: int) -> str:
        dt_object = datetime.fromtimestamp(seconds_since_epoch)

        # Format the datetime object to a string in the desired format
        formatted_time = dt_object.strftime("%Y-%m-%dT%H:%M:%S")
        return formatted_time
9 changes: 7 additions & 2 deletions ezidapp/management/commands/proc-link-checker-update.py
@@ -18,6 +18,7 @@
import impl.log
from impl.open_search_doc import OpenSearchDoc
import opensearchpy.exceptions
import time

log = logging.getLogger(__name__)

@@ -98,9 +99,13 @@ def run(self):
)
si2.linkIsBroken = newValue
si2.computeHasIssues()
si2.save(update_fields=["linkIsBroken", "hasIssues"])

si2.updateTime = int(time.time())
si2.save(update_fields=["updateTime", "linkIsBroken", "hasIssues"])
open_s = OpenSearchDoc(identifier=si2)
open_s.update_link_issues(link_is_broken=si2.linkIsBroken, has_issues=si2.hasIssues)
open_s.update_link_issues(link_is_broken=si2.linkIsBroken,
has_issues=si2.hasIssues,
update_time=si2.updateTime)
except ezidapp.models.identifier.SearchIdentifier.DoesNotExist:
log.exception('SearchIdentifier.DoesNotExist')
except opensearchpy.exceptions.OpenSearchException as e:
