Skip to content

Commit

Permalink
Merge pull request #355 from HSF/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
wguanicedew authored Oct 15, 2024
2 parents 5d2e353 + fbd7063 commit 1ea71bb
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 6 deletions.
15 changes: 15 additions & 0 deletions common/lib/idds/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import re
import requests
import signal
import socket
import subprocess
import sys
import tarfile
import threading
import time
# import traceback

Expand Down Expand Up @@ -1067,3 +1069,16 @@ def wrapper(*args, **kwargs):
return TimeoutError(f"Function '{func.__name__}' timed out after {timeout} seconds.")
return wrapper
return decorator


def get_process_thread_info():
"""
Returns: hostname, process id, thread id and thread name
"""
hostname = socket.getfqdn()
hostname = hostname.split('.')[0]
pid = os.getpid()
hb_thread = threading.current_thread()
thread_id = hb_thread.ident
thread_name = hb_thread.name
return hostname, pid, thread_id, thread_name
12 changes: 11 additions & 1 deletion main/lib/idds/core/processings.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from idds.orm.base.session import read_session, transactional_session
from idds.common.constants import ProcessingLocking, ProcessingStatus, ProcessingType, GranularityType, ContentRelationType
from idds.common.utils import get_list_chunks
from idds.common.utils import get_list_chunks, get_process_thread_info
from idds.orm import (processings as orm_processings,
collections as orm_collections,
contents as orm_contents,
Expand Down Expand Up @@ -126,6 +126,11 @@ def get_processing_by_id_status(processing_id, status=None, locking=False, lock_
parameters = {}
parameters['locking'] = ProcessingLocking.Locking
parameters['updated_at'] = datetime.datetime.utcnow()
hostname, pid, thread_id, thread_name = get_process_thread_info()
parameters['locking_hostname'] = hostname
parameters['locking_pid'] = pid
parameters['locking_thread_id'] = thread_id
parameters['locking_thread_name'] = thread_name
orm_processings.update_processing(processing_id=pr['processing_id'], parameters=parameters, session=session)
return pr
else:
Expand All @@ -134,6 +139,11 @@ def get_processing_by_id_status(processing_id, status=None, locking=False, lock_
parameters = {}
parameters['locking'] = ProcessingLocking.Locking
parameters['updated_at'] = datetime.datetime.utcnow()
hostname, pid, thread_id, thread_name = get_process_thread_info()
parameters['locking_hostname'] = hostname
parameters['locking_pid'] = pid
parameters['locking_thread_id'] = thread_id
parameters['locking_thread_name'] = thread_name
orm_processings.update_processing(processing_id=pr['processing_id'], parameters=parameters, session=session)
return pr
return pr
Expand Down
6 changes: 6 additions & 0 deletions main/lib/idds/core/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from idds.common.constants import (RequestStatus, RequestLocking, WorkStatus,
CollectionType, CollectionStatus, CollectionRelationType,
MessageStatus, MetaStatus)
from idds.common.utils import get_process_thread_info
from idds.orm.base.session import read_session, transactional_session
from idds.orm import requests as orm_requests
from idds.orm import transforms as orm_transforms
Expand Down Expand Up @@ -178,6 +179,11 @@ def get_request_by_id_status(request_id, status=None, locking=False, session=Non
parameters = {}
parameters['locking'] = RequestLocking.Locking
parameters['updated_at'] = datetime.datetime.utcnow()
hostname, pid, thread_id, thread_name = get_process_thread_info()
parameters['locking_hostname'] = hostname
parameters['locking_pid'] = pid
parameters['locking_thread_id'] = thread_id
parameters['locking_thread_name'] = thread_name
orm_requests.update_request(request_id=req['request_id'], parameters=parameters, session=session)
return req

Expand Down
6 changes: 6 additions & 0 deletions main/lib/idds/core/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from idds.common.constants import (TransformStatus, ContentRelationType, ContentStatus,
TransformLocking, CollectionRelationType)
from idds.common.utils import get_process_thread_info
from idds.orm.base.session import read_session, transactional_session
from idds.orm import (transforms as orm_transforms,
collections as orm_collections,
Expand Down Expand Up @@ -103,6 +104,11 @@ def get_transform_by_id_status(transform_id, status=None, locking=False, session
parameters = {}
parameters['locking'] = TransformLocking.Locking
parameters['updated_at'] = datetime.datetime.utcnow()
hostname, pid, thread_id, thread_name = get_process_thread_info()
parameters['locking_hostname'] = hostname
parameters['locking_pid'] = pid
parameters['locking_thread_id'] = thread_id
parameters['locking_thread_name'] = thread_name
orm_transforms.update_transform(transform_id=tf['transform_id'], parameters=parameters, session=session)
return tf

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2024

"""add process thread locking information
Revision ID: a844dae57021
Revises: 3073c5de8f73
Create Date: 2024-10-15 13:55:49.485737+00:00
"""
from alembic import op
from alembic import context
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'a844dae57021'
down_revision = '3073c5de8f73'
branch_labels = None
depends_on = None


def upgrade() -> None:
if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']:
schema = context.get_context().version_table_schema if context.get_context().version_table_schema else ''

op.add_column('requests', sa.Column('locking_hostname', sa.String(50)), schema=schema)
op.add_column('requests', sa.Column('locking_pid', sa.BigInteger()), schema=schema)
op.add_column('requests', sa.Column('locking_thread_id', sa.BigInteger()), schema=schema)
op.add_column('requests', sa.Column('locking_thread_name', sa.String(50)), schema=schema)

op.add_column('transforms', sa.Column('locking_hostname', sa.String(50)), schema=schema)
op.add_column('transforms', sa.Column('locking_pid', sa.BigInteger()), schema=schema)
op.add_column('transforms', sa.Column('locking_thread_id', sa.BigInteger()), schema=schema)
op.add_column('transforms', sa.Column('locking_thread_name', sa.String(50)), schema=schema)

op.add_column('processings', sa.Column('locking_hostname', sa.String(50)), schema=schema)
op.add_column('processings', sa.Column('locking_pid', sa.BigInteger()), schema=schema)
op.add_column('processings', sa.Column('locking_thread_id', sa.BigInteger()), schema=schema)
op.add_column('processings', sa.Column('locking_thread_name', sa.String(50)), schema=schema)


def downgrade() -> None:
if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']:
schema = context.get_context().version_table_schema if context.get_context().version_table_schema else ''

op.drop_column('requests', 'locking_hostname', schema=schema)
op.drop_column('requests', 'locking_pid', schema=schema)
op.drop_column('requests', 'locking_thread_id', schema=schema)
op.drop_column('requests', 'locking_thread_name', schema=schema)

op.drop_column('transforms', 'locking_hostname', schema=schema)
op.drop_column('transforms', 'locking_pid', schema=schema)
op.drop_column('transforms', 'locking_thread_id', schema=schema)
op.drop_column('transforms', 'locking_thread_name', schema=schema)

op.drop_column('processings', 'locking_hostname', schema=schema)
op.drop_column('processings', 'locking_pid', schema=schema)
op.drop_column('processings', 'locking_thread_id', schema=schema)
op.drop_column('processings', 'locking_thread_name', schema=schema)
14 changes: 13 additions & 1 deletion main/lib/idds/orm/base/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ class Request(BASE, ModelBase):
new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1))
update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10))
site = Column(String(50))
locking_hostname = Column(String(50))
locking_pid = Column(BigInteger, autoincrement=False)
locking_thread_id = Column(BigInteger, autoincrement=False)
locking_thread_name = Column(String(255))
campaign = Column(String(50))
campaign_group = Column(String(250))
campaign_tag = Column(String(20))
Expand Down Expand Up @@ -317,6 +321,10 @@ class Transform(BASE, ModelBase):
new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1))
update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10))
site = Column(String(50))
locking_hostname = Column(String(50))
locking_pid = Column(BigInteger, autoincrement=False)
locking_thread_id = Column(BigInteger, autoincrement=False)
locking_thread_name = Column(String(255))
name = Column(String(NAME_LENGTH))
has_previous_conditions = Column(Integer())
loop_index = Column(Integer())
Expand Down Expand Up @@ -430,6 +438,10 @@ class Processing(BASE, ModelBase):
new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1))
update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10))
site = Column(String(50))
locking_hostname = Column(String(50))
locking_pid = Column(BigInteger, autoincrement=False)
locking_thread_id = Column(BigInteger, autoincrement=False)
locking_thread_name = Column(String(255))
errors = Column(JSONString(1024))
_processing_metadata = Column('processing_metadata', JSON())
_running_metadata = Column('running_metadata', JSON())
Expand Down Expand Up @@ -714,7 +726,7 @@ class Health(BASE, ModelBase):
status = Column(EnumWithValue(HealthStatus), default=0, nullable=False)
thread_id = Column(BigInteger, autoincrement=False)
thread_name = Column(String(255))
payload = Column(String(2048))
# payload = Column(String(2048))
created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow)
updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
payload = Column(String(2048))
Expand Down
6 changes: 4 additions & 2 deletions main/lib/idds/orm/base/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ def build_database(echo=True, tests=False):

if config_has_option('database', 'schema'):
schema = config_get('database', 'schema')
if schema and not engine.dialect.has_schema(engine, schema):
if schema:
print('Schema set in config, trying to create schema:', schema)
try:
engine.execute(CreateSchema(schema))
with engine.connect() as conn:
with conn.begin():
conn.execute(CreateSchema(schema))
except Exception as e:
print('Cannot create schema, please validate manually if schema creation is needed, continuing:', e)
print(traceback.format_exc())
Expand Down
3 changes: 1 addition & 2 deletions main/lib/idds/tests/panda_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import sys
import datetime

"""
os.environ['PANDA_URL'] = 'http://pandaserver-doma.cern.ch:25080/server/panda'
# os.environ['PANDA_URL_SSL'] = 'https://pandaserver-doma.cern.ch:25443/server/panda'
os.environ['PANDA_URL_SSL'] = 'https://pandaserver-doma.cern.ch:443/server/panda'
Expand All @@ -20,7 +19,6 @@

# os.environ['PANDA_URL_SSL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'
# os.environ['PANDA_URL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'
"""

from pandaclient import Client # noqa E402

Expand Down Expand Up @@ -82,6 +80,7 @@
task_ids = [i for i in range(2921, 2927)]
task_ids = [124, 68, 75, 78, 79]
task_ids = [19654]
task_ids = [16700, 16704, 17055, 17646, 17792, 18509, 19754, 21666, 21714, 21739, 16148, 16149, 16150]
for task_id in task_ids:
print("Killing %s" % task_id)
ret = Client.killTask(task_id, verbose=True)
Expand Down

0 comments on commit 1ea71bb

Please sign in to comment.