Skip to content

Commit

Permalink
Merge pull request #250 from HSF/dev
Browse files Browse the repository at this point in the history
update sls and bulk_update_mapping
  • Loading branch information
wguanicedew authored Nov 29, 2023
2 parents 4b0c96c + e02159b commit 5cde928
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 13 deletions.
21 changes: 21 additions & 0 deletions common/lib/idds/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

from enum import Enum
from functools import wraps
from itertools import groupby
from operator import itemgetter
from packaging import version as packaging_version

from idds.common.config import (config_has_section, config_has_option,
Expand Down Expand Up @@ -609,3 +611,22 @@ def report_availability(availability):
error = "Failed to report availablity: %s" % str(ex)
print(error)
logging.debug(error)


def split_chunks_not_continous(data):
rets = []
for k, g in groupby(enumerate(data), lambda i_x: i_x[0] - i_x[1]):
rets.append(list(map(itemgetter(1), g)))
return rets


def group_list(input_list, key):
update_groups = {}
for item in input_list:
item_key = item[key]
del item[key]
item_tuple = tuple(sorted(item.items()))
if item_tuple not in update_groups:
update_groups[item_tuple] = {'keys': [], 'items': item}
update_groups[item_tuple]['keys'].append(item_key)
return update_groups
1 change: 1 addition & 0 deletions main/config_default/supervisord_idds.ini
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ environment =
RUCIO_HOME=/opt/idds/,
RUCIO_ACCOUNT=pilot,
RUCIO_AUTH_TYPE=x509_proxy,
PANDA_USE_NATIVE_HTTPLIB=true,
X509_USER_PROXY=/opt/idds/config/x509up
;command=/opt/idds/bin/run-idds
;command=bash -c "source /etc/profile.d/conda.sh && conda activate /opt/idds && /opt/idds/bin/run-idds"
Expand Down
10 changes: 6 additions & 4 deletions main/lib/idds/core/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def update_input_collection_with_contents(coll, parameters, contents, bulk_size=


@transactional_session
def update_contents(parameters, session=None):
def update_contents(parameters, request_id=None, transform_id=None, use_bulk_update_mappings=False, session=None):
"""
updatecontents.
Expand All @@ -272,7 +272,8 @@ def update_contents(parameters, session=None):
:raises DatabaseException: If there is a database error.
"""
return orm_contents.update_contents(parameters, session=session)
return orm_contents.update_contents(parameters, request_id=request_id, transform_id=transform_id,
use_bulk_update_mappings=use_bulk_update_mappings, session=session)


@transactional_session
Expand Down Expand Up @@ -763,7 +764,7 @@ def add_contents_ext(contents, bulk_size=10000, session=None):


@transactional_session
def update_contents_ext(parameters, session=None):
def update_contents_ext(parameters, request_id=None, transform_id=None, use_bulk_update_mappings=False, session=None):
"""
update contents ext.
Expand All @@ -774,7 +775,8 @@ def update_contents_ext(parameters, session=None):
:raises DatabaseException: If there is a database error.
"""
return orm_contents.update_contents_ext(parameters, session=session)
return orm_contents.update_contents_ext(parameters, request_id=request_id, transform_id=transform_id,
use_bulk_update_mappings=use_bulk_update_mappings, session=session)


@read_session
Expand Down
10 changes: 7 additions & 3 deletions main/lib/idds/core/processings.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ def update_processing_contents(update_processing, update_contents=None, update_m
update_dep_contents=None, update_collections=None, messages=None,
new_update_contents=None, new_input_dependency_contents=None,
new_contents_ext=None, update_contents_ext=None,
request_id=None, transform_id=None, use_bulk_update_mappings=False,
message_bulk_size=2000, session=None):
"""
Update processing with contents.
Expand All @@ -330,7 +331,8 @@ def update_processing_contents(update_processing, update_contents=None, update_m
if update_contents:
chunks = get_list_chunks(update_contents)
for chunk in chunks:
orm_contents.update_contents(chunk, session=session)
orm_contents.update_contents(chunk, request_id=request_id, transform_id=transform_id,
use_bulk_update_mappings=use_bulk_update_mappings, session=session)
if new_update_contents:
# first add and then delete, to trigger the trigger 'update_content_dep_status'.
# too slow
Expand All @@ -350,7 +352,8 @@ def update_processing_contents(update_processing, update_contents=None, update_m
if update_contents_ext:
chunks = get_list_chunks(update_contents_ext)
for chunk in chunks:
orm_contents.update_contents_ext(chunk, session=session)
orm_contents.update_contents_ext(chunk, request_id=request_id, transform_id=transform_id,
use_bulk_update_mappings=use_bulk_update_mappings, session=session)
if new_input_dependency_contents:
new_input_dependency_contents = resolve_input_dependency_id(new_input_dependency_contents, session=session)
chunks = get_list_chunks(new_input_dependency_contents)
Expand All @@ -372,7 +375,8 @@ def update_processing_contents(update_processing, update_contents=None, update_m
if update_messages:
chunks = get_list_chunks(update_messages)
for chunk in chunks:
orm_messages.update_messages(chunk, bulk_size=message_bulk_size, session=session)
orm_messages.update_messages(chunk, bulk_size=message_bulk_size, request_id=request_id, transform_id=transform_id,
use_bulk_update_mappings=use_bulk_update_mappings, session=session)
if messages:
if not type(messages) in [list, tuple]:
messages = [messages]
Expand Down
37 changes: 33 additions & 4 deletions main/lib/idds/orm/contents.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from idds.common import exceptions
from idds.common.constants import (ContentType, ContentStatus, ContentLocking,
ContentFetchStatus, ContentRelationType)
from idds.common.utils import group_list
from idds.orm.base.session import read_session, transactional_session
from idds.orm.base import models

Expand Down Expand Up @@ -472,7 +473,7 @@ def update_content(content_id, parameters, session=None):


@transactional_session
def update_contents(parameters, session=None):
def update_contents(parameters, use_bulk_update_mappings=False, request_id=None, transform_id=None, session=None):
"""
update contents.
Expand All @@ -487,7 +488,21 @@ def update_contents(parameters, session=None):
for parameter in parameters:
parameter['updated_at'] = datetime.datetime.utcnow()

session.bulk_update_mappings(models.Content, parameters)
if use_bulk_update_mappings:
session.bulk_update_mappings(models.Content, parameters)
else:
groups = group_list(parameters, key='content_id')
for group_key in groups:
group = groups[group_key]
keys = group['keys']
items = group['items']
query = session.query(models.Content)
if request_id:
query = query.filter(models.Content.request_id == request_id)
if transform_id:
query = query.filter(models.Content.transform_id == transform_id)
query = query.filter(models.Content.content_id.in_(keys))\
.update(items, synchronize_session=False)
except sqlalchemy.orm.exc.NoResultFound as error:
raise exceptions.NoObject('Content cannot be found: %s' % (error))

Expand Down Expand Up @@ -867,7 +882,7 @@ def add_contents_ext(contents, bulk_size=10000, session=None):


@transactional_session
def update_contents_ext(parameters, session=None):
def update_contents_ext(parameters, use_bulk_update_mappings=True, request_id=None, transform_id=None, session=None):
"""
update contents ext.
Expand All @@ -879,7 +894,21 @@ def update_contents_ext(parameters, session=None):
"""
try:
session.bulk_update_mappings(models.Content_ext, parameters)
if use_bulk_update_mappings:
session.bulk_update_mappings(models.Content_ext, parameters)
else:
groups = group_list(parameters, key='content_id')
for group_key in groups:
group = groups[group_key]
keys = group['keys']
items = group['items']
query = session.query(models.Content_ext)
if request_id:
query = query.filter(models.Content.request_id == request_id)
if transform_id:
query = query.filter(models.Content.transform_id == transform_id)
query = query.filter(models.Content.content_id.in_(keys))\
.update(items, synchronize_session=False)
except sqlalchemy.orm.exc.NoResultFound as error:
raise exceptions.NoObject('Content cannot be found: %s' % (error))

Expand Down
19 changes: 17 additions & 2 deletions main/lib/idds/orm/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from idds.common import exceptions
from idds.common.constants import MessageDestination
from idds.common.utils import group_list
from idds.orm.base import models
from idds.orm.base.session import read_session, transactional_session

Expand Down Expand Up @@ -101,9 +102,23 @@ def add_messages(messages, bulk_size=1000, session=None):


@transactional_session
def update_messages(messages, bulk_size=1000, session=None):
def update_messages(messages, bulk_size=1000, use_bulk_update_mappings=False, request_id=None, transform_id=None, session=None):
try:
session.bulk_update_mappings(models.Message, messages)
if use_bulk_update_mappings:
session.bulk_update_mappings(models.Message, messages)
else:
groups = group_list(messages, key='msg_id')
for group_key in groups:
group = groups[group_key]
keys = group['keys']
items = group['items']
query = session.query(models.Message)
if request_id:
query = query.filter(models.Message.request_id == request_id)
if transform_id:
query = query.filter(models.Message.transform_id == transform_id)
query = query.filter(models.Message.msg_id.in_(keys))\
.update(items, synchronize_session=False)
except TypeError as e:
raise exceptions.DatabaseException('Invalid JSON for msg_content: %s' % str(e))
except DatabaseError as e:
Expand Down
64 changes: 64 additions & 0 deletions main/tools/sls/idds_sls.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ def http_availability(host):
avail = check_command(curl, 'IDDSException')
if options.debug:
print("http check availability (without proxy): %s" % avail)

if not avail or avail == 0:
logrotate_running = is_logrotate_running()
restarting = is_restarting()
if logrotate_running or restarting:
return 1
return avail


Expand Down Expand Up @@ -199,6 +205,64 @@ def heartbeat_availability(log_location):
return avail, hang_workers


def is_logrotate_running():
# get the count of logrotate processes - if >=1 then logrotate is running
output = (
subprocess.Popen(
"ps -eo pgid,args | grep logrotate | grep -v grep | wc -l",
stdout=subprocess.PIPE,
shell=True,
)
.communicate()[0]
.decode("ascii")
)

try:
cleaned_output = output.strip()
n_logrotate_processes = int(cleaned_output)
except ValueError:
print(
"The string has an unexpected format and couldn't be converted to an integer."
)

# logrotate process found
if n_logrotate_processes >= 1:
if options.debug:
print("Logrotate is running")
return True

return False


def is_restarting():
# get the count of logrotate processes - if >=1 then logrotate is running
output = (
subprocess.Popen(
"ps -eo pgid,args | grep restart|grep http | grep -v grep | wc -l",
stdout=subprocess.PIPE,
shell=True,
)
.communicate()[0]
.decode("ascii")
)

try:
cleaned_output = output.strip()
n_restarting_processes = int(cleaned_output)
except ValueError:
print(
"The string has an unexpected format and couldn't be converted to an integer."
)

# logrotate process found
if n_restarting_processes >= 1:
if options.debug:
print("http is restarting")
return True

return False


def idds_availability(host, log_location):
infos = {}
http_avail = http_availability(host)
Expand Down

0 comments on commit 5cde928

Please sign in to comment.