diff --git a/common/lib/idds/common/constants.py b/common/lib/idds/common/constants.py index 2129163a..25e6650f 100644 --- a/common/lib/idds/common/constants.py +++ b/common/lib/idds/common/constants.py @@ -474,6 +474,11 @@ class CommandStatus(IDDSEnum): UnknownCommand = 4 +class MetaStatus(IDDSEnum): + UnActive = 0 + Active = 1 + + class CommandLocking(IDDSEnum): Idle = 0 Locking = 1 diff --git a/main/lib/idds/agents/clerk/clerk.py b/main/lib/idds/agents/clerk/clerk.py index e64cf45b..a3fc3f8d 100644 --- a/main/lib/idds/agents/clerk/clerk.py +++ b/main/lib/idds/agents/clerk/clerk.py @@ -166,6 +166,8 @@ def get_new_requests(self): for req_id in reqs_new: if BaseAgent.min_request_id is None or BaseAgent.min_request_id > req_id: BaseAgent.min_request_id = req_id + core_requests.set_min_request_id(BaseAgent.min_request_id) + event = NewRequestEvent(publisher_id=self.id, request_id=req_id) events.append(event) self.event_bus.send_bulk(events) @@ -209,6 +211,8 @@ def get_running_requests(self): for req_id in reqs: if BaseAgent.min_request_id is None or BaseAgent.min_request_id > req_id: BaseAgent.min_request_id = req_id + core_requests.set_min_request_id(BaseAgent.min_request_id) + event = UpdateRequestEvent(publisher_id=self.id, request_id=req_id) events.append(event) self.event_bus.send_bulk(events) @@ -257,6 +261,7 @@ def get_operation_requests(self): if BaseAgent.min_request_id is None or BaseAgent.min_request_id > request_id: BaseAgent.min_request_id = request_id + core_requests.set_min_request_id(BaseAgent.min_request_id) event = None if cmd_status in [CommandStatus.New, CommandStatus.Processing]: diff --git a/main/lib/idds/core/requests.py b/main/lib/idds/core/requests.py index 83e388a6..fb6d3084 100644 --- a/main/lib/idds/core/requests.py +++ b/main/lib/idds/core/requests.py @@ -18,13 +18,14 @@ from idds.common.constants import (RequestStatus, RequestLocking, WorkStatus, CollectionType, CollectionStatus, CollectionRelationType, - MessageStatus) + MessageStatus, MetaStatus) from idds.orm.base.session import read_session, transactional_session from idds.orm import requests as orm_requests from idds.orm import transforms as orm_transforms from idds.orm import workprogress as orm_workprogresses from idds.orm import collections as orm_collections from idds.orm import messages as orm_messages +from idds.orm import meta as orm_meta from idds.core import messages as core_messages @@ -382,7 +383,9 @@ def get_requests_by_status_type(status, request_type=None, time_period=None, loc :returns: list of Request. """ if min_request_id is None: - min_request_id = orm_requests.get_min_request_id(session=session) + min_request_id = get_min_request_id(session=session) + if not min_request_id: + min_request_id = 0 if locking: if not only_return_id and bulk_size: @@ -484,3 +487,28 @@ def get_num_active_requests(active_status=None, session=None): @read_session def get_active_requests(active_status=None, session=None): return orm_requests.get_active_requests(active_status=active_status, session=session) + + +@transactional_session +def set_min_request_id(min_request_id, session=None): + """ + Set min request id + + :param min_request_id: Int of min_request_id. + """ + orm_meta.add_meta_item(name='min_request_id', status=MetaStatus.Active, description="min request id", + metadata={"min_request_id": min_request_id}, session=None) + + +@read_session +def get_min_request_id(session=None): + """ + Get min request id + + :returns min_request_id: Int of min_request_id. + """ + meta = orm_meta.get_meta_item(name='min_request_id', session=session) + if not meta: + return None + else: + return meta.get("min_request_id", None) diff --git a/main/lib/idds/orm/base/alembic/script.py.mako b/main/lib/idds/orm/base/alembic/script.py.mako index 16763eea..7436eb8b 100644 --- a/main/lib/idds/orm/base/alembic/script.py.mako +++ b/main/lib/idds/orm/base/alembic/script.py.mako @@ -7,6 +7,7 @@ # # Authors: # - Wen Guan, , 2023 +# - Wen Guan, , 2024 """${message} diff --git a/main/lib/idds/orm/base/models.py b/main/lib/idds/orm/base/models.py index 4bd9ad4b..7c06eecd 100644 --- a/main/lib/idds/orm/base/models.py +++ b/main/lib/idds/orm/base/models.py @@ -6,7 +6,7 @@ # http://www.apache.org/licenses/LICENSE-2.0OA # # Authors: -# - Wen Guan, , 2019 - 2023 +# - Wen Guan, , 2019 - 2024 """ @@ -33,7 +33,7 @@ MessageType, MessageStatus, MessageLocking, MessageSource, MessageDestination, ThrottlerStatus, CommandType, CommandStatus, CommandLocking, - CommandLocation, HealthStatus) + CommandLocation, HealthStatus, MetaStatus) from idds.common.event import (EventType, EventStatus) from idds.common.utils import date_to_str from idds.orm.base.enum import EnumSymbol @@ -902,9 +902,11 @@ class EventArchive(BASE, ModelBase): class Throttler(BASE, ModelBase): - """Represents the operations events""" + """Represents the throttlers""" __tablename__ = 'throttlers' - throttler_id = Column(BigInteger(), primary_key=True) + throttler_id = Column(BigInteger().with_variant(Integer, "sqlite"), + Sequence('THROTTLER_ID_SEQ', schema=DEFAULT_SCHEMA_NAME), + primary_key=True) site = Column(String(50), nullable=False) status = Column(EnumWithValue(ThrottlerStatus), nullable=False) num_requests = Column(Integer()) @@ -920,6 +922,23 @@ class Throttler(BASE, ModelBase): UniqueConstraint('site', name='THROTTLER_SITE_UQ')) +class MetaInfo(BASE, ModelBase): + """Represents the meta infos""" + __tablename__ = 'meta_info' + meta_id = Column(BigInteger().with_variant(Integer, "sqlite"), + Sequence('METAINFO_ID_SEQ', schema=DEFAULT_SCHEMA_NAME), + primary_key=True) + name = Column(String(50), nullable=False) + status = Column(EnumWithValue(MetaStatus), nullable=False) + created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False) + updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False) + description = Column(String(1000), nullable=True) + metadata = Column(JSON()) + + __table_args__ = (PrimaryKeyConstraint('meta_id', name='METAINFO_PK'), + UniqueConstraint('name', name='METAINFO_NAME_UQ')) + + def create_trigger(): func = DDL(""" SET search_path TO %s;