Skip to content

Commit

Permalink
Merge pull request #263 from HSF/dev
Browse files Browse the repository at this point in the history
add meta info table and use it for min_request_id
  • Loading branch information
wguanicedew authored Jan 9, 2024
2 parents cee7baa + f198073 commit 656e5be
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 6 deletions.
5 changes: 5 additions & 0 deletions common/lib/idds/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,11 @@ class CommandStatus(IDDSEnum):
UnknownCommand = 4


class MetaStatus(IDDSEnum):
UnActive = 0
Active = 1


class CommandLocking(IDDSEnum):
Idle = 0
Locking = 1
Expand Down
5 changes: 5 additions & 0 deletions main/lib/idds/agents/clerk/clerk.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ def get_new_requests(self):
for req_id in reqs_new:
if BaseAgent.min_request_id is None or BaseAgent.min_request_id > req_id:
BaseAgent.min_request_id = req_id
core_requests.set_min_request_id(BaseAgent.min_request_id)

event = NewRequestEvent(publisher_id=self.id, request_id=req_id)
events.append(event)
self.event_bus.send_bulk(events)
Expand Down Expand Up @@ -209,6 +211,8 @@ def get_running_requests(self):
for req_id in reqs:
if BaseAgent.min_request_id is None or BaseAgent.min_request_id > req_id:
BaseAgent.min_request_id = req_id
core_requests.set_min_request_id(BaseAgent.min_request_id)

event = UpdateRequestEvent(publisher_id=self.id, request_id=req_id)
events.append(event)
self.event_bus.send_bulk(events)
Expand Down Expand Up @@ -257,6 +261,7 @@ def get_operation_requests(self):

if BaseAgent.min_request_id is None or BaseAgent.min_request_id > request_id:
BaseAgent.min_request_id = request_id
core_requests.set_min_request_id(BaseAgent.min_request_id)

event = None
if cmd_status in [CommandStatus.New, CommandStatus.Processing]:
Expand Down
32 changes: 30 additions & 2 deletions main/lib/idds/core/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@

from idds.common.constants import (RequestStatus, RequestLocking, WorkStatus,
CollectionType, CollectionStatus, CollectionRelationType,
MessageStatus)
MessageStatus, MetaStatus)
from idds.orm.base.session import read_session, transactional_session
from idds.orm import requests as orm_requests
from idds.orm import transforms as orm_transforms
from idds.orm import workprogress as orm_workprogresses
from idds.orm import collections as orm_collections
from idds.orm import messages as orm_messages
from idds.orm import meta as orm_meta
from idds.core import messages as core_messages


Expand Down Expand Up @@ -382,7 +383,9 @@ def get_requests_by_status_type(status, request_type=None, time_period=None, loc
:returns: list of Request.
"""
if min_request_id is None:
min_request_id = orm_requests.get_min_request_id(session=session)
min_request_id = get_min_request_id(session=session)
if not min_request_id:
min_request_id = 0

if locking:
if not only_return_id and bulk_size:
Expand Down Expand Up @@ -484,3 +487,28 @@ def get_num_active_requests(active_status=None, session=None):
@read_session
def get_active_requests(active_status=None, session=None):
return orm_requests.get_active_requests(active_status=active_status, session=session)


@transactional_session
def set_min_request_id(min_request_id, session=None):
"""
Set min request id
:param min_request_id: Int of min_request_id.
"""
orm_meta.add_meta_item(name='min_request_id', status=MetaStatus.Active, description="min request id",
metadata={"min_request_id": min_request_id}, session=None)


@read_session
def get_min_request_id(session=None):
"""
Get min request id
:returns min_request_id: Int of min_request_id.
"""
meta = orm_meta.get_meta_item(name='min_request_id', session=session)
if not meta:
return None
else:
return meta.get("min_request_id", None)
1 change: 1 addition & 0 deletions main/lib/idds/orm/base/alembic/script.py.mako
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#
# Authors:
# - Wen Guan, <[email protected]>, 2023
# - Wen Guan, <[email protected]>, 2024

"""${message}
Expand Down
27 changes: 23 additions & 4 deletions main/lib/idds/orm/base/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2019 - 2023
# - Wen Guan, <[email protected]>, 2019 - 2024


"""
Expand All @@ -33,7 +33,7 @@
MessageType, MessageStatus, MessageLocking,
MessageSource, MessageDestination, ThrottlerStatus,
CommandType, CommandStatus, CommandLocking,
CommandLocation, HealthStatus)
CommandLocation, HealthStatus, MetaStatus)
from idds.common.event import (EventType, EventStatus)
from idds.common.utils import date_to_str
from idds.orm.base.enum import EnumSymbol
Expand Down Expand Up @@ -902,9 +902,11 @@ class EventArchive(BASE, ModelBase):


class Throttler(BASE, ModelBase):
"""Represents the operations events"""
"""Represents the throttlers"""
__tablename__ = 'throttlers'
throttler_id = Column(BigInteger(), primary_key=True)
throttler_id = Column(BigInteger().with_variant(Integer, "sqlite"),
Sequence('THROTTLER_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
primary_key=True)
site = Column(String(50), nullable=False)
status = Column(EnumWithValue(ThrottlerStatus), nullable=False)
num_requests = Column(Integer())
Expand All @@ -920,6 +922,23 @@ class Throttler(BASE, ModelBase):
UniqueConstraint('site', name='THROTTLER_SITE_UQ'))


class MetaInfo(BASE, ModelBase):
"""Represents the meta infos"""
__tablename__ = 'meta_info'
meta_id = Column(BigInteger().with_variant(Integer, "sqlite"),
Sequence('METAINFO_ID_SEQ', schema=DEFAULT_SCHEMA_NAME),
primary_key=True)
name = Column(String(50), nullable=False)
status = Column(EnumWithValue(MetaStatus), nullable=False)
created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
description = Column(String(1000), nullable=True)
metadata = Column(JSON())

__table_args__ = (PrimaryKeyConstraint('meta_id', name='METAINFO_PK'),
UniqueConstraint('name', name='METAINFO_NAME_UQ'))


def create_trigger():
func = DDL("""
SET search_path TO %s;
Expand Down

0 comments on commit 656e5be

Please sign in to comment.