diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py index 443facb5..0c69c797 100644 --- a/main/lib/idds/agents/carrier/utils.py +++ b/main/lib/idds/agents/carrier/utils.py @@ -1936,7 +1936,8 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou coll_status[content['coll_id']]['bytes'] += content['bytes'] elif content['status'] in [ContentStatus.New]: coll_status[content['coll_id']]['new_files'] += 1 - elif content['status'] in [ContentStatus.Failed, ContentStatus.FinalFailed]: + elif content['status'] in [ContentStatus.Failed, ContentStatus.FinalFailed, + ContentStatus.SubAvailable, ContentStatus.FinalSubAvailable]: coll_status[content['coll_id']]['failed_files'] += 1 elif content['status'] in [ContentStatus.Lost, ContentStatus.Deleted, ContentStatus.Missing]: coll_status[content['coll_id']]['missing_files'] += 1 @@ -1958,7 +1959,8 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou ContentStatus.FakeAvailable, ContentStatus.FakeAvailable.value]: coll_status[content['coll_id']]['processed_ext_files'] += 1 # elif content['status'] in [ContentStatus.Failed, ContentStatus.FinalFailed]: - elif content['status'] in [ContentStatus.Failed, ContentStatus.FinalFailed]: + elif content['status'] in [ContentStatus.Failed, ContentStatus.FinalFailed, + ContentStatus.SubAvailable, ContentStatus.FinalSubAvailable]: coll_status[content['coll_id']]['failed_ext_files'] += 1 elif content['status'] in [ContentStatus.Lost, ContentStatus.Deleted, ContentStatus.Missing]: coll_status[content['coll_id']]['missing_ext_files'] += 1 diff --git a/main/lib/idds/agents/transformer/transformer.py b/main/lib/idds/agents/transformer/transformer.py index 9f1ec46e..99c27299 100644 --- a/main/lib/idds/agents/transformer/transformer.py +++ b/main/lib/idds/agents/transformer/transformer.py @@ -19,6 +19,7 @@ TransformStatus, TransformLocking, CollectionType, CollectionStatus, CollectionRelationType, + ContentStatus, ContentRelationType, CommandType, ProcessingStatus, WorkflowType, ConditionStatus, get_processing_type_from_transform_type, @@ -26,6 +27,7 @@ from idds.common.utils import setup_logging, truncate_string from idds.core import (transforms as core_transforms, processings as core_processings, + catalog as core_catalog, throttlers as core_throttlers, conditions as core_conditions) from idds.agents.common.baseagent import BaseAgent @@ -103,7 +105,7 @@ def __init__(self, num_threads=1, max_number_workers=8, poll_period=1800, retrie if hasattr(self, 'cache_expire_seconds'): self.cache_expire_seconds = int(self.cache_expire_seconds) else: - self.cache_expire_seconds = 3600 + self.cache_expire_seconds = 300 def is_ok_to_run_more_transforms(self): if self.number_workers >= self.max_number_workers: @@ -117,6 +119,9 @@ def show_queue_size(self): self.logger.debug(q_str) def get_throttlers(self): + """ + Use throttler + """ cache = get_redis_cache() throttlers = cache.get("throttlers", default=None) if throttlers is None: @@ -133,6 +138,101 @@ def get_throttlers(self): cache.set("throttlers", throttlers, expire_seconds=self.cache_expire_seconds) return throttlers + def get_num_active_transforms(self, site_name): + cache = get_redis_cache() + num_transforms = cache.get("num_transforms", default=None) + if num_transforms is None: + num_transforms = {} + active_status = [TransformStatus.New, TransformStatus.Ready] + active_status1 = [TransformStatus.Transforming, TransformStatus.Terminating] + rets = core_transforms.get_num_active_transforms(active_status + active_status1) + for ret in rets: + status, site, count = ret + if site is None: + site = 'Default' + if site not in num_transforms: + num_transforms[site] = {'new': 0, 'processing': 0} + if status in active_status: + num_transforms[site]['new'] += count + elif status in active_status1: + num_transforms[site]['processing'] += count + cache.set("num_transforms", num_transforms, expire_seconds=self.cache_expire_seconds) + default_value = {'new': 0, 'processing': 0} + return num_transforms.get(site_name, default_value) + + def get_num_active_processings(self, site_name): + cache = get_redis_cache() + num_processings = cache.get("num_processings", default=None) + active_transforms = cache.get("active_transforms", default={}) + if num_processings is None: + num_processings = {} + active_transforms = {} + active_status = [ProcessingStatus.New] + active_status1 = [ProcessingStatus.Submitting, ProcessingStatus.Submitted, + ProcessingStatus.Running, ProcessingStatus.Terminating, ProcessingStatus.ToTrigger, + ProcessingStatus.Triggering] + rets = core_processings.get_active_processings(active_status + active_status1) + for ret in rets: + req_id, trf_id, pr_id, site, status = ret + if site is None: + site = 'Default' + if site not in num_processings: + num_processings[site] = {'new': 0, 'processing': 0} + active_transforms[site] = [] + if status in active_status: + num_processings[site]['new'] += 1 + elif status in active_status1: + num_processings[site]['processing'] += 1 + active_transforms[site].append(trf_id) + cache.set("num_processings", num_processings, expire_seconds=self.cache_expire_seconds) + cache.set("active_transforms", active_transforms, expire_seconds=self.cache_expire_seconds) + default_value = {'new': 0, 'processing': 0} + return num_processings.get(site_name, default_value), active_transforms + + def get_num_active_contents(self, site_name, active_transform_ids): + cache = get_redis_cache() + # 1. input contents not terminated + # 2. output contents not terminated + tf_id_site_map = {} + all_tf_ids = [] + for site in active_transform_ids: + all_tf_ids += active_transform_ids[site] + for tf_id in active_transform_ids[site]: + tf_id_site_map[tf_id] = site + + num_input_contents = cache.get("num_input_contents", default=None) + num_output_contents = cache.get("num_output_contents", default=None) + if num_input_contents is None or num_output_contents is None: + num_input_contents, num_output_contents = {}, {} + if all_tf_ids: + ret = core_catalog.get_content_status_statistics_by_relation_type(all_tf_ids) + for item in ret: + status, relation_type, transform_id, count = item + site = tf_id_site_map[transform_id] + if site not in num_input_contents: + num_input_contents[site] = {'new': 0, 'activated': 0, 'processed': 0} + num_output_contents[site] = {'new': 0, 'activated': 0, 'processed': 0} + if status in [ContentStatus.New]: + if relation_type == ContentRelationType.Input: + num_input_contents[site]['new'] += count + elif relation_type == ContentRelationType.Output: + num_output_contents[site]['new'] += count + if status in [ContentStatus.Activated]: + if relation_type == ContentRelationType.Input: + num_input_contents[site]['activated'] += count + elif relation_type == ContentRelationType.Output: + num_output_contents[site]['activated'] += count + else: + if relation_type == ContentRelationType.Input: + num_input_contents[site]['processed'] += count + elif relation_type == ContentRelationType.Output: + num_output_contents[site]['processed'] += count + + cache.set("num_input_contents", num_input_contents, expire_seconds=self.cache_expire_seconds) + cache.set("num_output_contents", num_output_contents, expire_seconds=self.cache_expire_seconds) + default_value = {'new': 0, 'activated': 0, 'processed': 0} + return num_input_contents.get(site_name, default_value), num_output_contents.get(site_name, default_value) + def whether_to_throttle(self, transform): try: site = transform['site'] diff --git a/main/lib/idds/orm/base/alembic/versions/a844dae57021_add_process_thread_locking_information.py b/main/lib/idds/orm/base/alembic/versions/a844dae57021_add_process_thread_locking_information.py index e279505e..def6acff 100644 --- a/main/lib/idds/orm/base/alembic/versions/a844dae57021_add_process_thread_locking_information.py +++ b/main/lib/idds/orm/base/alembic/versions/a844dae57021_add_process_thread_locking_information.py @@ -34,17 +34,17 @@ def upgrade() -> None: op.add_column('requests', sa.Column('locking_hostname', sa.String(50)), schema=schema) op.add_column('requests', sa.Column('locking_pid', sa.BigInteger()), schema=schema) op.add_column('requests', sa.Column('locking_thread_id', sa.BigInteger()), schema=schema) - op.add_column('requests', sa.Column('locking_thread_name', sa.String(50)), schema=schema) + op.add_column('requests', sa.Column('locking_thread_name', sa.String(100)), schema=schema) op.add_column('transforms', sa.Column('locking_hostname', sa.String(50)), schema=schema) op.add_column('transforms', sa.Column('locking_pid', sa.BigInteger()), schema=schema) op.add_column('transforms', sa.Column('locking_thread_id', sa.BigInteger()), schema=schema) - op.add_column('transforms', sa.Column('locking_thread_name', sa.String(50)), schema=schema) + op.add_column('transforms', sa.Column('locking_thread_name', sa.String(100)), schema=schema) op.add_column('processings', sa.Column('locking_hostname', sa.String(50)), schema=schema) op.add_column('processings', sa.Column('locking_pid', sa.BigInteger()), schema=schema) op.add_column('processings', sa.Column('locking_thread_id', sa.BigInteger()), schema=schema) - op.add_column('processings', sa.Column('locking_thread_name', sa.String(50)), schema=schema) + op.add_column('processings', sa.Column('locking_thread_name', sa.String(100)), schema=schema) def downgrade() -> None: diff --git a/main/lib/idds/orm/base/models.py b/main/lib/idds/orm/base/models.py index 7b955f2f..e65085df 100644 --- a/main/lib/idds/orm/base/models.py +++ b/main/lib/idds/orm/base/models.py @@ -163,7 +163,7 @@ class Request(BASE, ModelBase): locking_hostname = Column(String(50)) locking_pid = Column(BigInteger, autoincrement=False) locking_thread_id = Column(BigInteger, autoincrement=False) - locking_thread_name = Column(String(255)) + locking_thread_name = Column(String(100)) campaign = Column(String(50)) campaign_group = Column(String(250)) campaign_tag = Column(String(20)) @@ -324,7 +324,7 @@ class Transform(BASE, ModelBase): locking_hostname = Column(String(50)) locking_pid = Column(BigInteger, autoincrement=False) locking_thread_id = Column(BigInteger, autoincrement=False) - locking_thread_name = Column(String(255)) + locking_thread_name = Column(String(100)) name = Column(String(NAME_LENGTH)) has_previous_conditions = Column(Integer()) loop_index = Column(Integer()) @@ -441,7 +441,7 @@ class Processing(BASE, ModelBase): locking_hostname = Column(String(50)) locking_pid = Column(BigInteger, autoincrement=False) locking_thread_id = Column(BigInteger, autoincrement=False) - locking_thread_name = Column(String(255)) + locking_thread_name = Column(String(100)) errors = Column(JSONString(1024)) _processing_metadata = Column('processing_metadata', JSON()) _running_metadata = Column('running_metadata', JSON()) diff --git a/main/lib/idds/orm/collections.py b/main/lib/idds/orm/collections.py index ddf5f8f8..85842b8f 100644 --- a/main/lib/idds/orm/collections.py +++ b/main/lib/idds/orm/collections.py @@ -330,10 +330,13 @@ def get_collections_by_request_ids(request_ids, session=None): if request_ids and type(request_ids) not in (list, tuple): request_ids = [request_ids] - query = session.query(models.Collection.coll_id, - models.Collection.request_id, - models.Collection.transform_id, - models.Collection.workload_id) + columns = [models.Collection.coll_id, + models.Collection.request_id, + models.Collection.transform_id, + models.Collection.workload_id] + column_names = [column.name for column in columns] + query = session.query(*columns) + if request_ids: query = query.filter(models.Collection.request_id.in_(request_ids)) @@ -342,7 +345,7 @@ def get_collections_by_request_ids(request_ids, session=None): if tmp: for t in tmp: # rets.append(t.to_dict()) - t2 = dict(zip(t.keys(), t)) + t2 = dict(zip(column_names, t)) rets.append(t2) return rets except Exception as error: diff --git a/main/lib/idds/orm/contents.py b/main/lib/idds/orm/contents.py index d4524616..601a7576 100644 --- a/main/lib/idds/orm/contents.py +++ b/main/lib/idds/orm/contents.py @@ -601,8 +601,10 @@ def get_update_contents_from_others_by_dep_id(request_id=None, transform_id=None .filter(models.Content.substatus != ContentStatus.New) subquery = subquery.subquery() - query = session.query(models.Content.content_id, - subquery.c.substatus) + columns = [models.Content.content_id, subquery.c.substatus] + column_names = [column.name for column in columns] + + query = session.query(*columns) if request_id: query = query.filter(models.Content.request_id == request_id) if transform_id: @@ -615,7 +617,7 @@ def get_update_contents_from_others_by_dep_id(request_id=None, transform_id=None rets = [] if tmp: for t in tmp: - t2 = dict(zip(t.keys(), t)) + t2 = dict(zip(column_names, t)) rets.append(t2) return rets except Exception as ex: @@ -642,10 +644,12 @@ def get_updated_transforms_by_content_status(request_id=None, transform_id=None, subquery = subquery.filter(models.Content.content_relation_type == 1) subquery = subquery.subquery() - query = session.query(models.Content.request_id, - models.Content.transform_id, - models.Content.workload_id, - models.Content.coll_id) + columns = [models.Content.request_id, + models.Content.transform_id, + models.Content.workload_id, + models.Content.coll_id] + column_names = [column.name for column in columns] + query = session.query(*columns) # query = query.with_hint(models.Content, "INDEX(CONTENTS CONTENTS_REQ_TF_COLL_IDX)", 'oracle') if request_id: @@ -661,7 +665,7 @@ def get_updated_transforms_by_content_status(request_id=None, transform_id=None, rets = [] if tmp: for t in tmp: - t2 = dict(zip(t.keys(), t)) + t2 = dict(zip(column_names, t)) rets.append(t2) return rets except Exception as error: @@ -986,13 +990,15 @@ def get_contents_ext_ids(request_id=None, transform_id=None, workload_id=None, c if not isinstance(status, (tuple, list)): status = [status] - query = session.query(models.Content_ext.request_id, - models.Content_ext.transform_id, - models.Content_ext.workload_id, - models.Content_ext.coll_id, - models.Content_ext.content_id, - models.Content_ext.panda_id, - models.Content_ext.status) + columns = [models.Content_ext.request_id, + models.Content_ext.transform_id, + models.Content_ext.workload_id, + models.Content_ext.coll_id, + models.Content_ext.content_id, + models.Content_ext.panda_id, + models.Content_ext.status] + column_names = [column.name for column in columns] + query = session.query(*columns) if request_id: query = query.filter(models.Content_ext.request_id == request_id) if transform_id: @@ -1009,7 +1015,7 @@ def get_contents_ext_ids(request_id=None, transform_id=None, workload_id=None, c rets = [] if tmp: for t in tmp: - t2 = dict(zip(t.keys(), t)) + t2 = dict(zip(column_names, t)) rets.append(t2) return rets except sqlalchemy.orm.exc.NoResultFound as error: