diff --git a/common/lib/idds/common/utils.py b/common/lib/idds/common/utils.py index e51b721b..bb611ec8 100644 --- a/common/lib/idds/common/utils.py +++ b/common/lib/idds/common/utils.py @@ -23,9 +23,11 @@ import re import requests import signal +import socket import subprocess import sys import tarfile +import threading import time # import traceback @@ -1067,3 +1069,16 @@ def wrapper(*args, **kwargs): return TimeoutError(f"Function '{func.__name__}' timed out after {timeout} seconds.") return wrapper return decorator + + +def get_process_thread_info(): + """ + Returns: short hostname (socket.getfqdn() truncated at the first '.'), process id (os.getpid()), and the ident and name of the current thread + """ + hostname = socket.getfqdn() + hostname = hostname.split('.')[0] + pid = os.getpid() + hb_thread = threading.current_thread() + thread_id = hb_thread.ident + thread_name = hb_thread.name + return hostname, pid, thread_id, thread_name diff --git a/main/lib/idds/core/processings.py b/main/lib/idds/core/processings.py index 9ad3f211..f7a23160 100644 --- a/main/lib/idds/core/processings.py +++ b/main/lib/idds/core/processings.py @@ -17,7 +17,7 @@ from idds.orm.base.session import read_session, transactional_session from idds.common.constants import ProcessingLocking, ProcessingStatus, ProcessingType, GranularityType, ContentRelationType -from idds.common.utils import get_list_chunks +from idds.common.utils import get_list_chunks, get_process_thread_info from idds.orm import (processings as orm_processings, collections as orm_collections, contents as orm_contents, @@ -126,6 +126,11 @@ def get_processing_by_id_status(processing_id, status=None, locking=False, lock_ parameters = {} parameters['locking'] = ProcessingLocking.Locking parameters['updated_at'] = datetime.datetime.utcnow() + hostname, pid, thread_id, thread_name = get_process_thread_info() + parameters['locking_hostname'] = hostname + parameters['locking_pid'] = pid + parameters['locking_thread_id'] = thread_id + parameters['locking_thread_name'] = thread_name 
orm_processings.update_processing(processing_id=pr['processing_id'], parameters=parameters, session=session) return pr else: @@ -134,6 +139,11 @@ def get_processing_by_id_status(processing_id, status=None, locking=False, lock_ parameters = {} parameters['locking'] = ProcessingLocking.Locking parameters['updated_at'] = datetime.datetime.utcnow() + hostname, pid, thread_id, thread_name = get_process_thread_info() + parameters['locking_hostname'] = hostname + parameters['locking_pid'] = pid + parameters['locking_thread_id'] = thread_id + parameters['locking_thread_name'] = thread_name orm_processings.update_processing(processing_id=pr['processing_id'], parameters=parameters, session=session) return pr return pr diff --git a/main/lib/idds/core/requests.py b/main/lib/idds/core/requests.py index 4b4c3189..0fa494b6 100644 --- a/main/lib/idds/core/requests.py +++ b/main/lib/idds/core/requests.py @@ -19,6 +19,7 @@ from idds.common.constants import (RequestStatus, RequestLocking, WorkStatus, CollectionType, CollectionStatus, CollectionRelationType, MessageStatus, MetaStatus) +from idds.common.utils import get_process_thread_info from idds.orm.base.session import read_session, transactional_session from idds.orm import requests as orm_requests from idds.orm import transforms as orm_transforms @@ -178,6 +179,11 @@ def get_request_by_id_status(request_id, status=None, locking=False, session=Non parameters = {} parameters['locking'] = RequestLocking.Locking parameters['updated_at'] = datetime.datetime.utcnow() + hostname, pid, thread_id, thread_name = get_process_thread_info() + parameters['locking_hostname'] = hostname + parameters['locking_pid'] = pid + parameters['locking_thread_id'] = thread_id + parameters['locking_thread_name'] = thread_name orm_requests.update_request(request_id=req['request_id'], parameters=parameters, session=session) return req diff --git a/main/lib/idds/core/transforms.py b/main/lib/idds/core/transforms.py index 1a2d8e23..9338079d 100644 --- 
a/main/lib/idds/core/transforms.py +++ b/main/lib/idds/core/transforms.py @@ -20,6 +20,7 @@ from idds.common.constants import (TransformStatus, ContentRelationType, ContentStatus, TransformLocking, CollectionRelationType) +from idds.common.utils import get_process_thread_info from idds.orm.base.session import read_session, transactional_session from idds.orm import (transforms as orm_transforms, collections as orm_collections, @@ -103,6 +104,11 @@ def get_transform_by_id_status(transform_id, status=None, locking=False, session parameters = {} parameters['locking'] = TransformLocking.Locking parameters['updated_at'] = datetime.datetime.utcnow() + hostname, pid, thread_id, thread_name = get_process_thread_info() + parameters['locking_hostname'] = hostname + parameters['locking_pid'] = pid + parameters['locking_thread_id'] = thread_id + parameters['locking_thread_name'] = thread_name orm_transforms.update_transform(transform_id=tf['transform_id'], parameters=parameters, session=session) return tf diff --git a/main/lib/idds/orm/base/alembic/versions/a844dae57021_add_process_thread_locking_information.py b/main/lib/idds/orm/base/alembic/versions/a844dae57021_add_process_thread_locking_information.py new file mode 100644 index 00000000..e279505e --- /dev/null +++ b/main/lib/idds/orm/base/alembic/versions/a844dae57021_add_process_thread_locking_information.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Authors: +# - Wen Guan, , 2024 + +"""add process thread locking information + +Revision ID: a844dae57021 +Revises: 3073c5de8f73 +Create Date: 2024-10-15 13:55:49.485737+00:00 + +""" +from alembic import op +from alembic import context +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
+revision = 'a844dae57021' +down_revision = '3073c5de8f73' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']: + schema = context.get_context().version_table_schema if context.get_context().version_table_schema else '' + + op.add_column('requests', sa.Column('locking_hostname', sa.String(50)), schema=schema) + op.add_column('requests', sa.Column('locking_pid', sa.BigInteger()), schema=schema) + op.add_column('requests', sa.Column('locking_thread_id', sa.BigInteger()), schema=schema) + op.add_column('requests', sa.Column('locking_thread_name', sa.String(255)), schema=schema) + + op.add_column('transforms', sa.Column('locking_hostname', sa.String(50)), schema=schema) + op.add_column('transforms', sa.Column('locking_pid', sa.BigInteger()), schema=schema) + op.add_column('transforms', sa.Column('locking_thread_id', sa.BigInteger()), schema=schema) + op.add_column('transforms', sa.Column('locking_thread_name', sa.String(255)), schema=schema) + + op.add_column('processings', sa.Column('locking_hostname', sa.String(50)), schema=schema) + op.add_column('processings', sa.Column('locking_pid', sa.BigInteger()), schema=schema) + op.add_column('processings', sa.Column('locking_thread_id', sa.BigInteger()), schema=schema) + op.add_column('processings', sa.Column('locking_thread_name', sa.String(255)), schema=schema) + + +def downgrade() -> None: + if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']: + schema = context.get_context().version_table_schema if context.get_context().version_table_schema else '' + + op.drop_column('requests', 'locking_hostname', schema=schema) + op.drop_column('requests', 'locking_pid', schema=schema) + op.drop_column('requests', 'locking_thread_id', schema=schema) + op.drop_column('requests', 'locking_thread_name', schema=schema) + + op.drop_column('transforms', 'locking_hostname', schema=schema) + op.drop_column('transforms', 
'locking_pid', schema=schema) + op.drop_column('transforms', 'locking_thread_id', schema=schema) + op.drop_column('transforms', 'locking_thread_name', schema=schema) + + op.drop_column('processings', 'locking_hostname', schema=schema) + op.drop_column('processings', 'locking_pid', schema=schema) + op.drop_column('processings', 'locking_thread_id', schema=schema) + op.drop_column('processings', 'locking_thread_name', schema=schema) diff --git a/main/lib/idds/orm/base/models.py b/main/lib/idds/orm/base/models.py index fb228d8a..7b955f2f 100644 --- a/main/lib/idds/orm/base/models.py +++ b/main/lib/idds/orm/base/models.py @@ -160,6 +160,10 @@ class Request(BASE, ModelBase): new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1)) update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10)) site = Column(String(50)) + locking_hostname = Column(String(50)) + locking_pid = Column(BigInteger, autoincrement=False) + locking_thread_id = Column(BigInteger, autoincrement=False) + locking_thread_name = Column(String(255)) campaign = Column(String(50)) campaign_group = Column(String(250)) campaign_tag = Column(String(20)) @@ -317,6 +321,10 @@ class Transform(BASE, ModelBase): new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1)) update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10)) site = Column(String(50)) + locking_hostname = Column(String(50)) + locking_pid = Column(BigInteger, autoincrement=False) + locking_thread_id = Column(BigInteger, autoincrement=False) + locking_thread_name = Column(String(255)) name = Column(String(NAME_LENGTH)) has_previous_conditions = Column(Integer()) loop_index = Column(Integer()) @@ -430,6 +438,10 @@ class Processing(BASE, ModelBase): new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1)) update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10)) site = Column(String(50)) + locking_hostname = Column(String(50)) + 
locking_pid = Column(BigInteger, autoincrement=False) + locking_thread_id = Column(BigInteger, autoincrement=False) + locking_thread_name = Column(String(255)) errors = Column(JSONString(1024)) _processing_metadata = Column('processing_metadata', JSON()) _running_metadata = Column('running_metadata', JSON()) @@ -714,7 +726,7 @@ class Health(BASE, ModelBase): status = Column(EnumWithValue(HealthStatus), default=0, nullable=False) thread_id = Column(BigInteger, autoincrement=False) thread_name = Column(String(255)) - payload = Column(String(2048)) + # payload = Column(String(2048)) created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow) updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow) payload = Column(String(2048)) diff --git a/main/lib/idds/orm/base/utils.py b/main/lib/idds/orm/base/utils.py index 6e723739..10c64edb 100644 --- a/main/lib/idds/orm/base/utils.py +++ b/main/lib/idds/orm/base/utils.py @@ -39,10 +39,12 @@ def build_database(echo=True, tests=False): if config_has_option('database', 'schema'): schema = config_get('database', 'schema') - if schema and not engine.dialect.has_schema(engine, schema): + if schema: print('Schema set in config, trying to create schema:', schema) try: - engine.execute(CreateSchema(schema)) + with engine.connect() as conn: + with conn.begin(): + conn.execute(CreateSchema(schema)) except Exception as e: print('Cannot create schema, please validate manually if schema creation is needed, continuing:', e) print(traceback.format_exc()) diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index bc405a36..d29ad1b5 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -3,7 +3,6 @@ import sys import datetime -""" os.environ['PANDA_URL'] = 'http://pandaserver-doma.cern.ch:25080/server/panda' # os.environ['PANDA_URL_SSL'] = 'https://pandaserver-doma.cern.ch:25443/server/panda' 
os.environ['PANDA_URL_SSL'] = 'https://pandaserver-doma.cern.ch:443/server/panda' @@ -20,7 +19,6 @@ # os.environ['PANDA_URL_SSL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda' # os.environ['PANDA_URL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda' -""" from pandaclient import Client # noqa E402 @@ -82,6 +80,7 @@ task_ids = [i for i in range(2921, 2927)] task_ids = [124, 68, 75, 78, 79] task_ids = [19654] +task_ids = [16700, 16704, 17055, 17646, 17792, 18509, 19754, 21666, 21714, 21739, 16148, 16149, 16150] for task_id in task_ids: print("Killing %s" % task_id) ret = Client.killTask(task_id, verbose=True)