Skip to content

Commit

Permalink
update request/transform/proceesing core functions to add processing …
Browse files Browse the repository at this point in the history
…thread locking infor
  • Loading branch information
wguanicedew committed Oct 15, 2024
1 parent 623cf4d commit ae2f300
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 3 deletions.
15 changes: 15 additions & 0 deletions common/lib/idds/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import re
import requests
import signal
import socket
import subprocess
import sys
import tarfile
import threading
import time
# import traceback

Expand Down Expand Up @@ -1067,3 +1069,16 @@ def wrapper(*args, **kwargs):
return TimeoutError(f"Function '{func.__name__}' timed out after {timeout} seconds.")
return wrapper
return decorator


def get_process_thread_info():
"""
Returns: hostname, process id, thread id and thread name
"""
hostname = socket.getfqdn()
hostname = hostname.split('.')[0]
pid = os.getpid()
hb_thread = threading.current_thread()
thread_id = hb_thread.ident
thread_name = hb_thread.name
return hostname, pid, thread_id, thread_name
12 changes: 11 additions & 1 deletion main/lib/idds/core/processings.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from idds.orm.base.session import read_session, transactional_session
from idds.common.constants import ProcessingLocking, ProcessingStatus, ProcessingType, GranularityType, ContentRelationType
from idds.common.utils import get_list_chunks
from idds.common.utils import get_list_chunks, get_process_thread_info
from idds.orm import (processings as orm_processings,
collections as orm_collections,
contents as orm_contents,
Expand Down Expand Up @@ -126,6 +126,11 @@ def get_processing_by_id_status(processing_id, status=None, locking=False, lock_
parameters = {}
parameters['locking'] = ProcessingLocking.Locking
parameters['updated_at'] = datetime.datetime.utcnow()
hostname, pid, thread_id, thread_name = get_process_thread_info()
parameters['locking_hostname'] = hostname
parameters['locking_pid'] = pid
parameters['locking_thread_id'] = thread_id
parameters['locking_thread_name'] = thread_name
orm_processings.update_processing(processing_id=pr['processing_id'], parameters=parameters, session=session)
return pr
else:
Expand All @@ -134,6 +139,11 @@ def get_processing_by_id_status(processing_id, status=None, locking=False, lock_
parameters = {}
parameters['locking'] = ProcessingLocking.Locking
parameters['updated_at'] = datetime.datetime.utcnow()
hostname, pid, thread_id, thread_name = get_process_thread_info()
parameters['locking_hostname'] = hostname
parameters['locking_pid'] = pid
parameters['locking_thread_id'] = thread_id
parameters['locking_thread_name'] = thread_name
orm_processings.update_processing(processing_id=pr['processing_id'], parameters=parameters, session=session)
return pr
return pr
Expand Down
6 changes: 6 additions & 0 deletions main/lib/idds/core/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from idds.common.constants import (RequestStatus, RequestLocking, WorkStatus,
CollectionType, CollectionStatus, CollectionRelationType,
MessageStatus, MetaStatus)
from idds.common.utils import get_process_thread_info
from idds.orm.base.session import read_session, transactional_session
from idds.orm import requests as orm_requests
from idds.orm import transforms as orm_transforms
Expand Down Expand Up @@ -178,6 +179,11 @@ def get_request_by_id_status(request_id, status=None, locking=False, session=Non
parameters = {}
parameters['locking'] = RequestLocking.Locking
parameters['updated_at'] = datetime.datetime.utcnow()
hostname, pid, thread_id, thread_name = get_process_thread_info()
parameters['locking_hostname'] = hostname
parameters['locking_pid'] = pid
parameters['locking_thread_id'] = thread_id
parameters['locking_thread_name'] = thread_name
orm_requests.update_request(request_id=req['request_id'], parameters=parameters, session=session)
return req

Expand Down
6 changes: 6 additions & 0 deletions main/lib/idds/core/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from idds.common.constants import (TransformStatus, ContentRelationType, ContentStatus,
TransformLocking, CollectionRelationType)
from idds.common.utils import get_process_thread_info
from idds.orm.base.session import read_session, transactional_session
from idds.orm import (transforms as orm_transforms,
collections as orm_collections,
Expand Down Expand Up @@ -103,6 +104,11 @@ def get_transform_by_id_status(transform_id, status=None, locking=False, session
parameters = {}
parameters['locking'] = TransformLocking.Locking
parameters['updated_at'] = datetime.datetime.utcnow()
hostname, pid, thread_id, thread_name = get_process_thread_info()
parameters['locking_hostname'] = hostname
parameters['locking_pid'] = pid
parameters['locking_thread_id'] = thread_id
parameters['locking_thread_name'] = thread_name
orm_transforms.update_transform(transform_id=tf['transform_id'], parameters=parameters, session=session)
return tf

Expand Down
3 changes: 1 addition & 2 deletions main/lib/idds/tests/panda_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import sys
import datetime

"""
os.environ['PANDA_URL'] = 'http://pandaserver-doma.cern.ch:25080/server/panda'
# os.environ['PANDA_URL_SSL'] = 'https://pandaserver-doma.cern.ch:25443/server/panda'
os.environ['PANDA_URL_SSL'] = 'https://pandaserver-doma.cern.ch:443/server/panda'
Expand All @@ -20,7 +19,6 @@

# os.environ['PANDA_URL_SSL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'
# os.environ['PANDA_URL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'
"""

from pandaclient import Client # noqa E402

Expand Down Expand Up @@ -82,6 +80,7 @@
task_ids = [i for i in range(2921, 2927)]
task_ids = [124, 68, 75, 78, 79]
task_ids = [19654]
task_ids = [16700, 16704, 17055, 17646, 17792, 18509, 19754, 21666, 21714, 21739, 16148, 16149, 16150]
for task_id in task_ids:
print("Killing %s" % task_id)
ret = Client.killTask(task_id, verbose=True)
Expand Down

0 comments on commit ae2f300

Please sign in to comment.