Commit
Merge remote-tracking branch 'origin/master' into release
mightqxc committed Jan 15, 2021
2 parents f308fa0 + 68f62c7 commit 019cc99
Showing 33 changed files with 491 additions and 441 deletions.
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
@@ -1 +1 @@
timestamp = "14-08-2020 09:04:50 on release (by fahui)"
timestamp = "15-01-2021 05:19:17 on release (by fahui)"
36 changes: 35 additions & 1 deletion pandaharvester/harvesterbody/cred_manager.py
@@ -155,8 +155,10 @@ def run(self):
while True:
# update plugin cores from queue config
self.update_cores_from_queue_config()

# execute
self.execute()
self.execute() # this is the main run

# check if being terminated
if self.terminated(harvester_config.credmanager.sleepTime, randomize=False):
return
@@ -199,3 +201,35 @@ def execute(self):
except Exception:
core_utils.dump_error_message(mainLog)
mainLog.debug('done')

# monit main
def execute_monit(self):
self.update_cores_from_queue_config()

metrics = {}
# loop over all plugins
for exeCore in itertools.chain(self.exeCores, self.queue_exe_cores):
# do nothing
if exeCore is None:
continue

# make logger
if hasattr(exeCore, 'setup_name'):
credmanager_name = exeCore.setup_name
else:
credmanager_name = '{0} {1}'.format(exeCore.inCertFile, exeCore.outCertFile)

subLog = self.make_logger(_logger, '{0} {1}'.format(exeCore.__class__.__name__, credmanager_name),
method_name='execute_monit')
try:
# check credential
subLog.debug('check credential lifetime')
lifetime = exeCore.check_credential_lifetime()
if lifetime is not None:
metrics[exeCore.outCertFile] = lifetime
except Exception:
core_utils.dump_error_message(subLog)

subLog.debug('done')

return metrics
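
For reference, execute_monit above relies on each plugin optionally implementing check_credential_lifetime(); a plugin without it simply raises and is skipped by the except clause, and a None return is ignored. Below is a minimal sketch of such a plugin method — the outCertFile attribute comes from this diff, while the use of the cryptography package and the hours unit are assumptions of the sketch, not something this commit defines.

# Hypothetical plugin method, not part of this commit. Assumes the plugin keeps its
# output proxy path in self.outCertFile and that the `cryptography` package is
# available; the returned unit (hours) is an assumption of this sketch.
def check_credential_lifetime(self):
    import datetime
    from cryptography import x509
    try:
        with open(self.outCertFile, 'rb') as f:
            cert = x509.load_pem_x509_certificate(f.read())
        remaining = cert.not_valid_after - datetime.datetime.utcnow()
        # report remaining lifetime in hours, clamped at zero for expired proxies
        return max(remaining.total_seconds() / 3600.0, 0)
    except Exception:
        # execute_monit treats None (and exceptions) as "no measurement"
        return None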
10 changes: 5 additions & 5 deletions pandaharvester/harvesterbody/monitor.py
@@ -403,10 +403,10 @@ def monitor_agent_core(self, lockedBy, queueName, workSpecsList, from_fifo=False
isCheckedList.append(isChecked)
if monStatus == WorkSpec.ST_failed:
if not workSpec.has_pilot_error() and workSpec.errorCode is None:
workSpec.set_pilot_error(PilotErrors.ERR_GENERALERROR, diagMessage)
workSpec.set_pilot_error(PilotErrors.GENERALERROR, diagMessage)
elif monStatus == WorkSpec.ST_cancelled:
if not workSpec.has_pilot_error() and workSpec.errorCode is None:
workSpec.set_pilot_error(PilotErrors.ERR_PANDAKILL, diagMessage)
workSpec.set_pilot_error(PilotErrors.PANDAKILL, diagMessage)
if monStatus in [WorkSpec.ST_finished, WorkSpec.ST_failed, WorkSpec.ST_cancelled]:
workSpec.set_work_params({'finalMonStatus': monStatus})
# request events
@@ -658,7 +658,7 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log,
self.dbProxy.kill_worker(workSpec.workerID)
newStatus = WorkSpec.ST_cancelled
diagMessage = 'Killed by Harvester due to consecutive worker check failures. ' + diagMessage
workSpec.set_pilot_error(PilotErrors.ERR_FAILEDBYSERVER, diagMessage)
workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage)
else:
# use original status
newStatus = workSpec.status
@@ -673,7 +673,7 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log,
workerID, workerQueueTimeLimit))
self.dbProxy.kill_worker(workSpec.workerID)
diagMessage = 'Killed by Harvester due to worker queuing too long. ' + diagMessage
workSpec.set_pilot_error(PilotErrors.ERR_FAILEDBYSERVER, diagMessage)
workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage)
# set closed
workSpec.set_pilot_closed()
# expired heartbeat - only when requested in the configuration
@@ -693,7 +693,7 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log,
workerID))
self.dbProxy.kill_worker(workSpec.workerID)
diagMessage = 'Killed by Harvester due to worker heartbeat expired. ' + diagMessage
workSpec.set_pilot_error(PilotErrors.ERR_FAILEDBYSERVER, diagMessage)
workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage)
# get work attributes
workAttributes = messenger.get_work_attributes(workSpec)
retMap[workerID]['workAttributes'] = workAttributes
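
The PilotErrors edits in this file, and the analogous ones in preparator.py, propagator.py, stager.py and submitter.py below, all follow one pattern: the old ERR_* constant names are dropped in favour of the bare names, and the message lookup moves from the pilotError dict to pilot_error_msg. A small before/after sketch of the call sites (the import path is assumed, and workSpec/jobSpec stand in for real spec objects):

# Rename pattern used throughout this commit; import path assumed, objects are stand-ins.
from pandaharvester.harvestercore.pilot_errors import PilotErrors

# before: workSpec.set_pilot_error(PilotErrors.ERR_GENERALERROR, diagMessage)
workSpec.set_pilot_error(PilotErrors.GENERALERROR, diagMessage)
# before: PilotErrors.pilotError[PilotErrors.ERR_PANDAKILL]
jobSpec.set_pilot_error(PilotErrors.PANDAKILL,
                        PilotErrors.pilot_error_msg[PilotErrors.PANDAKILL])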
6 changes: 4 additions & 2 deletions pandaharvester/harvesterbody/preparator.py
@@ -98,6 +98,8 @@ def run(self):
'subStatus': oldSubStatus})
tmpLog.error('failed to resolve input file paths : {0}'.format(tmpStr))
continue
# manipulate container-related job params
jobSpec.manipulate_job_params_for_container()
# update job
jobSpec.lockedBy = None
jobSpec.set_all_input_ready()
@@ -131,7 +133,7 @@ def run(self):
jobSpec.preparatorTime = None
jobSpec.stateChangeTime = datetime.datetime.utcnow()
errStr = 'stage-in failed with {0}'.format(tmpStr)
jobSpec.set_pilot_error(PilotErrors.ERR_STAGEINFAILED, errStr)
jobSpec.set_pilot_error(PilotErrors.STAGEINFAILED, errStr)
jobSpec.trigger_propagation()
self.dbProxy.update_job(jobSpec, {'lockedBy': lockedBy,
'subStatus': oldSubStatus})
@@ -294,7 +296,7 @@ def run(self):
jobSpec.preparatorTime = None
jobSpec.stateChangeTime = datetime.datetime.utcnow()
errStr = 'stage-in failed with {0}'.format(tmpStr)
jobSpec.set_pilot_error(PilotErrors.ERR_STAGEINFAILED, errStr)
jobSpec.set_pilot_error(PilotErrors.STAGEINFAILED, errStr)
jobSpec.trigger_propagation()
self.dbProxy.update_job(jobSpec, {'lockedBy': lockedBy,
'subStatus': oldSubStatus})
4 changes: 2 additions & 2 deletions pandaharvester/harvesterbody/propagator.py
@@ -112,8 +112,8 @@ def run(self):
# no workers
tmpJobSpec.status = 'cancelled'
tmpJobSpec.subStatus = 'killed'
tmpJobSpec.set_pilot_error(PilotErrors.ERR_PANDAKILL,
PilotErrors.pilotError[PilotErrors.ERR_PANDAKILL])
tmpJobSpec.set_pilot_error(PilotErrors.PANDAKILL,
PilotErrors.pilot_error_msg[PilotErrors.PANDAKILL])
tmpJobSpec.stateChangeTime = datetime.datetime.utcnow()
tmpJobSpec.trigger_propagation()
self.dbProxy.update_job(tmpJobSpec, {'propagatorLock': self.get_pid()},
17 changes: 17 additions & 0 deletions pandaharvester/harvesterbody/service_monitor.py
@@ -12,6 +12,8 @@
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
from pandaharvester.harvestercore.service_metrics_spec import ServiceMetricSpec
from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper
from pandaharvester.harvesterbody.cred_manager import CredManager

# logger
_logger = core_utils.setup_logger('service_monitor')
@@ -37,6 +39,8 @@ def __init__(self, pid_file, single_mode=False):
self.children = self.master_process.children(recursive=True)

self.cpu_count = multiprocessing.cpu_count()
self.queue_config_mapper = QueueConfigMapper()
self.cred_manager = CredManager(self.queue_config_mapper, single_mode=True)

def get_master_pid(self):
"""
@@ -122,6 +126,14 @@ def volume_use(self, volume_name):

return used_amount_float

def cert_validities(self):
try:
cert_validities = self.cred_manager.execute_monit()
return cert_validities
except Exception:
_logger.error('Could not extract certificate validities')
return {}

# main loop
def run(self):
while True:
@@ -146,6 +158,11 @@ def run(self):
_logger.debug('Disk usage of {0}: {1} %'.format(volume, volume_use))
service_metrics['volume_{0}_pc'.format(volume)] = volume_use

# get certificate validities. Not all plugins have implemented it
_logger.debug('Getting cert validities')
service_metrics['cert_lifetime'] = self.cert_validities()
_logger.debug('Got cert validities: {0}'.format(service_metrics['cert_lifetime']))

service_metrics_spec = ServiceMetricSpec(service_metrics)
self.db_proxy.insert_service_metrics(service_metrics_spec)

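
For illustration, the new cert_lifetime entry is just the dict returned by CredManager.execute_monit(): one key per plugin output certificate, valued with whatever that plugin's check_credential_lifetime() returned. A hypothetical payload (paths and numbers invented):

# Hypothetical shape of the new metric; the keys and values below are invented.
service_metrics['cert_lifetime'] = {
    '/path/to/out_cert_prod': 71.5,   # as returned by the corresponding plugin
    '/path/to/out_cert_pilot': 68.2,
}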
8 changes: 4 additions & 4 deletions pandaharvester/harvesterbody/stager.py
@@ -80,7 +80,7 @@ def run(self):
if fileSpec.status != 'finished':
fileSpec.status = 'failed'
errStr = 'stage-out failed with {0}'.format(tmpStr)
jobSpec.set_pilot_error(PilotErrors.ERR_STAGEOUTFAILED, errStr)
jobSpec.set_pilot_error(PilotErrors.STAGEOUTFAILED, errStr)
jobSpec.trigger_propagation()
newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, True, lockedBy)
tmpLog.debug('updated new subStatus={0}'.format(newSubStatus))
@@ -147,7 +147,7 @@ def run(self):
if fileSpec.status != 'finished':
fileSpec.status = 'failed'
errStr = 'stage-out failed with {0}'.format(tmpStr)
jobSpec.set_pilot_error(PilotErrors.ERR_STAGEOUTFAILED, errStr)
jobSpec.set_pilot_error(PilotErrors.STAGEOUTFAILED, errStr)
jobSpec.trigger_propagation()
newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, True, lockedBy)
tmpLog.debug('updated new subStatus={0}'.format(newSubStatus))
@@ -237,7 +237,7 @@ def run(self):
if fileSpec.status == 'zipping':
fileSpec.status = 'failed'
errStr = 'zip-output failed with {0}'.format(tmpStr)
jobSpec.set_pilot_error(PilotErrors.ERR_STAGEOUTFAILED, errStr)
jobSpec.set_pilot_error(PilotErrors.STAGEOUTFAILED, errStr)
jobSpec.trigger_propagation()
newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, True, lockedBy)
tmpLog.debug('updated new subStatus={0}'.format(newSubStatus))
@@ -304,7 +304,7 @@ def run(self):
if fileSpec.status == 'post_zipping':
fileSpec.status = 'failed'
errStr = 'post-zipping failed with {0}'.format(tmpStr)
jobSpec.set_pilot_error(PilotErrors.ERR_STAGEOUTFAILED, errStr)
jobSpec.set_pilot_error(PilotErrors.STAGEOUTFAILED, errStr)
jobSpec.trigger_propagation()
newSubStatus = self.dbProxy.update_job_for_stage_out(jobSpec, True, lockedBy)
tmpLog.debug('updated new subStatus={0}'.format(newSubStatus))
11 changes: 7 additions & 4 deletions pandaharvester/harvesterbody/submitter.py
@@ -217,7 +217,7 @@ def run(self):
job_spec.stateChangeTime = timeNow
job_spec.locked_by = None
errStr = 'failed to make a worker'
job_spec.set_pilot_error(PilotErrors.ERR_SETUPFAILURE, errStr)
job_spec.set_pilot_error(PilotErrors.SETUPFAILURE, errStr)
job_spec.trigger_propagation()
self.dbProxy.update_job(job_spec, {'locked_by': locked_by,
'subStatus': 'prepared'})
@@ -318,7 +318,7 @@ def run(self):
tmp_log.error(errStr)
work_spec.set_status(WorkSpec.ST_missed)
work_spec.set_dialog_message(tmpStr)
work_spec.set_pilot_error(PilotErrors.ERR_SETUPFAILURE, errStr)
work_spec.set_pilot_error(PilotErrors.SETUPFAILURE, errStr)
work_spec.set_pilot_closed()
if jobList is not None:
# increment attempt number
@@ -363,7 +363,8 @@ def run(self):
'taskID': job_spec.taskID,
'jobsetID': job_spec.jobParams['jobsetID'],
'nRanges': max(int(math.ceil(work_spec.nCore / len(jobList))),
job_spec.jobParams['coreCount']),
job_spec.jobParams['coreCount']) * \
queue_config.initEventsMultipler,
}
if 'isHPO' in job_spec.jobParams:
if 'sourceURL' in job_spec.jobParams:
@@ -422,7 +423,9 @@ def run(self):
+ sw_main.get_elapsed_time())
main_log.debug('done')
# define sleep interval
if site_name is None:
if site_name is None or \
(hasattr(harvester_config.submitter, 'respectSleepTime') and
harvester_config.submitter.respectSleepTime):
sleepTime = harvester_config.submitter.sleepTime
else:
sleepTime = 0
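
Two new knobs appear in this file: queue_config.initEventsMultipler scales the initial event-range request, and an optional submitter.respectSleepTime flag keeps the configured sleep interval even when a site was just processed. A worked example of the new nRanges arithmetic (input numbers invented for illustration):

# Worked example of the new nRanges expression; the input numbers are invented.
import math
nCore, nJobs, coreCount, initEventsMultipler = 8, 2, 1, 2
nRanges = max(int(math.ceil(nCore / nJobs)), coreCount) * initEventsMultipler
# -> max(4, 1) * 2 = 8 event ranges requested up front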
60 changes: 60 additions & 0 deletions pandaharvester/harvesterbody/sweeper.py
@@ -1,3 +1,9 @@
import os
try:
from os import walk
except ImportError:
from scandir import walk

from future.utils import iteritems
from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestercore import core_utils
@@ -165,6 +171,60 @@ def run(self):
# delete orphaned job info
self.dbProxy.delete_orphaned_job_info()
mainLog.debug('done deletion of old jobs' + sw_delete.get_elapsed_time())
# disk cleanup
if hasattr(harvester_config.sweeper, 'diskCleanUpInterval') and \
hasattr(harvester_config.sweeper, 'diskHighWatermark'):
locked = self.dbProxy.get_process_lock('sweeper', self.get_pid(),
harvester_config.sweeper.diskCleanUpInterval*60*60)
if locked:
try:
all_active_files = None
for item in harvester_config.sweeper.diskHighWatermark.split(','):
# dir name and watermark in GB
dir_name, watermark = item.split('|')
mainLog.debug('checking {0} for cleanup with watermark {1} GB'.format(dir_name, watermark))
watermark = int(watermark) * 10**9
total_size = 0
file_dict = {}
# scan dir
for root, dirs, filenames in walk(dir_name):
for base_name in filenames:
full_name = os.path.join(root, base_name)
f_size = os.path.getsize(full_name)
total_size += f_size
mtime = os.path.getmtime(full_name)
file_dict.setdefault(mtime, set())
file_dict[mtime].add((base_name, full_name, f_size))
# delete if necessary
if total_size < watermark:
mainLog.debug(
'skip cleanup {0} due to total_size {1} GB < watermark {2} GB'.format(
dir_name, total_size//(10**9), watermark//(10**9)))
else:
mainLog.debug(
'cleanup {0} due to total_size {1} GB >= watermark {2} GB'.format(
dir_name, total_size//(10**9), watermark//(10**9)))
# get active input files
if all_active_files is None:
all_active_files = self.dbProxy.get_all_active_input_files()
deleted_size = 0
mtimes = sorted(file_dict.keys())
for mtime in mtimes:
for base_name, full_name, f_size in file_dict[mtime]:
# keep if active
if base_name in all_active_files:
continue
try:
os.remove(full_name)
except Exception:
core_utils.dump_error_message(mainLog)
deleted_size += f_size
if total_size - deleted_size < watermark:
break
if total_size - deleted_size < watermark:
break
except Exception:
core_utils.dump_error_message(mainLog)
# time the cycle
mainLog.debug('done a sweeper cycle' + sw_main.get_elapsed_time())
# check if being terminated
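
The new disk cleanup only activates when both sweeper parameters are set: diskCleanUpInterval (multiplied by 3600 above, i.e. in hours) bounds how often the cleanup lock can be taken, and diskHighWatermark is a comma-separated list of dir|GB pairs. Once a directory's total size reaches its watermark, the oldest files that are not active input files are deleted until the size drops below it. An illustrative setting, written here as the resulting config attributes (paths and numbers invented):

# Illustrative values only; in practice these live in the sweeper section of the
# harvester configuration. Paths and thresholds below are invented.
harvester_config.sweeper.diskCleanUpInterval = 6    # hours between cleanup attempts
harvester_config.sweeper.diskHighWatermark = '/var/cache/harvester|100,/data/harvester/workdirs|50'
# each entry is "<directory>|<watermark in GB>"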