Merge pull request #223 from HSF/dev
improve
wguanicedew authored Oct 16, 2023
2 parents 169ab17 + 43cbe7d commit 0930457
Showing 11 changed files with 101 additions and 21 deletions.
9 changes: 8 additions & 1 deletion atlas/lib/idds/atlas/workflow/atlasstageinwork.py
@@ -293,7 +293,14 @@ def poll_rule(self, processing):
         # p = processing
         # rule_id = p['processing_metadata']['rule_id']
         proc = processing['processing_metadata']['processing']
-        rule_id = proc.external_id
+
+        if proc.external_id:
+            rule_id = proc.external_id
+        elif self.rule_id:
+            rule_id = self.rule_id
+        else:
+            rule_id = proc.processing_metadata.get('rule_id', None)
+        self.logger.debug("rule_id: %s" % rule_id)

         replicases_status = {}
         if rule_id:
9 changes: 8 additions & 1 deletion atlas/lib/idds/atlas/workflowv2/atlasstageinwork.py
@@ -293,7 +293,14 @@ def poll_rule(self, processing):
         # p = processing
         # rule_id = p['processing_metadata']['rule_id']
         proc = processing['processing_metadata']['processing']
-        rule_id = proc.external_id
+
+        if proc.external_id:
+            rule_id = proc.external_id
+        elif self.rule_id:
+            rule_id = self.rule_id
+        else:
+            rule_id = proc.processing_metadata.get('rule_id', None)
+        self.logger.debug("rule_id: %s" % rule_id)

         replicases_status = {}
         if rule_id:
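
Both stage-in works (v1 and v2 above) now resolve the rule id through the same three-step fallback: the processing object's external_id, then the work's own rule_id, then the value recorded in the processing metadata. A minimal standalone sketch of that precedence chain; Proc and Work here are hypothetical stand-ins for the iDDS objects, not the real classes:

# Hedged sketch of the rule_id fallback chain shown in the two hunks above.
class Proc:
    def __init__(self, external_id=None, processing_metadata=None):
        self.external_id = external_id
        self.processing_metadata = processing_metadata or {}


class Work:
    def __init__(self, rule_id=None):
        self.rule_id = rule_id

    def resolve_rule_id(self, proc):
        # Precedence: live external_id first, then the work-level rule_id,
        # finally whatever was recorded in the processing metadata.
        if proc.external_id:
            return proc.external_id
        if self.rule_id:
            return self.rule_id
        return proc.processing_metadata.get('rule_id', None)


proc = Proc(processing_metadata={'rule_id': 'abc123'})
print(Work().resolve_rule_id(proc))               # -> abc123
print(Work(rule_id='r42').resolve_rule_id(proc))  # -> r42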
4 changes: 2 additions & 2 deletions main/config_default/idds.cfg
@@ -50,13 +50,13 @@ retrieve_bulk_size = 32
 pending_time = 30

 [transformer]
-num_threads = 16
+num_threads = 3
 poll_period = 600
 new_poll_period = 60
 update_poll_period = 600

 poll_time_period = 60
-retrieve_bulk_size = 32
+retrieve_bulk_size = 64
 poll_operation_time_period = 240
 message_bulk_size = 1000

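
The [transformer] tuning above cuts the thread count from 16 to 3 while doubling the retrieve bulk size from 32 to 64, so each poll fetches a larger batch with fewer workers. A minimal sketch of reading these values with the standard-library configparser; the file path is an assumption, and iDDS ships its own config loader:

# Hedged sketch: read the [transformer] settings with configparser.
import configparser

cfg = configparser.ConfigParser()
cfg.read('/opt/idds/etc/idds/idds.cfg')  # assumed path, not taken from the diff

num_threads = cfg.getint('transformer', 'num_threads', fallback=3)
retrieve_bulk_size = cfg.getint('transformer', 'retrieve_bulk_size', fallback=64)
print(num_threads, retrieve_bulk_size)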
3 changes: 2 additions & 1 deletion main/lib/idds/agents/carrier/submitter.py
@@ -36,7 +36,8 @@ def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries=
             self.set_max_workers()
         num_threads = self.max_number_workers

-        super(Submitter, self).__init__(num_threads=num_threads, max_number_workers=self.max_number_workers, name=name, **kwargs)
+        super(Submitter, self).__init__(num_threads=num_threads, max_number_workers=self.max_number_workers,
+                                        name=name, retrieve_bulk_size=retrieve_bulk_size, **kwargs)

     def get_new_processings(self):
         """
3 changes: 2 additions & 1 deletion main/lib/idds/agents/carrier/trigger.py
@@ -42,7 +42,8 @@ def __init__(self, num_threads=1, trigger_max_number_workers=3, max_number_worke
             self.set_max_workers()

         num_threads = int(self.max_number_workers)
-        super(Trigger, self).__init__(num_threads=num_threads, name=name, max_number_workers=self.max_number_workers, **kwargs)
+        super(Trigger, self).__init__(num_threads=num_threads, name=name, max_number_workers=self.max_number_workers,
+                                      retrieve_bulk_size=retrieve_bulk_size, **kwargs)
         self.logger.info("num_threads: %s" % num_threads)

         if hasattr(self, 'trigger_max_number_workers'):
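
In both agents the super() call previously dropped retrieve_bulk_size, so the base class fell back to its default; the reflowed calls now forward it explicitly. A generic sketch of the pattern, with BaseAgent as a hypothetical stand-in for the iDDS base class:

# Sketch of forwarding a constructor argument to the base class.
class BaseAgent:
    def __init__(self, num_threads=1, retrieve_bulk_size=None, **kwargs):
        self.num_threads = num_threads
        self.retrieve_bulk_size = retrieve_bulk_size


class Trigger(BaseAgent):
    def __init__(self, num_threads=1, retrieve_bulk_size=None, **kwargs):
        # Forward retrieve_bulk_size instead of silently swallowing it.
        super(Trigger, self).__init__(num_threads=num_threads,
                                      retrieve_bulk_size=retrieve_bulk_size,
                                      **kwargs)


print(Trigger(retrieve_bulk_size=64).retrieve_bulk_size)  # -> 64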
16 changes: 12 additions & 4 deletions main/lib/idds/agents/carrier/utils.py
@@ -22,7 +22,8 @@
                                     TransformType2MessageTypeMap,
                                     MessageType, MessageTypeStr,
                                     MessageStatus, MessageSource,
-                                    MessageDestination)
+                                    MessageDestination,
+                                    get_work_status_from_transform_processing_status)
 from idds.common.utils import setup_logging
 from idds.core import (transforms as core_transforms,
                        processings as core_processings,
@@ -1665,7 +1666,9 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou
     return update_collections, all_updates_flushed, messages


-def sync_work_status(request_id, transform_id, workload_id, work, substatus=None):
+def sync_work_status(request_id, transform_id, workload_id, work, substatus=None, log_prefix=""):
+    logger = get_logger()
+
     input_collections = work.get_input_collections()
     output_collections = work.get_output_collections()
     log_collections = work.get_log_collections()
@@ -1686,6 +1689,10 @@ def sync_work_status(request_id, transform_id, workload_id, work, substatus=None
     is_all_files_failed = False

     if is_all_collections_closed:
+        logger.debug(log_prefix + "has_files: %s, is_all_files_processed: %s, is_all_files_failed: %s, substatus: %s" % (has_files,
+                                                                                                                         is_all_files_processed,
+                                                                                                                         is_all_files_failed,
+                                                                                                                         substatus))
         if has_files:
             if is_all_files_processed:
                 work.status = WorkStatus.Finished
@@ -1695,9 +1702,10 @@ def sync_work_status(request_id, transform_id, workload_id, work, substatus=None
                 work.status = WorkStatus.SubFinished
         else:
             if substatus:
-                work.status = substatus
+                work.status = get_work_status_from_transform_processing_status(substatus)
             else:
                 work.status = WorkStatus.Failed
+    logger.debug(log_prefix + "work status: %s, substatus: %s" % (str(work.status), substatus))


 def sync_processing(processing, agent_attributes, terminate=False, abort=False, logger=None, log_prefix=""):
@@ -1724,7 +1732,7 @@ def sync_processing(processing, agent_attributes, terminate=False, abort=False,

         messages += msgs

-    sync_work_status(request_id, transform_id, workload_id, work, processing['substatus'])
+    sync_work_status(request_id, transform_id, workload_id, work, processing['substatus'], log_prefix)
     logger.info(log_prefix + "sync_processing: work status: %s" % work.get_status())
     if terminate and work.is_terminated():
         msgs = generate_messages(request_id, transform_id, workload_id, work, msg_type='work')
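
sync_work_status no longer copies the processing substatus onto the work verbatim; it translates it through get_work_status_from_transform_processing_status, imported at the top of the file. The mapping itself is not shown in this diff; the sketch below is only an illustration of what such a translation could look like, with made-up enum members and table:

# Illustrative only: the real mapping lives in idds.common.constants and is
# not part of this diff.
from enum import Enum


class ProcessingStatus(Enum):
    Submitting = 1
    Running = 2
    Finished = 3
    Failed = 4


class WorkStatus(Enum):
    Transforming = 1
    Finished = 2
    Failed = 3


_ASSUMED_MAP = {
    ProcessingStatus.Submitting: WorkStatus.Transforming,
    ProcessingStatus.Running: WorkStatus.Transforming,
    ProcessingStatus.Finished: WorkStatus.Finished,
    ProcessingStatus.Failed: WorkStatus.Failed,
}


def get_work_status_from_transform_processing_status(substatus):
    # Unknown states fall back to Failed, mirroring the else branch above.
    return _ASSUMED_MAP.get(substatus, WorkStatus.Failed)


print(get_work_status_from_transform_processing_status(ProcessingStatus.Running))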
17 changes: 15 additions & 2 deletions main/lib/idds/core/catalog.py
@@ -512,7 +512,7 @@ def get_content_status_statistics(coll_id=None, transform_ids=None, session=None


 @read_session
-def get_content_status_statistics_by_relation_type(transform_ids, session=None):
+def get_content_status_statistics_by_relation_type(transform_ids, bulk_size=500, session=None):
     """
     Get statistics group by status
@@ -521,7 +521,20 @@ def get_content_status_statistics_by_relation_type(transform_ids, session=None):

     :returns: statistics group by status, as a dict.
     """
-    return orm_contents.get_content_status_statistics_by_relation_type(transform_ids, session=session)
+    if transform_ids and not isinstance(transform_ids, (list, tuple)):
+        transform_ids = [transform_ids]
+    if transform_ids and len(transform_ids) == 1:
+        transform_ids = [transform_ids[0], transform_ids[0]]
+
+    if transform_ids and len(transform_ids) > bulk_size:
+        chunks = [transform_ids[i:i + bulk_size] for i in range(0, len(transform_ids), bulk_size)]
+        ret = []
+        for chunk in chunks:
+            tmp = orm_contents.get_content_status_statistics_by_relation_type(chunk, session=session)
+            ret += tmp
+        return ret
+    else:
+        return orm_contents.get_content_status_statistics_by_relation_type(transform_ids, session=session)


 @transactional_session
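
get_content_status_statistics_by_relation_type now splits long transform_ids lists into bulk_size chunks so each query's IN clause stays bounded (the duplication of a lone id presumably works around a quirk in the underlying query). A self-contained sketch of the chunking pattern, with fetch_chunk as a hypothetical stand-in for the ORM call:

# Standalone sketch of the chunked-query pattern used above.
def fetch_chunk(ids):
    # Fake per-id statistics standing in for the real database query.
    return [(i, 'Available') for i in ids]


def get_statistics_chunked(transform_ids, bulk_size=500):
    if transform_ids and not isinstance(transform_ids, (list, tuple)):
        transform_ids = [transform_ids]
    ret = []
    for i in range(0, len(transform_ids), bulk_size):
        ret += fetch_chunk(transform_ids[i:i + bulk_size])
    return ret


print(len(get_statistics_chunked(list(range(1200)))))  # -> 1200, fetched in 3 chunks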
7 changes: 5 additions & 2 deletions main/lib/idds/tests/panda_test.py
@@ -7,8 +7,8 @@
 os.environ['PANDA_URL_SSL'] = 'https://pandaserver-doma.cern.ch:25443/server/panda'

 os.environ['PANDA_BEHIND_REAL_LB'] = "1"
-# os.environ['PANDA_URL'] = 'http://rubin-panda-server-dev.slac.stanford.edu:80/server/panda'
-# os.environ['PANDA_URL_SSL'] = 'https://rubin-panda-server-dev.slac.stanford.edu:8443/server/panda'
+os.environ['PANDA_URL'] = 'http://rubin-panda-server-dev.slac.stanford.edu:80/server/panda'
+os.environ['PANDA_URL_SSL'] = 'https://rubin-panda-server-dev.slac.stanford.edu:8443/server/panda'

 from pandaclient import Client  # noqa E402

@@ -31,6 +31,9 @@
 task_ids += [162282, 162283, 162588]
 task_ids = [i for i in range(163930, 164147)]
 task_ids = [161142, 160648]
+task_ids = [165124, 165130, 165135] + [i for i in range(165143, 165149)]
+task_ids = [i for i in range(251, 282)]
+task_ids = [282, 322, 323, 324, 325]
 for task_id in task_ids:
     print("Killing %s" % task_id)
     ret = Client.killTask(task_id, verbose=True)
23 changes: 18 additions & 5 deletions main/lib/idds/tests/test_migrate_requests.py
@@ -15,8 +15,9 @@


 from idds.client.clientmanager import ClientManager
+from idds.common.constants import RequestStatus
 from idds.common.utils import json_dumps  # noqa F401
-from idds.rest.v1.utils import convert_old_req_2_workflow_req
+from idds.rest.v1.utils import convert_old_req_2_workflow_req  # noqa F401
 from idds.common.utils import setup_logging

@@ -42,7 +43,8 @@ def migrate():
     cern_k8s_dev_host = 'https://panda-idds-dev.cern.ch/idds'  # noqa F841

     # cm1 = ClientManager(host=atlas_host)
-    cm1 = ClientManager(host=doma_host)
+    # cm1 = ClientManager(host=doma_host)
+    cm1 = ClientManager(host=atlas_host)
     # cm1 = ClientManager(host=slac_k8s_dev_host)
     # reqs = cm1.get_requests(request_id=290)
     # old_request_id = 298163
@@ -64,17 +66,23 @@ def migrate():

     # old_request_ids = [21]

+    old_request_ids = [551889]
+
+    old_request_ids = [i for i in range(551911, 556618)]
+    old_request_ids = [i for i in range(556618, 556712)]
+    old_request_ids = [i for i in range(556712, 556940)]
+    old_request_ids = [i for i in range(556940, 557110)]
     # old_request_id = 1
     # for old_request_id in [152]:
     # for old_request_id in [60]:    # noqa E115
     # for old_request_id in [200]:    # noqa E115
     for old_request_id in old_request_ids:    # noqa E115   # doma 183
         reqs = cm1.get_requests(request_id=old_request_id, with_metadata=True)

-        # cm2 = ClientManager(host=dev_host)
+        cm2 = ClientManager(host=dev_host)
         # cm2 = ClientManager(host=doma_host)
         # cm2 = ClientManager(host=atlas_host)
-        cm2 = ClientManager(host=slac_k8s_dev_host)
+        # cm2 = ClientManager(host=slac_k8s_dev_host)
         # cm2 = ClientManager(host=slac_k8s_prod_host)
         # cm2 = ClientManager(host=cern_k8s_dev_host)
         # print(reqs)
@@ -85,7 +93,12 @@ def migrate():
         # workflow = req['request_metadata']['workflow']
         # print(json_dumps(workflow, sort_keys=True, indent=4))

-        req = convert_old_req_2_workflow_req(req)
+        # req = convert_old_req_2_workflow_req(req)
+        print(req['status'])
+        if req['status'] in [RequestStatus.Finished]:
+            print('request status: %s, skip' % str(req['status']))
+            continue
+
         workflow = req['request_metadata']['workflow']
         workflow.clean_works()

16 changes: 15 additions & 1 deletion workflow/lib/idds/workflow/workflow.py
@@ -1575,10 +1575,14 @@ def sync_works(self, to_cancel=False):
             work = self.works[k]
             self.log_debug("work %s is_terminated(%s:%s)" % (work.get_internal_id(), work.is_terminated(), work.get_status()))

+            if work.get_internal_id() not in self.current_running_works and work.get_status() in [WorkStatus.Transforming]:
+                self.current_running_works.append(work.get_internal_id())
+
         for work in [self.works[k] for k in self.new_to_run_works]:
             if work.transforming:
                 self.new_to_run_works.remove(work.get_internal_id())
-                self.current_running_works.append(work.get_internal_id())
+                if work.get_internal_id() not in self.current_running_works:
+                    self.current_running_works.append(work.get_internal_id())

         for work in [self.works[k] for k in self.current_running_works]:
             if isinstance(work, Workflow):
@@ -1614,6 +1618,16 @@
                     self.terminated_works.append(work.get_internal_id())
                     self.current_running_works.remove(work.get_internal_id())

+        self.num_finished_works = 0
+        self.num_subfinished_works = 0
+        self.num_failed_works = 0
+        self.num_expired_works = 0
+        self.num_cancelled_works = 0
+        self.num_suspended_works = 0
+
+        for k in self.works:
+            work = self.works[k]
+            if work.is_terminated():
                 if work.is_finished():
                     self.num_finished_works += 1
                 elif work.is_subfinished():
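
Rather than bumping the per-state counters only for works that terminate during this pass, sync_works now zeroes all six counters and recounts every terminated work from scratch, which keeps the totals correct when the method runs repeatedly (the workflowv2 hunk below makes the same change). A minimal sketch of the idempotent recount pattern, with a hypothetical Work stub:

# Minimal sketch of resetting and recounting instead of incrementing.
class Work:
    def __init__(self, state):
        self.state = state

    def is_terminated(self):
        return self.state in ('finished', 'failed')


def recount(works):
    # Reset first, then recount, so repeated sync passes stay correct.
    counts = {'finished': 0, 'failed': 0}
    for w in works:
        if w.is_terminated():
            counts[w.state] += 1
    return counts


works = [Work('finished'), Work('failed'), Work('running')]
print(recount(works))  # {'finished': 1, 'failed': 1}
print(recount(works))  # identical on a second pass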
15 changes: 14 additions & 1 deletion workflow/lib/idds/workflowv2/workflow.py
@@ -1697,13 +1697,16 @@ def sync_works(self, to_cancel=False):
                                                           work.get_status(),
                                                           work.submitted,
                                                           work.transforming))
+            if work.get_internal_id() not in self.current_running_works and work.get_status() in [WorkStatus.Transforming]:
+                self.current_running_works.append(work.get_internal_id())

         submitting_works = self.submitting_works
         for work in [self.works[k] for k in self.new_to_run_works]:
             if work.transforming:
                 self.new_to_run_works.remove(work.get_internal_id())
                 submitting_works.append(work.get_internal_id())
-                self.current_running_works.append(work.get_internal_id())
+                if work.get_internal_id() not in self.current_running_works:
+                    self.current_running_works.append(work.get_internal_id())
         self.submitting_works = submitting_works
         for work in [self.works[k] for k in self.submitting_works]:
             self.log_info("Work %s is_submitted(%s)" % (work.get_internal_id(), work.submitted))
@@ -1746,6 +1749,16 @@
                     self.terminated_works.append(work.get_internal_id())
                     self.current_running_works.remove(work.get_internal_id())

+        self.num_finished_works = 0
+        self.num_subfinished_works = 0
+        self.num_failed_works = 0
+        self.num_expired_works = 0
+        self.num_cancelled_works = 0
+        self.num_suspended_works = 0
+
+        for k in self.works:
+            work = self.works[k]
+            if work.is_terminated():
                 if work.is_finished(synchronize=False):
                     self.num_finished_works += 1
                 elif work.is_subfinished(synchronize=False):
