Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into release
Browse files Browse the repository at this point in the history
  • Loading branch information
mightqxc committed Jul 6, 2020
2 parents a4c3dfa + 98f51bd commit 157cbdf
Show file tree
Hide file tree
Showing 17 changed files with 323 additions and 155 deletions.
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
timestamp = "16-06-2020 14:57:09 on release (by fahui)"
timestamp = "06-07-2020 11:53:08 on release (by fahui)"
27 changes: 26 additions & 1 deletion pandaharvester/harvesterbody/cacher.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def get_data(self, info_url, tmp_log):
pass
except Exception:
core_utils.dump_error_message(tmp_log)
elif info_url.startswith('http'):
elif info_url.startswith('http:'):
try:
res = requests.get(info_url, timeout=60)
if res.status_code == 200:
Expand All @@ -121,6 +121,31 @@ def get_data(self, info_url, tmp_log):
tmp_log.error('read timeout when getting data from {0}'.format(info_url))
except Exception:
core_utils.dump_error_message(tmp_log)
elif info_url.startswith('https:'):
try:
try:
# try with pandacon certificate
cert_file = harvester_config.pandacon.cert_file
key_file = harvester_config.pandacon.key_file
ca_cert = harvester_config.pandacon.ca_cert
res = requests.get(info_url, cert=(cert_file, key_file), verify=ca_cert, timeout=60)
except requests.exceptions.SSLError:
# try without certificate
res = requests.get(info_url, timeout=60)
except requests.exceptions.ReadTimeout:
tmp_log.error('read timeout when getting data from {0}'.format(info_url))
except Exception:
core_utils.dump_error_message(tmp_log)
else:
if res.status_code == 200:
try:
retVal = res.json()
except Exception:
errMsg = 'corrupted json from {0} : {1}'.format(info_url, res.text)
tmp_log.error(errMsg)
else:
errMsg = 'failed to get {0} with StatusCode={1} {2}'.format(info_url, res.status_code, res.text)
tmp_log.error(errMsg)
elif info_url.startswith('panda_cache:'):
try:
publicKey, privateKey = info_url.split(':')[-1].split('&')
Expand Down
145 changes: 121 additions & 24 deletions pandaharvester/harvesterbody/cred_manager.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import re
import itertools

from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestercore.plugin_factory import PluginFactory
Expand All @@ -12,28 +15,61 @@
class CredManager(AgentBase):

# constructor
def __init__(self, single_mode=False):
def __init__(self, queue_config_mapper, single_mode=False):
AgentBase.__init__(self, single_mode)
self.queue_config_mapper = queue_config_mapper
self.pluginFactory = PluginFactory()
self.dbProxy = DBProxy()
# plugin cores
self.exeCores = []
self.queue_exe_cores = []
# get plugin from harvester config
self.get_cores_from_harvester_config()
# update plugin cores from queue config
self.update_cores_from_queue_config()

# get list
def get_list(self, data):
    """Return ``data`` unchanged if it is already a list, else wrap it in one.

    Lets configuration attributes be given either as a single value or as
    a list of values; callers always receive a list.
    """
    return data if isinstance(data, list) else [data]

# get plugin cores from harvester config
def get_cores_from_harvester_config(self):
# get module and class names
moduleNames = self.get_list(harvester_config.credmanager.moduleName)
classNames = self.get_list(harvester_config.credmanager.className)
if hasattr(harvester_config.credmanager, 'moduleName'):
moduleNames = self.get_list(harvester_config.credmanager.moduleName)
else:
moduleNames = []
if hasattr(harvester_config.credmanager, 'className'):
classNames = self.get_list(harvester_config.credmanager.className)
else:
classNames = []
# file names of original certificates
if hasattr(harvester_config.credmanager, 'inCertFile'):
inCertFiles = self.get_list(harvester_config.credmanager.inCertFile)
else:
elif hasattr(harvester_config.credmanager, 'certFile'):
inCertFiles = self.get_list(harvester_config.credmanager.certFile)
else:
inCertFiles = []
# file names of certificates to be generated
if hasattr(harvester_config.credmanager, 'outCertFile'):
outCertFiles = self.get_list(harvester_config.credmanager.outCertFile)
else:
# use the file name of the certificate for panda connection as output name
outCertFiles = self.get_list(harvester_config.pandacon.cert_file)
# VOMS
vomses = self.get_list(harvester_config.credmanager.voms)
# get plugin
self.exeCores = []
if hasattr(harvester_config.credmanager, 'voms'):
vomses = self.get_list(harvester_config.credmanager.voms)
else:
vomses = []
# direct and merged plugin configuration in json
if hasattr(harvester_config.credmanager, 'pluginConfigs'):
pluginConfigs = harvester_config.credmanager.pluginConfigs
else:
pluginConfigs = []
# from traditional attributes
for moduleName, className, inCertFile, outCertFile, voms in \
zip(moduleNames, classNames, inCertFiles, outCertFiles, vomses):
pluginPar = {}
Expand All @@ -45,28 +81,86 @@ def __init__(self, single_mode=False):
try:
exeCore = self.pluginFactory.get_plugin(pluginPar)
self.exeCores.append(exeCore)
except Exception as e:
_logger.error('Problem instantiating cred manager for {0}'.format(pluginPar))
_logger.error('Exception {0}'.format(e))

except Exception:
_logger.error('failed to launch credmanager with traditional attributes for {0}'.format(pluginPar))
core_utils.dump_error_message(_logger)
# from pluginConfigs
for pc in pluginConfigs:
try:
setup_maps = pc['configs']
for setup_name, setup_map in setup_maps.items():
try:
pluginPar = {}
pluginPar['module'] = pc['module']
pluginPar['name'] = pc['name']
pluginPar['setup_name'] = setup_name
pluginPar.update(setup_map)
exeCore = self.pluginFactory.get_plugin(pluginPar)
self.exeCores.append(exeCore)
except Exception:
_logger.error('failed to launch credmanager in pluginConfigs for {0}'.format(pluginPar))
core_utils.dump_error_message(_logger)
except Exception:
_logger.error('failed to parse pluginConfigs {0}'.format(pc))
core_utils.dump_error_message(_logger)

# get list
def get_list(self, data):
if isinstance(data, list):
return data
else:
return [data]
# update plugin cores from queue config
def update_cores_from_queue_config(self):
    """Rebuild ``self.queue_exe_cores`` from the per-queue credmanager setups.

    Every queue returned by ``self.queue_config_mapper.get_all_queues()`` is
    inspected. Queues whose ``queueStatus`` is ``'offline'``, or whose config
    has no ``credmanagers`` attribute of type ``list``, are skipped.

    For each setup dict in ``credmanagers`` a plugin parameter dict is built:
    ``module`` and ``name`` are copied, ``setup_name`` is set to the queue
    name, and every other item is copied with ``${...}`` placeholders in
    string values substituted:

    * ``${harvesterID}``  -> ``harvester_config.master.harvester_id``
    * ``${queueName}``    -> the queue name
    * ``${common.<key>}`` -> ``queue_config.common[<key>]`` when present

    Unknown or unresolved placeholders are left untouched. The parameter
    dict is handed to ``self.pluginFactory.get_plugin`` and the returned
    plugin is appended to ``self.queue_exe_cores``. A setup that raises is
    logged and skipped so that one bad entry does not block the others.
    """
    self.queue_exe_cores = []
    for queue_name, queue_config in self.queue_config_mapper.get_all_queues().items():
        if queue_config.queueStatus == 'offline' \
                or not hasattr(queue_config, 'credmanagers') \
                or not isinstance(queue_config.credmanagers, list):
            continue
        for cm_setup in queue_config.credmanagers:
            # defined before the try so the error message can always show it
            pluginPar = {}
            try:
                pluginPar['module'] = cm_setup['module']
                pluginPar['name'] = cm_setup['name']
                pluginPar['setup_name'] = queue_name
                for k, v in cm_setup.items():
                    if k in ('module', 'name'):
                        # already filled in above
                        continue
                    if isinstance(v, str) and '$' in v:
                        # replace placeholders
                        value = v
                        # raw string: '\$', '\{' and '\d' are regex escapes, not string escapes
                        patts = re.findall(r'\$\{([a-zA-Z\d_.]+)\}', v)
                        for patt in patts:
                            tmp_ph = '${' + patt + '}'
                            tmp_val = None
                            if patt == 'harvesterID':
                                tmp_val = harvester_config.master.harvester_id
                            elif patt == 'queueName':
                                tmp_val = queue_name
                            elif patt.startswith('common.'):
                                # values from common blocks; strip only the leading prefix
                                attr = patt[len('common.'):]
                                if hasattr(queue_config, 'common') and attr in queue_config.common:
                                    tmp_val = queue_config.common[attr]
                            if tmp_val is not None:
                                # common-block values may be numbers; str.replace needs a string
                                value = value.replace(tmp_ph, str(tmp_val))
                        # fill in
                        pluginPar[k] = value
                    else:
                        # fill in
                        pluginPar[k] = v
                exe_core = self.pluginFactory.get_plugin(pluginPar)
                self.queue_exe_cores.append(exe_core)
            except Exception:
                _logger.error('failed to launch about queue={0} for {1}'.format(queue_name, pluginPar))
                core_utils.dump_error_message(_logger)

# main loop
def run(self):
    """Agent main loop.

    Each cycle refreshes the queue-derived plugin cores, runs ``execute``,
    then waits via ``self.terminated`` for
    ``harvester_config.credmanager.sleepTime`` (not randomized). The loop
    ends as soon as ``self.terminated`` returns a true value.
    """
    stop_requested = False
    while not stop_requested:
        # pick up credmanager setups from the current queue configs
        self.update_cores_from_queue_config()
        # run every credential plugin once
        self.execute()
        # wait for the next cycle; a true result means we are being terminated
        stop_requested = self.terminated(harvester_config.credmanager.sleepTime, randomize=False)


# main
def execute(self):
# get lock
Expand All @@ -75,16 +169,19 @@ def execute(self):
if not locked:
return
# loop over all plugins
for exeCore in self.exeCores:
for exeCore in itertools.chain(self.exeCores, self.queue_exe_cores):
# do nothing
if exeCore is None:
continue

# make logger
mainLog = self.make_logger(_logger, "{0} {1} {2}".format(exeCore.__class__.__name__,
exeCore.inCertFile,
exeCore.outCertFile),
method_name='execute')
credmanager_name = ''
if hasattr(exeCore, 'setup_name'):
credmanager_name = exeCore.setup_name
else:
credmanager_name = '{0} {1}'.format(exeCore.inCertFile, exeCore.outCertFile)
mainLog = self.make_logger(_logger,
'{0} {1}'.format(exeCore.__class__.__name__, credmanager_name),
method_name='execute')
try:
# check credential
mainLog.debug('check credential')
Expand Down
4 changes: 2 additions & 2 deletions pandaharvester/harvesterbody/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def start(self):
thrList = []
# Credential Manager
from pandaharvester.harvesterbody.cred_manager import CredManager
thr = CredManager(single_mode=self.singleMode)
thr = CredManager(self.queueConfigMapper, single_mode=self.singleMode)
thr.set_stop_event(self.stopEvent)
thr.execute()
thr.start()
Expand Down Expand Up @@ -158,7 +158,7 @@ def start(self):
# Service monitor
try:
sm_active = harvester_config.service_monitor.active
except:
except Exception:
sm_active = False

if sm_active:
Expand Down
30 changes: 24 additions & 6 deletions pandaharvester/harvesterbody/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,49 @@
class Monitor(AgentBase):
# constructor
def __init__(self, queue_config_mapper, single_mode=False):
tmp_log = self.make_logger(_logger, method_name='__init__')
AgentBase.__init__(self, single_mode)
self.queueConfigMapper = queue_config_mapper
self.dbProxy = DBProxy()
self.pluginFactory = PluginFactory()
self.startTimestamp = time.time()
self.monitor_fifo = MonitorFIFO()
try:
self.monitor_fifo = MonitorFIFO()
except Exception:
tmp_log.error('failed to launch monitor-fifo')
core_utils.dump_error_message(tmp_log)
if self.monitor_fifo.enabled:
self.monitor_event_fifo = MonitorEventFIFO()
try:
self.monitor_event_fifo = MonitorEventFIFO()
except Exception:
tmp_log.error('failed to launch monitor-event-fifo')
core_utils.dump_error_message(tmp_log)
else:
self.monitor_event_fifo = None
self.apfmon = Apfmon(self.queueConfigMapper)
self.eventBasedMonCoreList = []
if getattr(harvester_config.monitor, 'eventBasedEnable', False):
for pluginConf in harvester_config.monitor.eventBasedPlugins:
pluginFactory = PluginFactory()
self.eventBasedMonCoreList.append(pluginFactory.get_plugin(pluginConf))
plugin_key = pluginFactory.get_plugin_key(pluginConf)
try:
self.eventBasedMonCoreList.append(pluginFactory.get_plugin(pluginConf))
except Exception:
tmp_log.error('failed to launch event-based-monitor plugin of {0}'.format(plugin_key))
core_utils.dump_error_message(tmp_log)

# main loop
def run(self):
lockedBy = 'monitor-{0}'.format(self.get_pid())
mainLog = self.make_logger(_logger, 'id={0}'.format(lockedBy), method_name='run')
# init messengers
for queueConfig in self.queueConfigMapper.get_all_queues().values():
for queueName, queueConfig in self.queueConfigMapper.get_all_queues().items():
# just import for module initialization
self.pluginFactory.get_plugin(queueConfig.messenger)
try:
self.pluginFactory.get_plugin(queueConfig.messenger)
except Exception:
mainLog.error('failed to launch messenger plugin for {0}'.format(queueName))
core_utils.dump_error_message(mainLog)
# main
fifoSleepTimeMilli = getattr(harvester_config.monitor, 'fifoSleepTimeMilli', 5000)
fifoCheckDuration = getattr(harvester_config.monitor, 'fifoCheckDuration', 30)
Expand All @@ -67,7 +86,6 @@ def run(self):
adjusted_sleepTime = sleepTime
if monitor_fifo.enabled:
monitor_fifo.restore()
mainLog = self.make_logger(_logger, 'id={0}'.format(lockedBy), method_name='run')
while True:
sw_main = core_utils.get_stopwatch()
mainLog.debug('start a monitor cycle')
Expand Down
7 changes: 6 additions & 1 deletion pandaharvester/harvesterbody/sweeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ def run(self):
mainLog.error('queue config for {0}/{1} not found'.format(queueName, configID))
continue
queueConfig = self.queueConfigMapper.get_queue(queueName, configID)
sweeperCore = self.pluginFactory.get_plugin(queueConfig.sweeper)
try:
sweeperCore = self.pluginFactory.get_plugin(queueConfig.sweeper)
except Exception:
mainLog.error('failed to launch sweeper plugin for {0}/{1}'.format(queueName, configID))
core_utils.dump_error_message(mainLog)
continue
sw.reset()
n_workers = len(workspec_list)
try:
Expand Down
10 changes: 2 additions & 8 deletions pandaharvester/harvestercloud/pilots_starter.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,10 @@ def get_configuration():
if resource_type:
resource_type_option = '--resource-type {0}'.format(resource_type)

psl_option = ''
if prodSourceLabel:
psl_option = '-j {0}'.format(prodSourceLabel)
else:
psl_option = '-j managed'

job_type_option = ''
if job_type:
Expand All @@ -233,13 +234,6 @@ def get_configuration():
wrapper_params = '-a {0} -s {1} -r {2} -q {3} {4} {5} {6}'.format(WORK_DIR, panda_site, panda_queue, panda_queue,
resource_type_option, psl_option, job_type_option)

# TODO: This should be removed once we start using prodSourceLabel
if not psl_option:
if 'ANALY' in panda_queue:
wrapper_params = '{0} -j user'.format(wrapper_params)
else:
wrapper_params = '{0} -j managed'.format(wrapper_params)

if submit_mode == 'PUSH':
# job configuration files need to be copied, because k8s configmap mounts as read-only file system
# and therefore the pilot cannot execute in the same directory
Expand Down
10 changes: 9 additions & 1 deletion pandaharvester/harvestercore/plugin_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,15 @@ def __init__(self, no_db=False):
self.classMap = {}
self.noDB = no_db

# get plugin
# get plugin key
def get_plugin_key(self, plugin_conf):
    """Return the key identifying the plugin described by ``plugin_conf``.

    The key is the configured ``'module'`` value and ``'name'`` (class name)
    value joined by a dot. A ``KeyError`` propagates if either entry is
    missing from ``plugin_conf``.
    """
    return '{0}.{1}'.format(plugin_conf['module'], plugin_conf['name'])

# get plugin instance
def get_plugin(self, plugin_conf):
# use module + class as key
moduleName = plugin_conf['module']
Expand Down
Loading

0 comments on commit 157cbdf

Please sign in to comment.