diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index 26401d71..a7a3cc47 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "10-09-2019 13:14:16 on release (by fahui)" +timestamp = "04-10-2019 11:57:38 on release (by fahui)" diff --git a/pandaharvester/harvesterbody/monitor.py b/pandaharvester/harvesterbody/monitor.py index 0530a679..a6f250ab 100644 --- a/pandaharvester/harvesterbody/monitor.py +++ b/pandaharvester/harvesterbody/monitor.py @@ -694,7 +694,7 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log, else: newStatus = WorkSpec.ST_idle elif not workSpec.is_post_processed(): - if not queue_config.is_no_heartbeat_status(newStatus): + if not queue_config.is_no_heartbeat_status(newStatus) and not queue_config.truePilot: # post processing unless heartbeat is suppressed jobSpecs = self.dbProxy.get_jobs_with_worker_id(workSpec.workerID, None, True, diff --git a/pandaharvester/harvesterbody/submitter.py b/pandaharvester/harvesterbody/submitter.py index 7c2df6b8..d69d07aa 100644 --- a/pandaharvester/harvesterbody/submitter.py +++ b/pandaharvester/harvesterbody/submitter.py @@ -57,7 +57,7 @@ def run(self): # get commands comStr = '{0}:{1}'.format(CommandSpec.COM_setNWorkers, siteName) commandSpecs = self.dbProxy.get_commands_for_receiver('submitter', comStr) - mainLog.debug('got {0} {1} commands'.format(commandSpecs, comStr)) + mainLog.debug('got {0} {1} commands'.format(len(commandSpecs), comStr)) for commandSpec in commandSpecs: newLimits = self.dbProxy.set_queue_limit(siteName, commandSpec.params) for tmpResource, tmpNewVal in iteritems(newLimits): diff --git a/pandaharvester/harvesterbody/sweeper.py b/pandaharvester/harvesterbody/sweeper.py index 70b68cd0..0119894f 100644 --- a/pandaharvester/harvesterbody/sweeper.py +++ b/pandaharvester/harvesterbody/sweeper.py @@ -4,6 +4,7 @@ from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy from pandaharvester.harvestercore.plugin_factory import PluginFactory from pandaharvester.harvesterbody.agent_base import AgentBase +from pandaharvester.harvestercore.command_spec import CommandSpec # logger _logger = core_utils.setup_logger('sweeper') @@ -25,6 +26,16 @@ def run(self): while True: sw_main = core_utils.get_stopwatch() mainLog = self.make_logger(_logger, 'id={0}'.format(lockedBy), method_name='run') + # get commands to kill + sw_getcomm = core_utils.get_stopwatch() + mainLog.debug('try to get commands') + comStr = CommandSpec.COM_killWorkers + commandSpecs = self.dbProxy.get_commands_for_receiver('sweeper', comStr) + mainLog.debug('got {0} {1} commands'.format(len(commandSpecs), comStr)) + for commandSpec in commandSpecs: + n_to_kill = self.dbProxy.kill_workers_by_query(commandSpec.params) + mainLog.debug('will kill {0} workers with {1}'.format(n_to_kill, commandSpec.params)) + mainLog.debug('done handling commands' + sw_getcomm.get_elapsed_time()) # killing stage sw_kill = core_utils.get_stopwatch() mainLog.debug('try to get workers to kill') diff --git a/pandaharvester/harvestercore/command_spec.py b/pandaharvester/harvestercore/command_spec.py index 93e781a8..a1e674f5 100644 --- a/pandaharvester/harvestercore/command_spec.py +++ b/pandaharvester/harvestercore/command_spec.py @@ -17,10 +17,12 @@ class CommandSpec(SpecBase): # commands COM_reportWorkerStats = 'REPORT_WORKER_STATS' COM_setNWorkers = 'SET_N_WORKERS' + COM_killWorkers = 'KILL_WORKERS' # mapping between command and receiver 
receiver_map = { COM_reportWorkerStats: 'propagator', - COM_setNWorkers: 'submitter' + COM_setNWorkers: 'submitter', + COM_killWorkers: 'sweeper', } # constructor diff --git a/pandaharvester/harvestercore/core_utils.py b/pandaharvester/harvestercore/core_utils.py index 5546da68..a6dcea42 100644 --- a/pandaharvester/harvestercore/core_utils.py +++ b/pandaharvester/harvestercore/core_utils.py @@ -277,7 +277,7 @@ def get_output_file_report(jobspec): chksum = fileSpec.chksum.split(':')[-1] else: chksum = fileSpec.chksum - xml += """" + xml += """ diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index 5add6185..95ec3679 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -3032,7 +3032,7 @@ def refresh_cache(self, main_key, sub_key, new_info): return False # get a cached info - def get_cache(self, main_key, sub_key=None): + def get_cache(self, main_key, sub_key=None, from_local_cache=True): useDB = False try: # get logger @@ -3045,7 +3045,7 @@ def get_cache(self, main_key, sub_key=None): # lock dict globalDict.acquire() # found - if cacheKey in globalDict: + if from_local_cache and cacheKey in globalDict: # release dict globalDict.release() # make spec @@ -5370,3 +5370,64 @@ def get_workers_from_ids(self, ids): core_utils.dump_error_message(_logger) # return return {} + + # send kill command to workers by query + def kill_workers_by_query(self, params): + try: + # get logger + tmpLog = core_utils.make_logger(_logger, method_name='kill_workers_by_query') + tmpLog.debug('start') + # sql to set killTime + sqlL = "UPDATE {0} SET killTime=:setTime ".format(workTableName) + sqlL += "WHERE workerID=:workerID AND killTime IS NULL AND NOT status IN (:st1,:st2,:st3) " + # sql to get workers + constraints_query_string_list = [] + tmp_varMap = {} + constraint_map = {'status': params.get('status', [WorkSpec.ST_submitted]), + 'computingSite': params.get('computingSite', []), + 'computingElement': params.get('computingElement', []), + 'submissionHost': params.get('submissionHost', [])} + tmpLog.debug('query {0}'.format(constraint_map)) + for attribute, match_list in iteritems(constraint_map): + if match_list == 'ALL': + pass + elif not match_list: + tmpLog.debug('{0} constraint is not specified in the query. 
Skipped'.format(attribute)) + return 0 + else: + one_param_list = [ ':param_{0}_{1}'.format(attribute, v_i) for v_i in range(len(match_list)) ] + tmp_varMap.update(zip(one_param_list, match_list)) + params_string = '(' + ','.join(one_param_list) + ')' + constraints_query_string_list.append('{0} IN {1}'.format(attribute, params_string)) + constraints_query_string = ' AND '.join(constraints_query_string_list) + sqlW = "SELECT workerID FROM {0} ".format(workTableName) + sqlW += "WHERE {0} ".format(constraints_query_string) + # set an older time to trigger sweeper + setTime = datetime.datetime.utcnow() - datetime.timedelta(hours=6) + # get workers + varMap = dict() + varMap.update(tmp_varMap) + self.execute(sqlW, varMap) + resW = self.cur.fetchall() + nRow = 0 + for workerID, in resW: + # set killTime + varMap = dict() + varMap[':workerID'] = workerID + varMap[':setTime'] = setTime + varMap[':st1'] = WorkSpec.ST_finished + varMap[':st2'] = WorkSpec.ST_failed + varMap[':st3'] = WorkSpec.ST_cancelled + self.execute(sqlL, varMap) + nRow += self.cur.rowcount + # commit + self.commit() + tmpLog.debug('set killTime to {0} workers'.format(nRow)) + return nRow + except Exception: + # roll back + self.rollback() + # dump error + core_utils.dump_error_message(_logger) + # return + return None diff --git a/pandaharvester/harvestercore/fifos.py b/pandaharvester/harvestercore/fifos.py index 4f4afb7f..a6557e3a 100644 --- a/pandaharvester/harvestercore/fifos.py +++ b/pandaharvester/harvestercore/fifos.py @@ -27,7 +27,7 @@ _attribute_list = ['id', 'item', 'score'] # fifo object spec -FifoObject = collections.namedtuple('FifoObject', _attribute_list, verbose=False, rename=False) +FifoObject = collections.namedtuple('FifoObject', _attribute_list, rename=False) # logger _logger = core_utils.setup_logger('fifos') diff --git a/pandaharvester/harvestercore/queue_config_mapper.py b/pandaharvester/harvestercore/queue_config_mapper.py index 52be70d9..c197ee25 100644 --- a/pandaharvester/harvestercore/queue_config_mapper.py +++ b/pandaharvester/harvestercore/queue_config_mapper.py @@ -1,6 +1,7 @@ import os import json import copy +import time import datetime import threading import importlib @@ -178,6 +179,7 @@ def __init__(self, update_db=True): self.configFromCacher = harvester_config.qconf.configFromCacher except AttributeError: self.configFromCacher = False + self.updateInterval = 600 # load config from DB cache of URL with validation def _load_config_from_cache(self): @@ -242,15 +244,43 @@ def _get_resolver(): resolver = None return resolver + # update last reload time + def _update_last_reload_time(self): + new_info = '{0:.3f}'.format(time.time()) + return self.dbProxy.refresh_cache('_qconf_last_reload', '_universal', new_info) + + # get last reload time + def _get_last_reload_time(self): + cacheSpec = self.dbProxy.get_cache('_qconf_last_reload', '_universal', from_local_cache=False) + if cacheSpec is None: + return None + timestamp = float(cacheSpec.data) + return timestamp + # load data def load_data(self): mainLog = _make_logger(method_name='QueueConfigMapper.load_data') - # check interval - timeNow = datetime.datetime.utcnow() - if self.lastUpdate is not None and timeNow - self.lastUpdate < datetime.timedelta(minutes=10): - return + with self.lock: + # check whether to update + timeNow_timestamp = time.time() + if self.lastUpdate is not None: + last_reload_timestamp = self._get_last_reload_time() + if (last_reload_timestamp is not None and self.lastUpdate is not None + and
datetime.datetime.utcfromtimestamp(last_reload_timestamp) < self.lastUpdate + and timeNow_timestamp - last_reload_timestamp < self.updateInterval): + return # start with self.lock: + # update timestamp of last reload, lock with check interval + got_timestamp_update_lock = self.dbProxy.get_process_lock('qconf_reload', 'qconf_universal', self.updateInterval) + if got_timestamp_update_lock: + retVal = self._update_last_reload_time() + if retVal: + mainLog.debug('updated last reload timestamp') + else: + mainLog.warning('failed to update last reload timestamp. Skipped') + else: + mainLog.debug('did not get qconf_reload timestamp lock. Skipped to update last reload timestamp') # init newQueueConfig = dict() localTemplatesDict = dict() diff --git a/pandaharvester/harvestermiddleware/direct_ssh_herder.py b/pandaharvester/harvestermiddleware/direct_ssh_herder.py index 101f5479..a200a62c 100644 --- a/pandaharvester/harvestermiddleware/direct_ssh_herder.py +++ b/pandaharvester/harvestermiddleware/direct_ssh_herder.py @@ -119,6 +119,7 @@ def __init__(self, **kwarg): self.sockDir = getattr(self, 'sockDir', '/tmp') self.numMasters = getattr(self, 'numMasters', 1) self.execStr = getattr(self, 'execStr', '') + self.connectionLifetime = getattr(self, 'connectionLifetime', None) try: self._get_connection() except Exception as e: @@ -151,7 +152,8 @@ def _get_connection(self): sshMasterPool.make_control_master(self.remoteHost, self.remotePort, self.numMasters, ssh_username=self.sshUserName, ssh_password=self.sshPassword, private_key=self.privateKey, pass_phrase=self.passPhrase, - jump_host=self.jumpHost, jump_port=self.jumpPort, sock_dir=self.sockDir) + jump_host=self.jumpHost, jump_port=self.jumpPort, sock_dir=self.sockDir, + connection_lifetime=self.connectionLifetime) conn = sshMasterPool.get_connection(self.remoteHost, self.remotePort, self.execStr) if conn is not None: tmpLog.debug('connected successfully') diff --git a/pandaharvester/harvestermiddleware/ssh_master_pool.py b/pandaharvester/harvestermiddleware/ssh_master_pool.py index 8e47119d..8d5bc3eb 100644 --- a/pandaharvester/harvestermiddleware/ssh_master_pool.py +++ b/pandaharvester/harvestermiddleware/ssh_master_pool.py @@ -2,6 +2,7 @@ import threading import uuid import os +import time import six import pexpect @@ -39,7 +40,7 @@ def make_dict_key(self, host, port): def make_control_master(self, remote_host, remote_port, num_masters=1, ssh_username=None, ssh_password=None, private_key=None, pass_phrase=None, jump_host=None, jump_port=None, login_timeout=60, reconnect=False, - with_lock=True, sock_dir=None): + with_lock=True, sock_dir=None, connection_lifetime=None): dict_key = self.make_dict_key(remote_host, remote_port) if with_lock: self.lock.acquire() @@ -56,7 +57,8 @@ def make_control_master(self, remote_host, remote_port, num_masters=1, 'jump_host': jump_host, 'jump_port': jump_port, 'login_timeout': login_timeout, - 'sock_dir': sock_dir + 'sock_dir': sock_dir, + 'connection_lifetime': connection_lifetime, } else: num_masters = self.params[dict_key]['num_masters'] @@ -68,6 +70,7 @@ def make_control_master(self, remote_host, remote_port, num_masters=1, jump_port = self.params[dict_key]['jump_port'] login_timeout = self.params[dict_key]['login_timeout'] sock_dir = self.params[dict_key]['sock_dir'] + connection_lifetime = self.params[dict_key]['connection_lifetime'] # make a master for i in range(num_masters - len(self.pool[dict_key])): # make a socket file @@ -94,6 +97,7 @@
loginString, ] c = pexpect_spawn(com, echo=False) + baseLogger.debug('pexpect_spawn') c.logfile_read = baseLogger.handlers[0].stream isOK = False for iTry in range(3): @@ -132,27 +136,35 @@ def make_control_master(self, remote_host, remote_port, num_masters=1, # exec to confirm login c.sendline('echo {0}'.format(loginString)) if isOK: - self.pool[dict_key].append((sock_file, c)) + conn_exp_time = (time.time() + connection_lifetime) if connection_lifetime is not None else None + self.pool[dict_key].append((sock_file, c, conn_exp_time)) if with_lock: self.lock.release() # get a connection def get_connection(self, remote_host, remote_port, exec_string): + baseLogger.debug('get_connection start') dict_key = self.make_dict_key(remote_host, remote_port) self.lock.acquire() active_masters = [] someClosed = False - for sock_file, child in self.pool[dict_key]: - if child.isalive(): - active_masters.append((sock_file, child)) + for sock_file, child, conn_exp_time in list(self.pool[dict_key]): + if child.isalive() and (conn_exp_time is None or time.time() <= conn_exp_time): + active_masters.append((sock_file, child, conn_exp_time)) else: child.close() + self.pool[dict_key].remove((sock_file, child, conn_exp_time)) someClosed = True + if conn_exp_time is not None and time.time() > conn_exp_time: + baseLogger.debug('a connection is expired') + else: + baseLogger.debug('a connection process is dead') if someClosed: self.make_control_master(remote_host, remote_port, reconnect=True, with_lock=False) active_masters = [item for item in self.pool[dict_key] if os.path.exists(item[0])] + baseLogger.debug('reconnected; now {0} active connections'.format(len(active_masters))) if len(active_masters) > 0: - sock_file, child = random.choice(active_masters) + sock_file, child, conn_exp_time = random.choice(active_masters) con = subprocess.Popen(['ssh', 'dummy', '-S', sock_file, exec_string], shell=False, stdin=subprocess.PIPE, diff --git a/pandaharvester/harvestermisc/htcondor_utils.py b/pandaharvester/harvestermisc/htcondor_utils.py index ed37ca03..c3778a12 100644 --- a/pandaharvester/harvestermisc/htcondor_utils.py +++ b/pandaharvester/harvestermisc/htcondor_utils.py @@ -246,18 +246,33 @@ def wrapper(self, *args, **kwargs): # Make logger tmpLog = core_utils.make_logger(baseLogger, 'submissionHost={0}'.format(self.submissionHost), method_name='CondorClient.renew_session_if_error') func_name = func.__name__ + try: + self.schedd + except AttributeError: + if self.lock.acquire(False): + is_renewed = self.renew_session() + self.lock.release() + if not is_renewed: + errStr = 'failed to communicate with {0}'.format(self.submissionHost) + tmpLog.error(errStr) + raise Exception(errStr) try: ret = func(self, *args, **kwargs) except RuntimeError as e: tmpLog.debug('got RuntimeError: {0}'.format(e)) if self.lock.acquire(False): - self.renew_session() + is_renewed = self.renew_session() self.lock.release() - if to_retry: - tmpLog.debug('condor session renewed. Retrying {0}'.format(func_name)) - ret = func(self, *args, **kwargs) + if is_renewed: + if to_retry: + tmpLog.debug('condor session renewed. Retrying {0}'.format(func_name)) + ret = func(self, *args, **kwargs) + else: + tmpLog.debug('condor session renewed') + raise else: - tmpLog.debug('condor session renewed') + tmpLog.error('failed to renew condor session') raise else: tmpLog.debug('another thread is renewing condor session; skipped...') @@ -324,11 +339,13 @@ def renew_session(self, retry=3, init=False): tmpLog.warning('Failed. Retry...') else: tmpLog.warning('Retry {0} times.
Still failed. Skipped'.format(i_try)) + return False i_try += 1 self.secman.invalidateAllSessions() time.sleep(3) # Sleep time.sleep(3) + return True # Condor job query diff --git a/pandaharvester/harvestermonitor/htcondor_monitor.py b/pandaharvester/harvestermonitor/htcondor_monitor.py index 3d467e37..a6d783f1 100644 --- a/pandaharvester/harvestermonitor/htcondor_monitor.py +++ b/pandaharvester/harvestermonitor/htcondor_monitor.py @@ -211,11 +211,11 @@ def check_workers(self, workspec_list): job_ads_all_dict = {} for submissionHost, batchIDs_list in six.iteritems(get_host_batchid_map(workspec_list)): # Record batch job query result to this dict, with key = batchID - job_query = CondorJobQuery( cacheEnable=self.cacheEnable, - cacheRefreshInterval=self.cacheRefreshInterval, - useCondorHistory=self.useCondorHistory, - id=submissionHost) try: + job_query = CondorJobQuery( cacheEnable=self.cacheEnable, + cacheRefreshInterval=self.cacheRefreshInterval, + useCondorHistory=self.useCondorHistory, + id=submissionHost) host_job_ads_dict = job_query.get_all(batchIDs_list=batchIDs_list) except Exception as e: host_job_ads_dict = {} @@ -243,11 +243,15 @@ def report_updated_workers(self, time_window): ## Loop over submissionHost and get all jobs job_ads_all_dict = {} for submissionHost in self.submissionHost_list: - job_query = CondorJobQuery( cacheEnable=self.cacheEnable, - cacheRefreshInterval=self.cacheRefreshInterval, - useCondorHistory=self.useCondorHistory, - id=submissionHost) - job_ads_all_dict.update(job_query.get_all(allJobs=True)) + try: + job_query = CondorJobQuery( cacheEnable=self.cacheEnable, + cacheRefreshInterval=self.cacheRefreshInterval, + useCondorHistory=self.useCondorHistory, + id=submissionHost) + job_ads_all_dict.update(job_query.get_all(allJobs=True)) + except Exception as e: + ret_err_str = 'Exception {0}: {1}'.format(e.__class__.__name__, e) + tmpLog.error(ret_err_str) ## Choose workers updated within a time window workers_to_check_list = [] for condor_job_id, job_ads in six.iteritems(job_ads_all_dict): diff --git a/pandaharvester/harvestermonitor/slurm_monitor.py b/pandaharvester/harvestermonitor/slurm_monitor.py index 048f1fd9..77d215d7 100644 --- a/pandaharvester/harvestermonitor/slurm_monitor.py +++ b/pandaharvester/harvestermonitor/slurm_monitor.py @@ -39,13 +39,10 @@ def check_workers(self, workspec_list): retCode = p.returncode tmpLog.debug('retCode={0}'.format(retCode)) errStr = '' + stdOut_str = stdOut if (isinstance(stdOut, str) or stdOut is None) else stdOut.decode() + stdErr_str = stdErr if (isinstance(stdErr, str) or stdErr is None) else stdErr.decode() if retCode == 0: - # parse - if isinstance(stdOut, str): - stdout_str = stdOut - else: - stdout_str = stdOut.decode() - for tmpLine in stdout_str.split('\n'): + for tmpLine in stdOut_str.split('\n'): tmpMatch = re.search('{0} '.format(workSpec.batchID), tmpLine) if tmpMatch is not None: errStr = tmpLine @@ -66,7 +63,7 @@ def check_workers(self, workspec_list): retList.append((newStatus, errStr)) else: # failed - errStr = '{0} {1}'.format(stdOut, stdErr) + errStr = '{0} {1}'.format(stdOut_str, stdErr_str) tmpLog.error(errStr) if 'slurm_load_jobs error: Invalid job id specified' in errStr: newStatus = WorkSpec.ST_failed diff --git a/pandaharvester/harvesterpreparator/gridftp_preparator.py b/pandaharvester/harvesterpreparator/gridftp_preparator.py index e6381868..08d7eaa3 100644 --- a/pandaharvester/harvesterpreparator/gridftp_preparator.py +++ b/pandaharvester/harvesterpreparator/gridftp_preparator.py @@ -27,6 +27,8 
@@ "localBasePath": "/data/rucio", # max number of attempts "maxAttempts": 3, + # check paths under localBasePath. Choose false if destination on remote node + "checkLocalPath": true, # options for globus-url-copy "gulOpts": "-cred /tmp/x509_u1234 -sync -sync-level 3 -verify-checksum -v" } @@ -37,6 +39,7 @@ def __init__(self, **kwarg): self.gulOpts = None self.maxAttempts = 3 self.timeout = None + self.checkLocalPath = True PluginBase.__init__(self, **kwarg) # trigger preparation @@ -57,16 +60,17 @@ def trigger_preparation(self, jobspec): # local access path accPath = mover_utils.construct_file_path(self.localBasePath, inFileInfo[tmpFileSpec.lfn]['scope'], tmpFileSpec.lfn) - # check if already exits - if os.path.exists(accPath): - # calculate checksum - checksum = core_utils.calc_adler32(accPath) - checksum = 'ad:{0}'.format(checksum) - if checksum == inFileInfo[tmpFileSpec.lfn]['checksum']: - continue - # make directories if needed - if not os.path.isdir(os.path.dirname(accPath)): - os.makedirs(os.path.dirname(accPath)) + if self.checkLocalPath: + # check if already exits + if os.path.exists(accPath): + # calculate checksum + checksum = core_utils.calc_adler32(accPath) + checksum = 'ad:{0}'.format(checksum) + if checksum == inFileInfo[tmpFileSpec.lfn]['checksum']: + continue + # make directories if needed + if not os.path.isdir(os.path.dirname(accPath)): + os.makedirs(os.path.dirname(accPath)) # make input for globus-url-copy if gucInput is None: gucInput = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='_guc_in.tmp') diff --git a/pandaharvester/harvesterscripts/file_operation.py b/pandaharvester/harvesterscripts/file_operation.py index 80d2549e..e44815a3 100644 --- a/pandaharvester/harvesterscripts/file_operation.py +++ b/pandaharvester/harvesterscripts/file_operation.py @@ -8,6 +8,8 @@ import sys import argparse import zlib +import tempfile +import re #=== Command functions ========================================================= @@ -31,6 +33,20 @@ def adler32(arguments): retVal = hex(val)[2:10].zfill(8).lower() print(retVal) +# write data into a temporary file; return the file name +def write_tmpfile(arguments): + tmpArgFile = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=arguments.suffix, + dir=arguments.dir) + in_data = re.sub(r'\\\\', r'\\', arguments.data) + in_data = re.sub(r'\\n', '\n', in_data) + tmpArgFile.write(in_data) + tmpArgFile.close() + print(tmpArgFile.name) + +# remove file +def remove_file(arguments): + os.remove(arguments.path) + #=== Command map =============================================================== commandMap = { @@ -38,13 +54,15 @@ def adler32(arguments): 'test': test, # adler32 commands 'adler32': adler32, + 'write_tmpfile': write_tmpfile, + 'remove_file': remove_file, } #=== Main ====================================================================== def main(): # main parser - oparser = argparse.ArgumentParser(prog='harvester-admin', add_help=True) + oparser = argparse.ArgumentParser(prog='file_operations.py', add_help=True) subparsers = oparser.add_subparsers() # test command @@ -54,6 +72,16 @@ def main(): adler32_parser = subparsers.add_parser('adler32', help='get adler32 checksum of the file') adler32_parser.set_defaults(which='adler32') adler32_parser.add_argument('file', type=str, action='store', metavar='', help='file path') + # write_tmpfile command + write_tmpfile_parser = subparsers.add_parser('write_tmpfile', help='write data to a temporary file') + write_tmpfile_parser.set_defaults(which='write_tmpfile') + 
write_tmpfile_parser.add_argument('--suffix', type=str, action='store', metavar='', default='xxx.tmp', help='name suffix of temporary file') + write_tmpfile_parser.add_argument('--dir', type=str, action='store', metavar='', default='/tmp', help='directory of temorary file') + write_tmpfile_parser.add_argument('data', type=str, action='store', metavar='', help='data to write in temporary file') + # remove_file command + remove_file_parser = subparsers.add_parser('remove_file', help='remove a file') + remove_file_parser.set_defaults(which='remove_file') + remove_file_parser.add_argument('path', type=str, action='store', metavar='', help='file path') # start parsing if len(sys.argv) == 1: diff --git a/pandaharvester/harvesterscripts/harvester_admin.py b/pandaharvester/harvesterscripts/harvester_admin.py index b77bd411..94158280 100644 --- a/pandaharvester/harvesterscripts/harvester_admin.py +++ b/pandaharvester/harvesterscripts/harvester_admin.py @@ -196,6 +196,7 @@ def qconf_list(arguments): def qconf_refresh(arguments): from pandaharvester.harvestercore.queue_config_mapper import QueueConfigMapper qcm = QueueConfigMapper() + qcm._update_last_reload_time() qcm.lastUpdate = None qcm.load_data() @@ -233,6 +234,29 @@ def qconf_purge(arguments): else: mainLogger.critical('Failed to purge {0} . See panda-db_proxy.log'.format(queueName)) +def kill_workers(arguments): + status_in = 'ALL' if (len(arguments.status) == 1 and arguments.status[0] == 'ALL') else arguments.status + computingSite_in = 'ALL' if (len(arguments.sites) == 1 and arguments.sites[0] == 'ALL') else arguments.sites + computingElement_in = 'ALL' if (len(arguments.ces) == 1 and arguments.ces[0] == 'ALL') else arguments.ces + submissionHost_in = 'ALL' if (len(arguments.submissionhosts) == 1 and arguments.submissionhosts[0] == 'ALL') else arguments.submissionhosts + dbProxy = DBProxy() + retVal = dbProxy.kill_workers_by_query({'status': status_in, + 'computingSite': computingSite_in, + 'computingElement': computingElement_in, + 'submissionHost': submissionHost_in}) + if retVal is not None: + msg_temp = ( + 'Sweeper will soon kill {n_workers} workers, with ' + 'status in {status_in}, ' + 'computingSite in {computingSite_in}, ' + 'computingElement in {computingElement_in}, ' + 'submissionHost in {submissionHost_in}' + ) + print(msg_temp.format(n_workers=retVal, status_in=status_in, computingSite_in=computingSite_in, + computingElement_in=computingElement_in, submissionHost_in=submissionHost_in)) + else: + mainLogger.critical('Failed to kill workers. 
See panda-db_proxy.log') + #=== Command map ======================================================= commandMap = { @@ -250,6 +274,8 @@ def qconf_purge(arguments): 'qconf_dump': qconf_dump, 'qconf_refresh': qconf_refresh, 'qconf_purge': qconf_purge, + # kill commands + 'kill_workers': kill_workers, } #=== Main ====================================================== @@ -305,7 +331,16 @@ def main(): qconf_purge_parser = qconf_subparsers.add_parser('purge', help='Purge the queue thoroughly from harvester DB (Be careful !!)') qconf_purge_parser.set_defaults(which='qconf_purge') qconf_purge_parser.add_argument('queue', type=str, action='store', metavar='', help='Name of panda queue to purge') - + # kill parser + kill_parser = subparsers.add_parser('kill', help='kill something alive') + kill_subparsers = kill_parser.add_subparsers() + # kill workers command + kill_workers_parser = kill_subparsers.add_parser('workers', help='Kill active workers by query') + kill_workers_parser.set_defaults(which='kill_workers') + kill_workers_parser.add_argument('--status', nargs='+', dest='status', action='store', metavar='', default=['submitted'], help='worker status (only "submitted", "idle", "running" are valid)') + kill_workers_parser.add_argument('--sites', nargs='+', dest='sites', action='store', metavar='', required=True, help='site (computingSite); "ALL" for all sites') + kill_workers_parser.add_argument('--ces', nargs='+', dest='ces', action='store', metavar='', required=True, help='CE (computingElement); "ALL" for all CEs') + kill_workers_parser.add_argument('--submissionhosts', nargs='+', dest='submissionhosts', action='store', metavar='', required=True, help='submission host (submissionHost); "ALL" for all submission hosts') # start parsing if len(sys.argv) == 1: diff --git a/pandaharvester/harvesterstager/gridftp_stager.py b/pandaharvester/harvesterstager/gridftp_stager.py index c93eca80..b2678551 100644 --- a/pandaharvester/harvesterstager/gridftp_stager.py +++ b/pandaharvester/harvesterstager/gridftp_stager.py @@ -31,6 +31,10 @@ "maxAttempts": 3, # options for globus-url-copy "gulOpts":"-verify-checksum -v" + # A list of base paths of intermediate locations: [ p1, p2, p3, ..., pn ] . The transfers will be done in serial: srcPath -> p1, p1 ->p2, ..., pn -> dstPath + # where p1 can be a path (e.g. "gsiftp://..."), or a list of incoming and outgoing paths (if different, say due to FS mount) of the same files (e.g. 
["file:///data/abc", "gsiftp://dtn.server//data/abc"]) + # If ommited, direct transfer srcPath -> dstPath occurs + "intermediateBasePaths":[ ["file:///nfs/at3/scratch/at3sgm001/", "gsiftp://some.dtn.server//nfs/at3/scratch/at3sgm001/"], "gsiftp://another.dtn.server//scratch/data/" ] } """ class GridFtpStager(BaseStager): @@ -42,6 +46,7 @@ def __init__(self, **kwarg): self.gulOpts = None self.maxAttempts = 3 self.timeout = None + self.intermediateBasePaths = None BaseStager.__init__(self, **kwarg) # check status @@ -60,6 +65,8 @@ def trigger_stage_out(self, jobspec): tmpLog.debug('start') # loop over all files gucInput = None + is_multistep = isinstance(self.intermediateBasePaths, list) and len(self.intermediateBasePaths) > 0 + guc_inputs_list = [None] * (len(self.intermediateBasePaths) + 1) if is_multistep else [] for fileSpec in jobspec.outFiles: # skip if already done if fileSpec.status in ['finished', 'failed']: @@ -74,54 +81,134 @@ def trigger_stage_out(self, jobspec): # construct source and destination paths srcPath = re.sub(self.srcOldBasePath, self.srcNewBasePath, fileSpec.path) dstPath = mover_utils.construct_file_path(self.dstBasePath, scope, fileSpec.lfn) - # make input for globus-url-copy - if gucInput is None: - gucInput = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='_guc_out.tmp') - gucInput.write("{0} {1}\n".format(srcPath, dstPath)) + # make tempfiles of paths to transfer + if is_multistep: + # multi-step transfer + for ibp_i in range(len(self.intermediateBasePaths) + 1): + base_paths_old = self.intermediateBasePaths[ibp_i - 1] if ibp_i > 0 else '' + base_paths_new = self.intermediateBasePaths[ibp_i] if ibp_i < len(self.intermediateBasePaths) else '' + src_base = base_paths_old[1] if isinstance(base_paths_old, list) else base_paths_old + dst_base = base_paths_new[0] if isinstance(base_paths_new, list) else base_paths_new + # construct temporary source and destination paths + tmp_src_path = re.sub(self.srcNewBasePath, src_base, srcPath) + tmp_dest_path = re.sub(self.srcNewBasePath, dst_base, srcPath) + # make input for globus-url-copy + if guc_inputs_list[ibp_i] is None: + guc_inputs_list[ibp_i] = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='_guc_out_{0}.tmp'.format(ibp_i)) + guc_input = guc_inputs_list[ibp_i] + if ibp_i == 0: + guc_input.write("{0} {1}\n".format(srcPath, tmp_dest_path)) + tmpLog.debug("step {0}: {1} {2}".format(ibp_i + 1, srcPath, tmp_dest_path)) + elif ibp_i == len(self.intermediateBasePaths): + guc_input.write("{0} {1}\n".format(tmp_src_path, dstPath)) + tmpLog.debug("step {0}: {1} {2}".format(ibp_i + 1, tmp_src_path, dstPath)) + else: + guc_input.write("{0} {1}\n".format(tmp_src_path, tmp_dest_path)) + tmpLog.debug("step {0}: {1} {2}".format(ibp_i + 1, tmp_src_path, tmp_dest_path)) + else: + # single-step transfer + # make input for globus-url-copy + if gucInput is None: + gucInput = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='_guc_out.tmp') + gucInput.write("{0} {1}\n".format(srcPath, dstPath)) fileSpec.attemptNr += 1 # nothing to transfer - if gucInput is None: - tmpLog.debug('done with no transfers') - return True, '' + if is_multistep: + for guc_input in guc_inputs_list: + if guc_input is None: + tmpLog.debug('done with no transfers (multistep)') + return True, '' + else: + if gucInput is None: + tmpLog.debug('done with no transfers') + return True, '' # transfer - gucInput.close() - args = ['globus-url-copy', '-f', gucInput.name, '-cd'] - if self.gulOpts is not None: - args += self.gulOpts.split() - try: 
- tmpLog.debug('execute: ' + ' '.join(args)) - p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - try: - stdout, stderr = p.communicate(timeout=self.timeout) - except subprocess.TimeoutExpired: - p.kill() - stdout, stderr = p.communicate() - tmpLog.warning('command timeout') - return_code = p.returncode - if stdout is not None: - if not isinstance(stdout, str): - stdout = stdout.decode() - stdout = stdout.replace('\n', ' ') - if stderr is not None: - if not isinstance(stderr, str): - stderr = stderr.decode() - stderr = stderr.replace('\n', ' ') - tmpLog.debug("stdout: %s" % stdout) - tmpLog.debug("stderr: %s" % stderr) - except Exception: - core_utils.dump_error_message(tmpLog) - return_code = 1 - os.remove(gucInput.name) - if return_code == 0: - tmpLog.debug('succeeded') + if is_multistep: + [ guc_input.close() for guc_input in guc_inputs_list ] + tmpLog.debug('start multistep transfer') + guc_input_i = 1 + for guc_input in guc_inputs_list: + args = ['globus-url-copy', '-f', guc_input.name, '-cd'] + if self.gulOpts is not None: + args += self.gulOpts.split() + try: + tmpLog.debug('execute: ' + ' '.join(args)) + p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + try: + stdout, stderr = p.communicate(timeout=self.timeout) + except subprocess.TimeoutExpired: + p.kill() + stdout, stderr = p.communicate() + tmpLog.warning('command timeout') + return_code = p.returncode + if stdout is not None: + if not isinstance(stdout, str): + stdout = stdout.decode() + stdout = stdout.replace('\n', ' ') + if stderr is not None: + if not isinstance(stderr, str): + stderr = stderr.decode() + stderr = stderr.replace('\n', ' ') + tmpLog.debug("stdout: %s" % stdout) + tmpLog.debug("stderr: %s" % stderr) + except Exception: + core_utils.dump_error_message(tmpLog) + return_code = 1 + os.remove(guc_input.name) + if return_code == 0: + tmpLog.debug('step {0} succeeded'.format(guc_input_i)) + guc_input_i += 1 + else: + errMsg = 'step {0} failed with {1}'.format(guc_input_i, return_code) + tmpLog.error(errMsg) + # check attemptNr + for fileSpec in jobspec.inFiles: + if fileSpec.attemptNr >= self.maxAttempts: + errMsg = 'gave up due to max attempts' + tmpLog.error(errMsg) + return (False, errMsg) + return None, errMsg + tmpLog.debug('multistep transfer ({0} steps) succeeded'.format(len(guc_inputs_list))) return True, '' else: - errMsg = 'failed with {0}'.format(return_code) - tmpLog.error(errMsg) - # check attemptNr - for fileSpec in jobspec.inFiles: - if fileSpec.attemptNr >= self.maxAttempts: - errMsg = 'gave up due to max attempts' - tmpLog.error(errMsg) - return (False, errMsg) - return None, errMsg + gucInput.close() + args = ['globus-url-copy', '-f', gucInput.name, '-cd'] + if self.gulOpts is not None: + args += self.gulOpts.split() + try: + tmpLog.debug('execute: ' + ' '.join(args)) + p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + try: + stdout, stderr = p.communicate(timeout=self.timeout) + except subprocess.TimeoutExpired: + p.kill() + stdout, stderr = p.communicate() + tmpLog.warning('command timeout') + return_code = p.returncode + if stdout is not None: + if not isinstance(stdout, str): + stdout = stdout.decode() + stdout = stdout.replace('\n', ' ') + if stderr is not None: + if not isinstance(stderr, str): + stderr = stderr.decode() + stderr = stderr.replace('\n', ' ') + tmpLog.debug("stdout: %s" % stdout) + tmpLog.debug("stderr: %s" % stderr) + except Exception: + core_utils.dump_error_message(tmpLog) + return_code = 1 + 
os.remove(gucInput.name) + if return_code == 0: + tmpLog.debug('succeeded') + return True, '' + else: + errMsg = 'failed with {0}'.format(return_code) + tmpLog.error(errMsg) + # check attemptNr + for fileSpec in jobspec.inFiles: + if fileSpec.attemptNr >= self.maxAttempts: + errMsg = 'gave up due to max attempts' + tmpLog.error(errMsg) + return (False, errMsg) + return None, errMsg diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py index 0bf5088b..063435d4 100644 --- a/pandaharvester/harvestersubmitter/htcondor_submitter.py +++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py @@ -262,9 +262,9 @@ def submit_bag_of_workers(data_list): jdl_list = [ val[1] for val in val_list ] # condor job submit object tmpLog.debug('submitting to submissionHost={0}'.format(host)) - condor_job_submit = CondorJobSubmit(id=host) # submit try: + condor_job_submit = CondorJobSubmit(id=host) batchIDs_list, ret_err_str = condor_job_submit.submit(jdl_list, use_spool=use_spool) except Exception as e: batchIDs_list = None diff --git a/pandaharvester/harvestersubmitter/slurm_submitter.py b/pandaharvester/harvestersubmitter/slurm_submitter.py index ad0ca5dd..42e91468 100644 --- a/pandaharvester/harvestersubmitter/slurm_submitter.py +++ b/pandaharvester/harvestersubmitter/slurm_submitter.py @@ -47,9 +47,11 @@ def submit_workers(self, workspec_list): stdOut, stdErr = p.communicate() retCode = p.returncode tmpLog.debug('retCode={0}'.format(retCode)) + stdOut_str = stdOut if (isinstance(stdOut, str) or stdOut is None) else stdOut.decode() + stdErr_str = stdErr if (isinstance(stdErr, str) or stdErr is None) else stdErr.decode() if retCode == 0: # extract batchID - workSpec.batchID = re.search('[^0-9]*([0-9]+)[^0-9]*', '{0}'.format(stdOut)).group(1) + workSpec.batchID = re.search('[^0-9]*([0-9]+)[^0-9]*', '{0}'.format(stdOut_str)).group(1) tmpLog.debug('batchID={0}'.format(workSpec.batchID)) # set log files if self.uploadLog: @@ -65,7 +67,7 @@ def submit_workers(self, workspec_list): tmpRetVal = (True, '') else: # failed - errStr = '{0} {1}'.format(stdOut, stdErr) + errStr = '{0} {1}'.format(stdOut_str, stdErr_str) tmpLog.error(errStr) tmpRetVal = (False, errStr) retList.append(tmpRetVal) diff --git a/pandaharvester/harvestersweeper/htcondor_sweeper.py b/pandaharvester/harvestersweeper/htcondor_sweeper.py index 06e1589b..7cc30daa 100644 --- a/pandaharvester/harvestersweeper/htcondor_sweeper.py +++ b/pandaharvester/harvestersweeper/htcondor_sweeper.py @@ -92,8 +92,8 @@ def kill_workers(self, workspec_list): retList = [] # Kill for submissionHost, batchIDs_list in six.iteritems(get_host_batchid_map(workspec_list)): - condor_job_manage = CondorJobManage(id=submissionHost) try: + condor_job_manage = CondorJobManage(id=submissionHost) ret_map = condor_job_manage.remove(batchIDs_list) except Exception as e: ret_map = {} diff --git a/pandaharvester/harvesterzipper/base_zipper.py b/pandaharvester/harvesterzipper/base_zipper.py index 1c42886a..11a4f95e 100644 --- a/pandaharvester/harvesterzipper/base_zipper.py +++ b/pandaharvester/harvesterzipper/base_zipper.py @@ -54,7 +54,10 @@ def simple_zip_output(self, jobspec, tmp_log): argDictList.append(argDict) # parallel execution try: - nThreadsForZip = harvester_config.stager.nThreadsForZip + if hasattr(harvester_config, 'zipper'): + nThreadsForZip = harvester_config.zipper.nThreadsForZip + else: + nThreadsForZip = harvester_config.stager.nThreadsForZip except Exception: nThreadsForZip = 
multiprocessing.cpu_count() with Pool(max_workers=nThreadsForZip) as pool: @@ -149,23 +152,47 @@ def ssh_zip_output(self, jobspec, tmp_log): outFiles_list = list(jobspec.outFiles) try: try: - nThreadsForZip = harvester_config.stager.nThreadsForZip + if hasattr(harvester_config, 'zipper'): + nThreadsForZip = harvester_config.zipper.nThreadsForZip + else: + nThreadsForZip = harvester_config.stager.nThreadsForZip except Exception: nThreadsForZip = multiprocessing.cpu_count() # check associate file existence def _check_assfile_existence(fileSpec): - # ass_file_paths_str = ' '.join([ assFileSpec.path for assFileSpec in fileSpec.associatedFiles ]) - # tmpfile over shared fs - tmpArgFile = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='_check-exist.tmp', - dir=os.path.dirname(next(iter(fileSpec.associatedFiles)).path)) - for assFileSpec in fileSpec.associatedFiles: - tmpArgFile.write('{0}\n'.format(assFileSpec.path)) - tmpArgFile.close() + in_data = '\\n'.join(['{0}'.format(assFileSpec.path) for assFileSpec in fileSpec.associatedFiles]) + com1 = ('ssh ' + '-o StrictHostKeyChecking=no ' + '-i {sshkey} ' + '{userhost} ' + '"{fileop_script} write_tmpfile --suffix {suffix} --dir {dir} \\"{data}\\" "' + ).format( + sshkey=self.sshkey, + userhost=self.userhost, + fileop_script=self.fileop_script, + suffix='_check-exist.tmp', + dir=os.path.dirname(next(iter(fileSpec.associatedFiles)).path), + data=in_data, + ) + # execute + p1 = subprocess.Popen(com1, + shell=True, + close_fds=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdOut, stdErr = p1.communicate() + retCode = p1.returncode + if retCode != 0: + msgStr = 'failed to make tmpargfile remotely with {0}:{1}'.format(stdOut, stdErr) + tmp_log.error(msgStr) + return False, 'failed to zip with {0}'.format(msgStr) + stdOut_str = stdOut if (isinstance(stdOut, str) or stdOut is None) else stdOut.decode() + tmpargfile_name = stdOut_str.strip('\n') + del p1, stdOut, stdErr # record set existence_set = set() # make command - # '"for i in $(cat {arg_file}); do test -f $i && echo \'T\' || echo \'F\'; done" ' - com = ( 'ssh ' + com2 = ( 'ssh ' '-o StrictHostKeyChecking=no ' '-i {sshkey} ' '{userhost} ' @@ -173,22 +200,23 @@ def _check_assfile_existence(fileSpec): ).format( sshkey=self.sshkey, userhost=self.userhost, - arg_file=tmpArgFile.name, + arg_file=tmpargfile_name, ) # execute - p = subprocess.Popen(com, + p2 = subprocess.Popen(com2, shell=True, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdOut, stdErr = p.communicate() - retCode = p.returncode + stdOut, stdErr = p2.communicate() + retCode = p2.returncode if retCode != 0: msgStr = 'failed to check existence of associated files with {0}:{1}'.format(stdOut, stdErr) tmp_log.error(msgStr) else: try: - ret_list = stdOut.strip('\n').split('\n') + stdOut_str = stdOut if (isinstance(stdOut, str) or stdOut is None) else stdOut.decode() + ret_list = stdOut_str.strip('\n').split('\n') if len(fileSpec.associatedFiles) == len(ret_list): for (assFileSpec, retVal) in zip(fileSpec.associatedFiles, ret_list): if retVal == 'T': @@ -198,8 +226,31 @@ def _check_assfile_existence(fileSpec): tmp_log.error(msgStr) except Exception: core_utils.dump_error_message(tmp_log) - os.remove(tmpArgFile.name) - del p, stdOut, stdErr + del p2, stdOut, stdErr, com2 + # delete tmpargfile + com3 = ('ssh ' + '-o StrictHostKeyChecking=no ' + '-i {sshkey} ' + '{userhost} ' + '"{fileop_script} remove_file {file_path} "' + ).format( + sshkey=self.sshkey, + userhost=self.userhost, +
fileop_script=self.fileop_script, + file_path=tmpargfile_name, + ) + # execute + p3 = subprocess.Popen(com3, + shell=True, + close_fds=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdOut, stdErr = p3.communicate() + retCode = p3.returncode + if retCode != 0: + msgStr = 'failed to delete tmpargfile remotely with {0}:{1}'.format(stdOut, stdErr) + tmp_log.error(msgStr) + del p3, stdOut, stdErr gc.collect() return existence_set # parallel execution of check existence @@ -258,12 +309,35 @@ def ssh_make_one_zip(self, arg_dict): lfn = os.path.basename(zipPath) self.zip_tmp_log.debug('{0} start zipPath={1} with {2} files'.format(lfn, zipPath, len(arg_dict['associatedFiles']))) - # tmp arg file - tmpArgFile = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='_tar-name.tmp', - dir=os.path.dirname(zipPath)) - for path in arg_dict['associatedFiles']: - tmpArgFile.write('{0}\n'.format(path)) - tmpArgFile.close() + in_data = '\\n'.join(['{0}'.format(path) for path in arg_dict['associatedFiles']]) + com0 = ('ssh ' + '-o StrictHostKeyChecking=no ' + '-i {sshkey} ' + '{userhost} ' + '"{fileop_script} write_tmpfile --suffix {suffix} --dir {dir} \\"{data}\\" "' + ).format( + sshkey=self.sshkey, + userhost=self.userhost, + fileop_script=self.fileop_script, + suffix='_tar-name.tmp', + dir=os.path.dirname(zipPath), + data=in_data, + ) + # execute + p0 = subprocess.Popen(com0, + shell=True, + close_fds=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdOut, stdErr = p0.communicate() + retCode = p0.returncode + if retCode != 0: + msgStr = 'failed to make tmpargfile remotely with {0}:{1}'.format(stdOut, stdErr) + self.zip_tmp_log.error(msgStr) + return None, msgStr, {} + stdOut_str = stdOut if (isinstance(stdOut, str) or stdOut is None) else stdOut.decode() + tmpargfile_name = stdOut_str.strip('\n') + del p0, stdOut, stdErr # tmp zip file names tmpZipPath = zipPath + '.'
+ str(uuid.uuid4()) com1 = ('ssh ' @@ -275,7 +349,7 @@ sshkey=self.sshkey, userhost=self.userhost, tmpZipPath=tmpZipPath, - arg_file=tmpArgFile.name, + arg_file=tmpargfile_name, ) # execute p1 = subprocess.Popen(com1, @@ -290,8 +364,31 @@ self.zip_tmp_log.error(msgStr) return None, msgStr, {} del p1, stdOut, stdErr + # delete tmpargfile + com1a = ('ssh ' + '-o StrictHostKeyChecking=no ' + '-i {sshkey} ' + '{userhost} ' + '"{fileop_script} remove_file {file_path} "' + ).format( + sshkey=self.sshkey, + userhost=self.userhost, + fileop_script=self.fileop_script, + file_path=tmpargfile_name, + ) + # execute + p1a = subprocess.Popen(com1a, + shell=True, + close_fds=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdOut, stdErr = p1a.communicate() + retCode = p1a.returncode + if retCode != 0: + msgStr = 'failed to delete tmpargfile remotely with {0}:{1}'.format(stdOut, stdErr) + self.zip_tmp_log.error(msgStr) + del p1a, stdOut, stdErr gc.collect() - os.remove(tmpArgFile.name) # avoid overwriting lockName = 'zip.lock.{0}'.format(lfn) lockInterval = 60 @@ -333,8 +430,6 @@ fileInfo = dict() fileInfo['path'] = zipPath # get size - # statInfo = os.stat(zipPath) - # fileInfo['fsize'] = statInfo.st_size com3 = ('ssh ' '-o StrictHostKeyChecking=no ' '-i {sshkey} ' @@ -357,12 +452,12 @@ self.zip_tmp_log.error(msgStr) return None, msgStr, {} else: - file_size = int(stdOut.strip('\n')) + stdOut_str = stdOut if (isinstance(stdOut, str) or stdOut is None) else stdOut.decode() + file_size = int(stdOut_str.strip('\n')) fileInfo['fsize'] = file_size del p3, stdOut, stdErr gc.collect() # get checksum - # fileInfo['chksum'] = core_utils.calc_adler32(zipPath) com4 = ('ssh ' '-o StrictHostKeyChecking=no ' '-i {sshkey} ' @@ -386,7 +481,8 @@ self.zip_tmp_log.error(msgStr) return None, msgStr, {} else: - file_chksum = stdOut.strip('\n') + stdOut_str = stdOut if (isinstance(stdOut, str) or stdOut is None) else stdOut.decode() + file_chksum = stdOut_str.strip('\n') fileInfo['chksum'] = file_chksum del p4, stdOut, stdErr gc.collect() diff --git a/pandaharvester/panda_pkg_info.py b/pandaharvester/panda_pkg_info.py index c493c9db..d0efaf92 100644 --- a/pandaharvester/panda_pkg_info.py +++ b/pandaharvester/panda_pkg_info.py @@ -1 +1 @@ -release_version = "0.1.4" +release_version = "0.1.5"
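
For reference, below is a minimal usage sketch of the worker-kill flow added in this release. The site name is a placeholder; the flag names, the "ALL" wildcard, and the valid status values come from the kill_workers_parser defined above:

    harvester-admin kill workers --status submitted --sites EXAMPLE_SITE --ces ALL --submissionhosts ALL

This calls DBProxy.kill_workers_by_query, which sets killTime six hours in the past on every matching worker so that the sweeper's killing stage picks them up on its next cycle. The same query can be issued directly from Python (a sketch, assuming a locally configured harvester instance):

    # sketch: mark all submitted workers of one site for killing
    # EXAMPLE_SITE is a placeholder; passing 'ALL' disables a constraint, per kill_workers_by_query
    from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
    dbProxy = DBProxy()
    n_to_kill = dbProxy.kill_workers_by_query({'status': ['submitted'],
                                               'computingSite': ['EXAMPLE_SITE'],
                                               'computingElement': 'ALL',
                                               'submissionHost': 'ALL'})
    print('killTime set on {0} workers'.format(n_to_kill))

A KILL_WORKERS command from the PanDA server takes the same path: the sweeper fetches it with get_commands_for_receiver('sweeper', CommandSpec.COM_killWorkers) and passes commandSpec.params to kill_workers_by_query.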