
Commit

Merge pull request #372 from HSF/dev
Dev
wguanicedew authored Dec 10, 2024
2 parents c5bb158 + c32a714 commit 9768394
Showing 12 changed files with 130 additions and 55 deletions.
8 changes: 7 additions & 1 deletion client/lib/idds/client/base.py
@@ -16,7 +16,10 @@

import logging
import os
import random
import requests
import time

try:
# Python 2
from urllib import urlencode, quote
@@ -48,7 +51,7 @@ def __init__(self, host=None, auth=None, timeout=None, client_proxy=None):
self.client_proxy = client_proxy
self.timeout = timeout
self.session = requests.session()
self.retries = 2
self.retries = 3

self.auth_type = None
self.oidc_token_file = None
@@ -241,6 +244,9 @@ def get_request_response(self, url, type='GET', data=None, headers=None, auth_se
logging.warning('ConnectionError: ' + str(error))
if retry >= self.retries - 1:
raise exceptions.ConnectionException('ConnectionError: ' + str(error))
else:
random_sleep = random.uniform(0, 30)
time.sleep(random_sleep)

if result is not None:
if return_result_directly:
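Note: the retry path added above now sleeps a random interval before reconnecting. A minimal, self-contained sketch of that retry-with-jitter pattern is below; the function name get_with_retries and the max_jitter parameter are illustrative, not part of the iDDS client.

import random
import time

import requests

def get_with_retries(session, url, retries=3, max_jitter=30):
    # On a connection error, wait a random 0..max_jitter seconds before
    # retrying so that many clients recovering at once do not reconnect
    # in lockstep; re-raise once the retry budget is exhausted.
    for attempt in range(retries):
        try:
            return session.get(url, timeout=10)
        except requests.exceptions.ConnectionError:
            if attempt >= retries - 1:
                raise
            time.sleep(random.uniform(0, max_jitter))

# usage sketch: get_with_retries(requests.session(), 'https://idds-host/ping')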
19 changes: 19 additions & 0 deletions doma/lib/idds/doma/workflowv2/domapandawork.py
@@ -142,6 +142,8 @@ def __init__(self, executable=None, arguments=None, parameters=None, setup=None,
self.dependency_map = {}
self.dependency_map_deleted = []

self.additional_task_parameters = {}

def my_condition(self):
if self.is_finished():
return True
@@ -335,6 +337,15 @@ def set_agent_attributes(self, attrs, req_attributes=None):
self.num_retries = int(self.agent_attributes['num_retries'])
if 'poll_panda_jobs_chunk_size' in self.agent_attributes and self.agent_attributes['poll_panda_jobs_chunk_size']:
self.poll_panda_jobs_chunk_size = int(self.agent_attributes['poll_panda_jobs_chunk_size'])
if 'additional_task_parameters' in self.agent_attributes and self.agent_attributes['additional_task_parameters']:
if not self.additional_task_parameters:
self.additional_task_parameters = {}
try:
for key, value in self.agent_attributes['additional_task_parameters'].items():
if key not in self.additional_task_parameters:
self.additional_task_parameters[key] = value
except Exception as ex:
self.logger.warn(f"Failed to set additional_task_parameters: {ex}")

def depend_on(self, work):
self.logger.debug("checking depending on")
@@ -765,6 +776,14 @@ def create_processing(self, input_output_maps=[]):

task_param_map['reqID'] = self.get_request_id()

if self.additional_task_parameters:
try:
for key, value in self.additional_task_parameters.items():
if key not in task_param_map:
task_param_map[key] = value
except Exception as ex:
self.logger.warn(f"failed to set task parameter map with additional_task_parameters: {ex}")

processing_metadata = {'task_param': task_param_map}
proc = Processing(processing_metadata=processing_metadata)
proc.workload_id = None
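Both hunks above copy agent-level defaults in without overriding keys that are already set. A hedged equivalent using dict.setdefault; the helper name is illustrative, not from domapandawork.py.

def apply_default_task_parameters(task_param_map, additional_task_parameters):
    # Apply each agent-configured default only when the task parameter
    # map has not already set that key, mirroring the guarded loops above.
    for key, value in (additional_task_parameters or {}).items():
        task_param_map.setdefault(key, value)
    return task_param_map

# usage sketch:
# apply_default_task_parameters({'reqID': 123}, {'reqID': 999, 'site': 'X'})
# returns {'reqID': 123, 'site': 'X'}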
10 changes: 9 additions & 1 deletion main/config_default/logrotate_daemon
@@ -3,4 +3,12 @@

# while true; do /usr/sbin/logrotate -s /var/log/idds/logrotate.status /etc/logrotate.d/idds >> /var/log/idds/logrotate.log 2>&1; sleep 3600; done

while true; do /usr/sbin/logrotate -s /var/log/idds/logrotate.status /etc/logrotate.d/idds; sleep 86400; done
# one week
while true; do
/usr/sbin/logrotate -s /var/log/idds/logrotate.status /etc/logrotate.d/idds
sleep 604800

# random sleep to avoid all servers restarting at the same time
RANDOM_SLEEP=$((RANDOM % 3600))
sleep $RANDOM_SLEEP
done
16 changes: 8 additions & 8 deletions main/lib/idds/orm/base/models.py
@@ -691,14 +691,14 @@ class Content_ext(BASE, ModelBase):
req_id = Column(BigInteger())
jedi_task_id = Column(BigInteger())
actual_core_count = Column(Integer())
max_rss = Column(Integer())
max_vmem = Column(Integer())
max_swap = Column(Integer())
max_pss = Column(Integer())
avg_rss = Column(Integer())
avg_vmem = Column(Integer())
avg_swap = Column(Integer())
avg_pss = Column(Integer())
max_rss = Column(BigInteger())
max_vmem = Column(BigInteger())
max_swap = Column(BigInteger())
max_pss = Column(BigInteger())
avg_rss = Column(BigInteger())
avg_vmem = Column(BigInteger())
avg_swap = Column(BigInteger())
avg_pss = Column(BigInteger())
max_walltime = Column(Integer())
disk_io = Column(Integer())
failed_attempt = Column(Integer())
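Widening these ORM columns from Integer to BigInteger only changes the model; an already-deployed database needs a matching schema migration. A hedged Alembic sketch is below — the table name 'contents_ext' is an assumption, so check the model's __tablename__ (and any Oracle-specific handling) before using it.

import sqlalchemy as sa
from alembic import op

MEMORY_COLUMNS = ('max_rss', 'max_vmem', 'max_swap', 'max_pss',
                  'avg_rss', 'avg_vmem', 'avg_swap', 'avg_pss')

def upgrade():
    # Widen each memory-accounting column so values above 2**31 - 1
    # (e.g. RSS in bytes on large-memory nodes) no longer overflow.
    for col in MEMORY_COLUMNS:
        op.alter_column('contents_ext', col, type_=sa.BigInteger())

def downgrade():
    for col in MEMORY_COLUMNS:
        op.alter_column('contents_ext', col, type_=sa.Integer())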
9 changes: 5 additions & 4 deletions main/lib/idds/tests/panda_test.py
@@ -14,11 +14,11 @@
# os.environ['PANDA_URL_SSL'] = 'https://panda-doma-k8s-panda.cern.ch/server/panda'
# os.environ['PANDA_URL'] = 'http://panda-doma-k8s-panda.cern.ch:25080/server/panda'

# os.environ['PANDA_URL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'
# os.environ['PANDA_URL_SSL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'
os.environ['PANDA_URL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'
os.environ['PANDA_URL_SSL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'

os.environ['PANDA_URL_SSL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'
os.environ['PANDA_URL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'
# os.environ['PANDA_URL_SSL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'
# os.environ['PANDA_URL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'

from pandaclient import Client # noqa E402

@@ -85,6 +85,7 @@
task_ids = [476, 477, 478]
task_ids = [937, 938, 940, 941]
task_ids = [124, 619]
task_ids = [22707, 22708, 22709, 22710, 23211, 23212, 22155, 22158]
for task_id in task_ids:
print("Killing %s" % task_id)
ret = Client.killTask(task_id, verbose=True)
2 changes: 1 addition & 1 deletion main/lib/idds/tests/test_domapanda.py
@@ -90,7 +90,7 @@
task_queue5 = 'SLAC_Rubin_IO'
# task_queue = 'SLAC_Rubin_Extra_Himem_32Cores'
# task_queue = 'SLAC_Rubin_Merge'
# task_queue = 'SLAC_TEST'
task_queue2 = 'SLAC_TEST'
# task_queue4 = task_queue3 = task_queue2 = task_queue1 = task_queue

# task_cloud = None
57 changes: 44 additions & 13 deletions main/lib/idds/tests/test_domapanda_big.py
@@ -45,26 +45,28 @@
task_queue = 'CC-IN2P3_Rubin'
task_queue1 = 'CC-IN2P3_Rubin_Medium'
task_queue2 = 'CC-IN2P3_Rubin_Himem'
task_queue3 = 'CC-IN2P3_Rubin_Extra_Himem'
task_queue4 = 'CC-IN2P3_Rubin_Merge'
task_queue3 = 'CC-IN2P3_Rubin_Big_Himem'
task_queue4 = 'CC-IN2P3_Rubin_Extra_Himem'
task_queue5 = 'CC-IN2P3_Rubin_Merge'
elif len(sys.argv) > 1 and sys.argv[1] == "lancs":
site = 'lancs'
task_cloud = 'EU'
# task_queue = 'LANCS_TEST'
task_queue = 'LANCS_Rubin'
task_queue1 = 'LANCS_Rubin_Medium'
task_queue2 = 'LANCS_Rubin_Himem'
task_queue3 = 'LANCS_Rubin_Extra_Himem'
task_queue3 = 'LANCS_Rubin_Himem'
task_queue4 = 'LANCS_Rubin_Merge'
task_queue3 = 'LANCS_Rubin_Big_Himem'
task_queue4 = 'LANCS_Rubin_Extra_Himem'
# task_queue3 = 'LANCS_Rubin_Himem'
task_queue5 = 'LANCS_Rubin_Merge'
elif len(sys.argv) > 1 and sys.argv[1] == "ral":
site = 'RAL'
task_cloud = 'EU'
# task_queue = 'RAL_TEST'
task_queue = 'RAL_Rubin'
task_queue1 = 'RAL_Rubin_Medium'
task_queue2 = 'RAL_Rubin_Himem'
task_queue3 = 'RAL_Rubin_Extra_Himem'
task_queue3 = 'RAL_Rubin_Big_Himem'
# task_queue3 = 'RAL_Rubin_Himem'
task_queue4 = 'RAL_Rubin_Merge'
# task_queue5 = 'RAL_Rubin_IO'
@@ -81,8 +83,9 @@
task_queue = 'SLAC_Rubin'
task_queue1 = 'SLAC_Rubin_Medium'
task_queue2 = 'SLAC_Rubin_Himem'
task_queue3 = 'SLAC_Rubin_Extra_Himem'
task_queue4 = 'SLAC_Rubin_Merge'
task_queue3 = 'SLAC_Rubin_Big_Himem'
task_queue4 = 'SLAC_Rubin_Extra_Himem'
task_queue5 = 'SLAC_Rubin_Merge'
# task_queue = 'SLAC_Rubin_Extra_Himem_32Cores'
# task_queue = 'SLAC_Rubin_Merge'
# task_queue = 'SLAC_TEST'
@@ -185,7 +188,7 @@ def setup_workflow():
taskN4.dependencies = [
{"name": "00004" + str(k),
"dependencies": [],
"submitted": False} for k in range(100)
"submitted": False} for k in range(10000)
]

taskN5 = PanDATask()
@@ -197,6 +200,15 @@
"submitted": False} for k in range(100)
]

taskN6 = PanDATask()
taskN6.step = "step6"
taskN6.name = site + "_" + taskN6.step + "_" + randStr()
taskN6.dependencies = [
{"name": "00005" + str(k),
"dependencies": [],
"submitted": False} for k in range(100)
]

work1 = DomaPanDAWork(executable='echo; sleep 180',
primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
@@ -212,7 +224,7 @@
"type": "template",
"value": "log.tgz"},
task_cloud=task_cloud)
work2 = DomaPanDAWork(executable='echo; sleep 180',
work2 = DomaPanDAWork(executable='echo; sleep 180', # noqa F841
primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#2'},
output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#2'}],
log_collections=[], dependency_map=taskN2.dependencies,
@@ -227,7 +239,7 @@
"type": "template",
"value": "log.tgz"},
task_cloud=task_cloud)
work3 = DomaPanDAWork(executable='echo; sleep 180',
work3 = DomaPanDAWork(executable='echo; sleep 180', # noqa F841
primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#3'},
output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#3'}],
log_collections=[], dependency_map=taskN3.dependencies,
@@ -243,7 +255,7 @@
"value": "log.tgz"},
task_cloud=task_cloud)

work4 = DomaPanDAWork(executable='echo; sleep 180',
work4 = DomaPanDAWork(executable='echo; sleep 180', # noqa F841
primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
log_collections=[], dependency_map=taskN4.dependencies,
@@ -259,7 +271,7 @@
"value": "log.tgz"},
task_cloud=task_cloud)

work5 = DomaPanDAWork(executable='echo; sleep 180',
work5 = DomaPanDAWork(executable='echo; sleep 180', # noqa F841
primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
log_collections=[], dependency_map=taskN5.dependencies,
@@ -275,14 +287,33 @@
"value": "log.tgz"},
task_cloud=task_cloud)

work6 = DomaPanDAWork(executable='echo; sleep 180', # noqa F841
primary_input_collection={'scope': 'pseudo_dataset', 'name': 'pseudo_input_collection#1'},
output_collections=[{'scope': 'pseudo_dataset', 'name': 'pseudo_output_collection#1'}],
log_collections=[], dependency_map=taskN6.dependencies,
task_name=taskN6.name, task_queue=task_queue5,
encode_command_line=True,
task_priority=981,
prodSourceLabel='managed',
task_log={"dataset": "PandaJob_#{pandaid}/",
"destination": "local",
"param_type": "log",
"token": "local",
"type": "template",
"value": "log.tgz"},
task_cloud=task_cloud)

pending_time = 12
# pending_time = None
workflow = Workflow(pending_time=pending_time)
workflow.add_work(work1)
"""
workflow.add_work(work2)
workflow.add_work(work3)
workflow.add_work(work4)
workflow.add_work(work5)
workflow.add_work(work6)
"""
workflow.name = site + "_" + 'test_workflow.idds.%s.test' % time.time()
return workflow

24 changes: 12 additions & 12 deletions main/lib/idds/tests/test_get_request_info_panda.py
@@ -1,3 +1,4 @@
import json
# from idds.common.constants import RequestStatus # noqa F401
# from idds.common.utils import json_loads # noqa F401

@@ -8,7 +9,7 @@
idds_client = pandaclient.idds_api.get_api(idds_utils.json_dumps, idds_host=None, compress=True, manager=True)

# wms_workflow_id = 4112
wms_workflow_id = 5194
wms_workflow_id = 2154
# only check the request status
ret = idds_client.get_requests(request_id=wms_workflow_id)
print(ret)
@@ -20,19 +21,18 @@
print(ret)

workloads = []
transforms = []
for workload in ret[1][1]:
workloads.append(workload['transform_workload_id'])
transforms.append(workload['transform_id'])
print(workloads)
print(transforms)

# show one workload file information
workload_0 = workloads[0]
ret = idds_client.get_contents_output_ext(request_id=wms_workflow_id, workload_id=workload_0)
print(ret)

workload_1 = workloads[1]
ret = idds_client.get_contents_output_ext(request_id=wms_workflow_id, workload_id=workload_1)
print(ret)
for transform_id in transforms:
ret = idds_client.get_transform(request_id=wms_workflow_id, transform_id=transform_id)
print(json.dumps(ret, indent=4, sort_keys=True))

workload_2 = workloads[2]
ret = idds_client.get_contents_output_ext(request_id=wms_workflow_id, workload_id=workload_2)
print(ret)
# show one workload file information
for workload_id in workloads:
ret = idds_client.get_contents_output_ext(request_id=wms_workflow_id, workload_id=workload_id)
print(ret)
12 changes: 6 additions & 6 deletions monitor/data/conf.js
@@ -1,9 +1,9 @@

var appConfig = {
'iddsAPI_request': "https://lxplus947.cern.ch:443/idds/monitor_request/null/null",
'iddsAPI_transform': "https://lxplus947.cern.ch:443/idds/monitor_transform/null/null",
'iddsAPI_processing': "https://lxplus947.cern.ch:443/idds/monitor_processing/null/null",
'iddsAPI_request_detail': "https://lxplus947.cern.ch:443/idds/monitor/null/null/true/false/false",
'iddsAPI_transform_detail': "https://lxplus947.cern.ch:443/idds/monitor/null/null/false/true/false",
'iddsAPI_processing_detail': "https://lxplus947.cern.ch:443/idds/monitor/null/null/false/false/true"
'iddsAPI_request': "https://lxplus956.cern.ch:443/idds/monitor_request/null/null",
'iddsAPI_transform': "https://lxplus956.cern.ch:443/idds/monitor_transform/null/null",
'iddsAPI_processing': "https://lxplus956.cern.ch:443/idds/monitor_processing/null/null",
'iddsAPI_request_detail': "https://lxplus956.cern.ch:443/idds/monitor/null/null/true/false/false",
'iddsAPI_transform_detail': "https://lxplus956.cern.ch:443/idds/monitor/null/null/false/true/false",
'iddsAPI_processing_detail': "https://lxplus956.cern.ch:443/idds/monitor/null/null/false/false/true"
}
4 changes: 4 additions & 0 deletions workflow/bin/run_workflow
@@ -91,6 +91,8 @@ def run_workflow(name, context, original_args, current_job_kwargs):
with workflow:
ret = workflow.run()
logging.info("run workflow result: %s" % str(ret))
if not ret:
return -1
return 0


@@ -113,6 +115,8 @@ def run_work(name, context, original_args, current_job_kwargs):
logging.info("work: %s" % work)
ret = work.run()
logging.info("run work result: %s" % str(ret))
if not ret:
return -1
return 0


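With these guards, run_workflow and run_work report a falsy run() result as -1 instead of always returning 0. A small sketch of how a caller can surface that as a process exit status; the stub stands in for the real entry points.

import sys

def run_workflow_stub():
    # Stand-in for run_workflow above: returns 0 on success, -1 on failure.
    return -1

if __name__ == '__main__':
    # Mapping the -1 to a non-zero exit code lets batch systems and
    # pilots detect the failed workflow.
    sys.exit(0 if run_workflow_stub() == 0 else 1)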
