diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml new file mode 100644 index 00000000..379354dd --- /dev/null +++ b/.github/workflows/docker-publish.yml @@ -0,0 +1,89 @@ +name: Docker + +# This workflow uses actions that are not certified by GitHub. +# They are provided by a third-party and are governed by +# separate terms of service, privacy policy, and support +# documentation. + +on: + release: + types: [published] + + workflow_dispatch: + +env: + # Use docker.io for Docker Hub if empty + REGISTRY: ghcr.io + # github.repository as / + IMAGE_NAME: ${{ github.repository }} + + +jobs: + build: + + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + # This is used to complete the identity challenge + # with sigstore/fulcio when running outside of PRs. + id-token: write + + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + # Install the cosign tool except on PR + # https://github.com/sigstore/cosign-installer + - name: Install cosign + if: github.event_name != 'pull_request' + uses: sigstore/cosign-installer@1e95c1de343b5b0c23352d6417ee3e48d5bcd422 + with: + cosign-release: 'v1.4.0' + + + # Workaround: https://github.com/docker/build-push-action/issues/461 + - name: Setup Docker buildx + uses: docker/setup-buildx-action@79abd3f86f79a9d68a23c75a09a9a85889262adf + + # Login against a Docker registry except on PR + # https://github.com/docker/login-action + - name: Log into registry ${{ env.REGISTRY }} + if: github.event_name != 'pull_request' + uses: docker/login-action@28218f9b04b4f3f62068d7b6ce6ca5b26e35336c + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + # Extract metadata (tags, labels) for Docker + # https://github.com/docker/metadata-action + - name: Extract Docker metadata + id: meta + uses: docker/metadata-action@98669ae865ea3cffbcbaa878cf57c20bbf1c6c38 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + + # Build and push Docker image with Buildx (don't push on PR) + # https://github.com/docker/build-push-action + - name: Build and push Docker image + id: build-and-push + uses: docker/build-push-action@ad44023a93711e3deb337508980b4b5e9bcdc5dc + with: + context: . + push: ${{ github.event_name != 'pull_request' }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + + # Sign the resulting Docker image digest except on PRs. + # This will only write to the public Rekor transparency log when the Docker + # repository is public to avoid leaking data. If you would like to publish + # transparency data even for private images, pass --force to cosign below. + # https://github.com/sigstore/cosign + #- name: Sign the published Docker image + # if: ${{ github.event_name != 'pull_request' }} + # env: + # COSIGN_EXPERIMENTAL: "true" + # # This step uses the identity token to provision an ephemeral certificate + # # against the sigstore community Fulcio instance. 
+ # run: cosign sign ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}@${{ steps.build-and-push.outputs.digest }} diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..4a4c4014 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,33 @@ +FROM docker.io/centos:7 + +RUN yum update -y +RUN yum install -y epel-release +RUN yum install -y python3 python3-devel gcc less git mysql-devel curl + +RUN curl -fsSL https://get.htcondor.org | /bin/bash -s -- --no-dry-run + +RUN python3 -m venv /opt/harvester +RUN /opt/harvester/bin/pip install -U pip +RUN /opt/harvester/bin/pip install -U setuptools +RUN /opt/harvester/bin/pip install -U mysqlclient uWSGI +RUN /opt/harvester/bin/pip install git+git://github.com/HSF/harvester.git + +RUN mv /opt/harvester/etc/sysconfig/panda_harvester.rpmnew.template /opt/harvester/etc/sysconfig/panda_harvester +RUN mv /opt/harvester/etc/panda/panda_common.cfg.rpmnew /opt/harvester/etc/panda/panda_common.cfg +RUN mv /opt/harvester/etc/panda/panda_harvester.cfg.rpmnew.template /opt/harvester/etc/panda/panda_harvester.cfg +RUN mv /opt/harvester/etc/panda/panda_harvester-uwsgi.ini.rpmnew.template /opt/harvester/etc/panda/panda_harvester-uwsgi.ini +RUN mv /opt/harvester/etc/rc.d/init.d/panda_harvester-uwsgi.rpmnew.template /opt/harvester/etc/rc.d/init.d/panda_harvester-uwsgi + +RUN ln -fs /opt/harvester/etc/queue_config/panda_queueconfig.json /opt/harvester/etc/panda/panda_queueconfig.json + +RUN adduser atlpan +RUN groupadd zp +RUN usermod -a -G zp atlpan + +RUN mkdir -p /var/log/panda +RUN chown -R atlpan:zp /var/log/panda + +RUN mkdir -p /data/harvester +RUN chown -R atlpan:zp /data/harvester + +CMD exec /bin/bash -c "trap : TERM INT; sleep infinity & wait" diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index 067b92a7..9d119f68 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "17-09-2021 07:05:31 on release (by mightqxc)" +timestamp = "16-02-2022 08:11:12 on release (by mightqxc)" diff --git a/pandaharvester/harvesterbody/monitor.py b/pandaharvester/harvesterbody/monitor.py index d4750199..cb300106 100644 --- a/pandaharvester/harvesterbody/monitor.py +++ b/pandaharvester/harvesterbody/monitor.py @@ -182,19 +182,29 @@ def run(self): while time.time() < last_fifo_cycle_timestamp + fifoCheckDuration: sw.reset() n_loops += 1 - retVal, overhead_time = monitor_fifo.to_check_workers() + try: + retVal, overhead_time = monitor_fifo.to_check_workers() + except Exception as e: + mainLog.error('failed to check workers from FIFO: {0}'.format(e)) if overhead_time is not None: n_chunk_peeked_stat += 1 sum_overhead_time_stat += overhead_time if retVal: # check fifo size - fifo_size = monitor_fifo.size() - mainLog.debug('FIFO size is {0}'.format(fifo_size)) + try: + fifo_size = monitor_fifo.size() + mainLog.debug('FIFO size is {0}'.format(fifo_size)) + except Exception as e: + mainLog.error('failed to get size of FIFO: {0}'.format(e)) + time.sleep(2) + continue mainLog.debug('starting run with FIFO') try: obj_gotten = monitor_fifo.get(timeout=1, protective=fifoProtectiveDequeue) except Exception as errStr: mainLog.error('failed to get object from FIFO: {0}'.format(errStr)) + time.sleep(2) + continue else: if obj_gotten is not None: sw_fifo = core_utils.get_stopwatch() @@ -299,7 +309,10 @@ def run(self): mainLog.error('failed to put object from FIFO head: {0}'.format(errStr)) # delete protective dequeued objects if fifoProtectiveDequeue and len(obj_dequeued_id_list) > 0: - 
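The monitor.py hunks in this patch all apply the same defensive pattern: every FIFO call (`to_check_workers`, `size`, `get`, `getmany`, `delete`) is wrapped in try/except so a transient database error is logged and the cycle backs off instead of crashing the agent. A minimal sketch of that pattern, with `fifo` standing in for `monitor_fifo` (assumed to expose `size()` and `get()` like the harvester FIFO wrappers) and a plain logger in place of `mainLog`:

```python
import logging
import time

logger = logging.getLogger('monitor')

def pop_one_protectively(fifo, retry_delay=2):
    """Dequeue one worker chunk from the FIFO, swallowing transient errors.

    Returns the dequeued object, or None if the FIFO was empty or broken.
    """
    # size check is informational; a failure here just skips this cycle
    try:
        logger.debug('FIFO size is %s', fifo.size())
    except Exception as exc:
        logger.error('failed to get size of FIFO: %s', exc)
        time.sleep(retry_delay)
        return None
    # protective get: the item stays recoverable until it is explicitly deleted
    try:
        return fifo.get(timeout=1, protective=True)
    except Exception as exc:
        logger.error('failed to get object from FIFO: %s', exc)
        time.sleep(retry_delay)
        return None
```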
monitor_fifo.delete(ids=obj_dequeued_id_list) + try: + monitor_fifo.delete(ids=obj_dequeued_id_list) + except Exception as e: + mainLog.error('failed to delete object from FIFO: {0}'.format(e)) mainLog.debug('put {0} worker chunks into FIFO'.format(n_chunk_put) + sw.get_elapsed_time()) # adjust adjusted_sleepTime if n_chunk_peeked_stat > 0 and sum_overhead_time_stat > sleepTime: @@ -655,7 +668,7 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log, timeNow - workSpec.checkTime > datetime.timedelta(seconds=checkTimeout): # kill due to timeout tmp_log.debug('kill workerID={0} due to consecutive check failures'.format(workerID)) - self.dbProxy.kill_worker(workSpec.workerID) + self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID]) newStatus = WorkSpec.ST_cancelled diagMessage = 'Killed by Harvester due to consecutive worker check failures. ' + diagMessage workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage) @@ -665,13 +678,13 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log, # request kill if messenger.kill_requested(workSpec): tmp_log.debug('kill workerID={0} as requested'.format(workerID)) - self.dbProxy.kill_worker(workSpec.workerID) + self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID]) # stuck queuing for too long if workSpec.status == WorkSpec.ST_submitted \ and timeNow > workSpec.submitTime + datetime.timedelta(seconds=workerQueueTimeLimit): tmp_log.debug('kill workerID={0} due to queuing longer than {1} seconds'.format( workerID, workerQueueTimeLimit)) - self.dbProxy.kill_worker(workSpec.workerID) + self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID]) diagMessage = 'Killed by Harvester due to worker queuing too long. ' + diagMessage workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage) # set closed @@ -689,9 +702,8 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log, if messenger.is_alive(workSpec, worker_heartbeat_limit): tmp_log.debug('heartbeat for workerID={0} is valid'.format(workerID)) else: - tmp_log.debug('heartbeat for workerID={0} expired: sending kill request'.format( - workerID)) - self.dbProxy.kill_worker(workSpec.workerID) + tmp_log.debug('heartbeat for workerID={0} expired: sending kill request'.format(workerID)) + self.dbProxy.mark_workers_to_kill_by_workerids([workSpec.workerID]) diagMessage = 'Killed by Harvester due to worker heartbeat expired. 
' + diagMessage workSpec.set_pilot_error(PilotErrors.FAILEDBYSERVER, diagMessage) # get work attributes @@ -725,7 +737,7 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log, newStatus = WorkSpec.ST_idle elif not workSpec.is_post_processed(): if (not queue_config.is_no_heartbeat_status(newStatus) and not queue_config.truePilot) \ - or (hasattr(messenger, 'forcePostProcessing') and messenger.forcePostProcessing): + or (hasattr(messenger, 'forcePostProcessing') and messenger.forcePostProcessing): # post processing unless heartbeat is suppressed jobSpecs = self.dbProxy.get_jobs_with_worker_id(workSpec.workerID, None, True, @@ -783,9 +795,12 @@ def monitor_event_deliverer(self, time_window): def monitor_event_digester(self, locked_by, max_events): tmpLog = self.make_logger(_logger, 'id=monitor-{0}'.format(self.get_pid()), method_name='monitor_event_digester') tmpLog.debug('start') - timeNow_timestamp = time.time() retMap = {} - obj_gotten_list = self.monitor_event_fifo.getmany(mode='first', count=max_events, protective=True) + try: + obj_gotten_list = self.monitor_event_fifo.getmany(mode='first', count=max_events, protective=True) + except Exception as e: + obj_gotten_list = [] + tmpLog.error('monitor_event_fifo excepted with {0}'.format(e)) workerID_list = [ obj_gotten.id for obj_gotten in obj_gotten_list ] tmpLog.debug('got {0} worker events'.format(len(workerID_list))) if len(workerID_list) > 0: @@ -797,8 +812,8 @@ def monitor_event_digester(self, locked_by, max_events): tmpLog.debug('checking workers of queueName={0} configID={1}'.format(*qc_key)) try: retVal = self.monitor_agent_core(locked_by, queueName, workSpecsList, - from_fifo=True, config_id=configID, - check_source='Event') + from_fifo=True, config_id=configID, + check_source='Event') except Exception as e: tmpLog.error('monitor_agent_core excepted with {0}'.format(e)) retVal = None # skip the loop @@ -813,10 +828,17 @@ def monitor_event_disposer(self, event_lifetime, max_events): tmpLog = self.make_logger(_logger, 'id=monitor-{0}'.format(self.get_pid()), method_name='monitor_event_disposer') tmpLog.debug('start') timeNow_timestamp = time.time() - obj_gotten_list = self.monitor_event_fifo.getmany(mode='first', + try: + obj_gotten_list = self.monitor_event_fifo.getmany(mode='first', maxscore=(timeNow_timestamp-event_lifetime), count=max_events, temporary=True) + except Exception as e: + obj_gotten_list = [] + tmpLog.error('monitor_event_fifo excepted with {0}'.format(e)) tmpLog.debug('removed {0} events'.format(len(obj_gotten_list))) - n_events = self.monitor_event_fifo.size() - tmpLog.debug('now {0} events in monitor-event fifo'.format(n_events)) + try: + n_events = self.monitor_event_fifo.size() + tmpLog.debug('now {0} events in monitor-event fifo'.format(n_events)) + except Exception as e: + tmpLog.error('failed to get size of monitor-event fifo: {0}'.format(e)) tmpLog.debug('done') diff --git a/pandaharvester/harvesterbody/propagator.py b/pandaharvester/harvesterbody/propagator.py index df118a52..cca25093 100644 --- a/pandaharvester/harvesterbody/propagator.py +++ b/pandaharvester/harvesterbody/propagator.py @@ -107,7 +107,7 @@ def run(self): tmpRet['command'] = 'tobekilled' # got kill command if 'command' in tmpRet and tmpRet['command'] in ['tobekilled']: - nWorkers = self.dbProxy.kill_workers_with_job(tmpJobSpec.PandaID) + nWorkers = self.dbProxy.mark_workers_to_kill_by_pandaid(tmpJobSpec.PandaID) if nWorkers == 0: # no workers tmpJobSpec.status = 'cancelled' diff --git 
a/pandaharvester/harvesterbody/sweeper.py b/pandaharvester/harvesterbody/sweeper.py index a96e440d..1aaa912c 100644 --- a/pandaharvester/harvesterbody/sweeper.py +++ b/pandaharvester/harvesterbody/sweeper.py @@ -24,153 +24,179 @@ def __init__(self, queue_config_mapper, single_mode=False): self.dbProxy = DBProxy() self.queueConfigMapper = queue_config_mapper self.pluginFactory = PluginFactory() + self.lockedBy = None + def process_kill_commands(self): + # process commands for marking workers that need to be killed + + tmp_log = self.make_logger(_logger, 'id={0}'.format(self.lockedBy), method_name='process_commands') + + # 1. KILL_WORKER commands that were sent to panda server and forwarded to harvester + stopwatch = core_utils.get_stopwatch() + command_string = CommandSpec.COM_killWorkers + tmp_log.debug('try to get {0} commands'.format(command_string)) + command_specs = self.dbProxy.get_commands_for_receiver('sweeper', command_string) + tmp_log.debug('got {0} {1} commands'.format(len(command_specs), command_string)) + for command_spec in command_specs: + n_to_kill = self.dbProxy.mark_workers_to_kill_by_query(command_spec.params) + tmp_log.debug('will kill {0} workers with {1}'.format(n_to_kill, command_spec.params)) + tmp_log.debug('done handling {0} commands took {1}s'.format(command_string, stopwatch.get_elapsed_time())) + + # 2. SYNC_WORKERS_KILL commands from comparing worker status provided by pilot and harvester + stopwatch = core_utils.get_stopwatch() + command_string = CommandSpec.COM_syncWorkersKill + tmp_log.debug('try to get {0} commands'.format(command_string)) + command_specs = self.dbProxy.get_commands_for_receiver('sweeper', command_string) + tmp_log.debug('got {0} {1} commands'.format(len(command_specs), command_string)) + for command_spec in command_specs: + n_to_kill = self.dbProxy.mark_workers_to_kill_by_workerids(command_spec.params) + tmp_log.debug('will kill {0} workers with {1}'.format(n_to_kill, command_spec.params)) + tmp_log.debug('done handling {0} commands took {1}s'.format(command_string, stopwatch.get_elapsed_time())) # main loop def run(self): - lockedBy = 'sweeper-{0}'.format(self.get_pid()) + self.lockedBy = 'sweeper-{0}'.format(self.get_pid()) while True: sw_main = core_utils.get_stopwatch() - mainLog = self.make_logger(_logger, 'id={0}'.format(lockedBy), method_name='run') - # get commands to kill - sw_getcomm = core_utils.get_stopwatch() - mainLog.debug('try to get commands') - comStr = CommandSpec.COM_killWorkers - commandSpecs = self.dbProxy.get_commands_for_receiver('sweeper', comStr) - mainLog.debug('got {0} {1} commands'.format(len(commandSpecs), comStr)) - for commandSpec in commandSpecs: - n_to_kill = self.dbProxy.kill_workers_by_query(commandSpec.params) - mainLog.debug('will kill {0} workers with {1}'.format(n_to_kill, commandSpec.params)) - mainLog.debug('done handling commands' + sw_getcomm.get_elapsed_time()) - # killing stage + main_log = self.make_logger(_logger, 'id={0}'.format(self.lockedBy), method_name='run') + + # process commands that mark workers to be killed + try: + self.process_kill_commands() + except Exception: + core_utils.dump_error_message(main_log) + + # actual killing stage sw_kill = core_utils.get_stopwatch() - mainLog.debug('try to get workers to kill') + main_log.debug('try to get workers to kill') # get workers to kill - workersToKill = self.dbProxy.get_workers_to_kill(harvester_config.sweeper.maxWorkers, + workers_to_kill = self.dbProxy.get_workers_to_kill(harvester_config.sweeper.maxWorkers, 
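The new `process_kill_commands()` helper pulls each supported command string for the 'sweeper' receiver and forwards its params to the matching DB call. A condensed sketch of that dispatch; `db_proxy` and `logger` are stand-ins for the sweeper's `self.dbProxy` and `tmp_log`, and the method names follow this patch:

```python
def process_kill_commands(db_proxy, logger):
    # command string -> DB call that marks the affected workers to be killed
    dispatch = {
        'KILL_WORKERS': db_proxy.mark_workers_to_kill_by_query,            # params: query dict
        'SYNC_WORKERS_KILL': db_proxy.mark_workers_to_kill_by_workerids,   # params: list of workerIDs
    }
    for command_string, handler in dispatch.items():
        command_specs = db_proxy.get_commands_for_receiver('sweeper', command_string)
        logger.debug('got %s %s commands', len(command_specs), command_string)
        for command_spec in command_specs:
            n_to_kill = handler(command_spec.params)
            logger.debug('will kill %s workers with %s', n_to_kill, command_spec.params)
```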
harvester_config.sweeper.checkInterval) - mainLog.debug('got {0} queues to kill workers'.format(len(workersToKill))) + main_log.debug('got {0} queues to kill workers'.format(len(workers_to_kill))) # loop over all workers sw = core_utils.get_stopwatch() - for queueName, configIdWorkSpecList in iteritems(workersToKill): + for queue_name, configIdWorkSpecList in iteritems(workers_to_kill): for configID, workspec_list in iteritems(configIdWorkSpecList): # get sweeper - if not self.queueConfigMapper.has_queue(queueName, configID): - mainLog.error('queue config for {0}/{1} not found'.format(queueName, configID)) + if not self.queueConfigMapper.has_queue(queue_name, configID): + main_log.error('queue config for {0}/{1} not found'.format(queue_name, configID)) continue - queueConfig = self.queueConfigMapper.get_queue(queueName, configID) + queue_config = self.queueConfigMapper.get_queue(queue_name, configID) try: - sweeperCore = self.pluginFactory.get_plugin(queueConfig.sweeper) + sweeper_core = self.pluginFactory.get_plugin(queue_config.sweeper) except Exception: - mainLog.error('failed to launch sweeper plugin for {0}/{1}'.format(queueName, configID)) - core_utils.dump_error_message(mainLog) + main_log.error('failed to launch sweeper plugin for {0}/{1}'.format(queue_name, configID)) + core_utils.dump_error_message(main_log) continue sw.reset() n_workers = len(workspec_list) try: # try bulk method - tmpLog = self.make_logger(_logger, 'id={0}'.format(lockedBy), method_name='run') - tmpLog.debug('start killing') - tmpList = sweeperCore.kill_workers(workspec_list) + tmp_log = self.make_logger(_logger, 'id={0}'.format(self.lockedBy), method_name='run') + tmp_log.debug('start killing') + tmp_list = sweeper_core.kill_workers(workspec_list) except AttributeError: # fall back to single-worker method for workspec in workspec_list: - tmpLog = self.make_logger(_logger, 'workerID={0}'.format(workspec.workerID), - method_name='run') + tmp_log = self.make_logger(_logger, 'workerID={0}'.format(workspec.workerID), + method_name='run') try: - tmpLog.debug('start killing one worker') - tmpStat, tmpOut = sweeperCore.kill_worker(workspec) - tmpLog.debug('done killing with status={0} diag={1}'.format(tmpStat, tmpOut)) + tmp_log.debug('start killing one worker') + tmp_stat, tmp_out = sweeper_core.kill_worker(workspec) + tmp_log.debug('done killing with status={0} diag={1}'.format(tmp_stat, tmp_out)) except Exception: - core_utils.dump_error_message(tmpLog) + core_utils.dump_error_message(tmp_log) except Exception: - core_utils.dump_error_message(mainLog) + core_utils.dump_error_message(main_log) else: # bulk method n_killed = 0 - for workspec, (tmpStat, tmpOut) in zip(workspec_list, tmpList): - tmpLog.debug('done killing workerID={0} with status={1} diag={2}'.format( - workspec.workerID, tmpStat, tmpOut)) - if tmpStat: + for workspec, (tmp_stat, tmp_out) in zip(workspec_list, tmp_list): + tmp_log.debug('done killing workerID={0} with status={1} diag={2}'.format( + workspec.workerID, tmp_stat, tmp_out)) + if tmp_stat: n_killed += 1 - tmpLog.debug('killed {0}/{1} workers'.format(n_killed, n_workers)) - mainLog.debug('done killing {0} workers'.format(n_workers) + sw.get_elapsed_time()) - mainLog.debug('done all killing' + sw_kill.get_elapsed_time()) + tmp_log.debug('killed {0}/{1} workers'.format(n_killed, n_workers)) + main_log.debug('done killing {0} workers'.format(n_workers) + sw.get_elapsed_time()) + main_log.debug('done all killing' + sw_kill.get_elapsed_time()) + # cleanup stage sw_cleanup = 
core_utils.get_stopwatch() # timeout for missed try: - keepMissed = harvester_config.sweeper.keepMissed + keep_missed = harvester_config.sweeper.keepMissed except Exception: - keepMissed = 24 + keep_missed = 24 try: - keepPending = harvester_config.sweeper.keepPending + keep_pending = harvester_config.sweeper.keepPending except Exception: - keepPending = 24 + keep_pending = 24 # get workers for cleanup statusTimeoutMap = {'finished': harvester_config.sweeper.keepFinished, 'failed': harvester_config.sweeper.keepFailed, 'cancelled': harvester_config.sweeper.keepCancelled, - 'missed': keepMissed, - 'pending': keepPending + 'missed': keep_missed, + 'pending': keep_pending } workersForCleanup = self.dbProxy.get_workers_for_cleanup(harvester_config.sweeper.maxWorkers, statusTimeoutMap) - mainLog.debug('got {0} queues for workers cleanup'.format(len(workersForCleanup))) + main_log.debug('got {0} queues for workers cleanup'.format(len(workersForCleanup))) sw = core_utils.get_stopwatch() - for queueName, configIdWorkSpecList in iteritems(workersForCleanup): + for queue_name, configIdWorkSpecList in iteritems(workersForCleanup): for configID, workspec_list in iteritems(configIdWorkSpecList): # get sweeper - if not self.queueConfigMapper.has_queue(queueName, configID): - mainLog.error('queue config for {0}/{1} not found'.format(queueName, configID)) + if not self.queueConfigMapper.has_queue(queue_name, configID): + main_log.error('queue config for {0}/{1} not found'.format(queue_name, configID)) continue - queueConfig = self.queueConfigMapper.get_queue(queueName, configID) - sweeperCore = self.pluginFactory.get_plugin(queueConfig.sweeper) - messenger = self.pluginFactory.get_plugin(queueConfig.messenger) + queue_config = self.queueConfigMapper.get_queue(queue_name, configID) + sweeper_core = self.pluginFactory.get_plugin(queue_config.sweeper) + messenger = self.pluginFactory.get_plugin(queue_config.messenger) sw.reset() n_workers = len(workspec_list) # make sure workers to clean up are all terminated - mainLog.debug('making sure workers to clean up are all terminated') + main_log.debug('making sure workers to clean up are all terminated') try: # try bulk method - tmpList = sweeperCore.kill_workers(workspec_list) + tmp_list = sweeper_core.kill_workers(workspec_list) except AttributeError: # fall back to single-worker method for workspec in workspec_list: - tmpLog = self.make_logger(_logger, 'workerID={0}'.format(workspec.workerID), - method_name='run') + tmp_log = self.make_logger(_logger, 'workerID={0}'.format(workspec.workerID), + method_name='run') try: - tmpStat, tmpOut = sweeperCore.kill_worker(workspec) + tmp_stat, tmp_out = sweeper_core.kill_worker(workspec) except Exception: - core_utils.dump_error_message(tmpLog) + core_utils.dump_error_message(tmp_log) except Exception: - core_utils.dump_error_message(mainLog) - mainLog.debug('made sure workers to clean up are all terminated') + core_utils.dump_error_message(main_log) + main_log.debug('made sure workers to clean up are all terminated') # start cleanup for workspec in workspec_list: - tmpLog = self.make_logger(_logger, 'workerID={0}'.format(workspec.workerID), - method_name='run') + tmp_log = self.make_logger(_logger, 'workerID={0}'.format(workspec.workerID), + method_name='run') try: - tmpLog.debug('start cleaning up one worker') + tmp_log.debug('start cleaning up one worker') # sweep worker - tmpStat, tmpOut = sweeperCore.sweep_worker(workspec) - tmpLog.debug('swept_worker with status={0} diag={1}'.format(tmpStat, tmpOut)) - 
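The cleanup stage builds its status-to-retention map from `harvester_config.sweeper`, falling back to 24 hours for `keepMissed` and `keepPending` when they are not configured. A compact equivalent of that lookup using `getattr` defaults instead of the try/except blocks (a sketch, not the patch's literal code; `sweeper_config` stands in for `harvester_config.sweeper`):

```python
def get_cleanup_timeouts(sweeper_config):
    """Build the status -> retention (hours) map used to select workers for cleanup."""
    return {
        'finished': sweeper_config.keepFinished,
        'failed': sweeper_config.keepFailed,
        'cancelled': sweeper_config.keepCancelled,
        'missed': getattr(sweeper_config, 'keepMissed', 24),
        'pending': getattr(sweeper_config, 'keepPending', 24),
    }
```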
tmpLog.debug('start messenger cleanup') - mc_tmpStat, mc_tmpOut = messenger.clean_up(workspec) - tmpLog.debug('messenger cleaned up with status={0} diag={1}'.format(mc_tmpStat, mc_tmpOut)) - if tmpStat: + tmp_stat, tmp_out = sweeper_core.sweep_worker(workspec) + tmp_log.debug('swept_worker with status={0} diag={1}'.format(tmp_stat, tmp_out)) + tmp_log.debug('start messenger cleanup') + mc_tmp_stat, mc_tmp_out = messenger.clean_up(workspec) + tmp_log.debug('messenger cleaned up with status={0} diag={1}'.format(mc_tmp_stat, mc_tmp_out)) + if tmp_stat: self.dbProxy.delete_worker(workspec.workerID) except Exception: - core_utils.dump_error_message(tmpLog) - mainLog.debug('done cleaning up {0} workers'.format(n_workers) + sw.get_elapsed_time()) - mainLog.debug('done all cleanup' + sw_cleanup.get_elapsed_time()) + core_utils.dump_error_message(tmp_log) + main_log.debug('done cleaning up {0} workers'.format(n_workers) + sw.get_elapsed_time()) + main_log.debug('done all cleanup' + sw_cleanup.get_elapsed_time()) + # old-job-deletion stage sw_delete = core_utils.get_stopwatch() - mainLog.debug('delete old jobs') + main_log.debug('delete old jobs') jobTimeout = max(statusTimeoutMap.values()) + 1 self.dbProxy.delete_old_jobs(jobTimeout) # delete orphaned job info self.dbProxy.delete_orphaned_job_info() - mainLog.debug('done deletion of old jobs' + sw_delete.get_elapsed_time()) + main_log.debug('done deletion of old jobs' + sw_delete.get_elapsed_time()) # disk cleanup if hasattr(harvester_config.sweeper, 'diskCleanUpInterval') and \ hasattr(harvester_config.sweeper, 'diskHighWatermark'): @@ -182,7 +208,7 @@ def run(self): for item in harvester_config.sweeper.diskHighWatermark.split(','): # dir name and watermark in GB dir_name, watermark = item.split('|') - mainLog.debug('checking {0} for cleanup with watermark {1} GB'.format(dir_name, watermark)) + main_log.debug('checking {0} for cleanup with watermark {1} GB'.format(dir_name, watermark)) watermark = int(watermark) * 10**9 total_size = 0 file_dict = {} @@ -197,11 +223,11 @@ def run(self): file_dict[mtime].add((base_name, full_name, f_size)) # delete if necessary if total_size < watermark: - mainLog.debug( + main_log.debug( 'skip cleanup {0} due to total_size {1} GB < watermark {2} GB'.format( dir_name, total_size//(10**9), watermark//(10**9))) else: - mainLog.debug( + main_log.debug( 'cleanup {0} due to total_size {1} GB >= watermark {2} GB'.format( dir_name, total_size//(10**9), watermark//(10**9))) # get active input files @@ -217,17 +243,17 @@ def run(self): try: os.remove(full_name) except Exception: - core_utils.dump_error_message(mainLog) + core_utils.dump_error_message(main_log) deleted_size += f_size if total_size - deleted_size < watermark: break if total_size - deleted_size < watermark: break except Exception: - core_utils.dump_error_message(mainLog) + core_utils.dump_error_message(main_log) # time the cycle - mainLog.debug('done a sweeper cycle' + sw_main.get_elapsed_time()) + main_log.debug('done a sweeper cycle' + sw_main.get_elapsed_time()) # check if being terminated if self.terminated(harvester_config.sweeper.sleepTime): - mainLog.debug('terminated') + main_log.debug('terminated') return diff --git a/pandaharvester/harvestercloud/aws_unhealthy_nodes.py b/pandaharvester/harvestercloud/aws_unhealthy_nodes.py index d6f5aecb..d03e9d1e 100644 --- a/pandaharvester/harvestercloud/aws_unhealthy_nodes.py +++ b/pandaharvester/harvestercloud/aws_unhealthy_nodes.py @@ -31,7 +31,6 @@ print('------------------------------------') 
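The new `gke_unhealthy_nodes.py` script that follows flags nodes whose pods have sat in `ContainerCreating` for more than 30 minutes before deleting them with `gcloud`. The detection step, isolated into a function (a sketch using the same `kubernetes` client calls as the script; `kube_config_path` is a placeholder for the real config file path):

```python
import datetime

from kubernetes import client, config

def find_stuck_nodes(kube_config_path, namespace='default', stuck_after_minutes=30):
    """Return names of nodes hosting pods stuck in ContainerCreating for too long."""
    config.load_kube_config(config_file=kube_config_path)
    corev1 = client.CoreV1Api()
    now = datetime.datetime.now().astimezone()  # tz-aware, comparable to pod timestamps
    pending = corev1.list_namespaced_pod(namespace=namespace,
                                         field_selector='status.phase=Pending')
    nodes = set()
    for pod in pending.items:
        try:
            waiting_reason = pod.status.container_statuses[0].state.waiting.reason
            age = now - pod.metadata.creation_timestamp
        except Exception:
            continue  # pods with incomplete status blocks are skipped
        if waiting_reason == 'ContainerCreating' and age > datetime.timedelta(minutes=stuck_after_minutes):
            nodes.add(pod.spec.node_name)
    return sorted(nodes)
```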
print(command_with_id) print('return code: {0}'.format(p.returncode)) - print('return code: {0}'.format(p.returncode)) print('output: {0}'.format(output)) print('err: {0}'.format(err)) print('------------------------------------') \ No newline at end of file diff --git a/pandaharvester/harvestercloud/gke_unhealthy_nodes.py b/pandaharvester/harvestercloud/gke_unhealthy_nodes.py new file mode 100644 index 00000000..efe091be --- /dev/null +++ b/pandaharvester/harvestercloud/gke_unhealthy_nodes.py @@ -0,0 +1,42 @@ +from kubernetes import client, config +import datetime +from subprocess import Popen, PIPE +config.load_kube_config(config_file='PATH TO YOUR CONFIG') +namespace = 'default' + +nodes = [] +current_time = datetime.datetime.now().astimezone() + +corev1 = client.CoreV1Api() +aux = corev1.list_namespaced_pod(namespace=namespace, field_selector='status.phase=Pending') +for item in aux.items: + try: + if item.status.container_statuses[0].state.waiting.reason == 'ContainerCreating' and current_time - item.metadata.creation_timestamp > datetime.timedelta(minutes=30): + if item.spec.node_name not in nodes: + nodes.append(item.spec.node_name) + except Exception: + continue + +# delete the node +command_desc = '/bin/gcloud compute instances describe --format=value[](metadata.items.created-by) {0} --zone={1}' +command_del = "/bin/gcloud compute instance-groups managed delete-instances --instances={0} {1} --zone={2}" + +zones = ['europe-west1-b', 'europe-west1-c', 'europe-west1-d'] + +for node in nodes: + for zone in zones: + command_with_node = command_desc.format(node, zone) + command_list = command_with_node.split(' ') + p = Popen(command_list, stdin=PIPE, stdout=PIPE, stderr=PIPE) + output, err = p.communicate() + if output: + output_str = output[:-1].decode() + command_del_with_vars = command_del.format(node, output_str, zone) + command_del_list = command_del_with_vars.split(' ') + p = Popen(command_del_list, stdin=PIPE, stdout=PIPE, stderr=PIPE) + output, err = p.communicate() + print(command_del_with_vars) + print(output) + print(err) + print("--------------------") + diff --git a/pandaharvester/harvestercloud/pilots_starter.py b/pandaharvester/harvestercloud/pilots_starter.py index 87ef8226..313a95b8 100644 --- a/pandaharvester/harvestercloud/pilots_starter.py +++ b/pandaharvester/harvestercloud/pilots_starter.py @@ -147,6 +147,9 @@ def get_configuration(): python_option = os.environ.get('pythonOption', '') logging.debug('[main] got pythonOption: {0}'.format(python_option)) + pilot_version = os.environ.get('pilotVersion', '') + logging.debug('[main] got pilotVersion: {0}'.format(pilot_version)) + # get the Harvester ID harvester_id = os.environ.get('HARVESTER_ID') logging.debug('[main] got Harvester ID: {0}'.format(harvester_id)) @@ -182,16 +185,16 @@ def get_configuration(): WORK_DIR = tmpdir return proxy_path, panda_site, panda_queue, resource_type, prodSourceLabel, job_type, pilot_type, \ - pilot_url_option, python_option, harvester_id, worker_id, logs_frontend_w, logs_frontend_r, stdout_name, \ - submit_mode + pilot_url_option, python_option, pilot_version, harvester_id, worker_id, logs_frontend_w, \ + logs_frontend_r, stdout_name, submit_mode if __name__ == "__main__": # get all the configuration from environment proxy_path, panda_site, panda_queue, resource_type, prodSourceLabel, job_type, pilot_type, pilot_url_opt, \ - python_option, harvester_id, worker_id, logs_frontend_w, logs_frontend_r, destination_name, submit_mode \ - = get_configuration() + python_option, pilot_version, 
harvester_id, worker_id, logs_frontend_w, logs_frontend_r, \ + destination_name, submit_mode = get_configuration() # the pilot should propagate the download link via the pilotId field in the job table log_download_url = '{0}/{1}'.format(logs_frontend_r, destination_name) @@ -216,11 +219,15 @@ def get_configuration(): if pilot_type: pilot_type_option = '-i {0}'.format(pilot_type) - wrapper_params = '-a {0} -s {1} -r {2} -q {3} {4} {5} {6} {7} {8} {9}'.format(WORK_DIR, panda_site, panda_queue, - panda_queue, resource_type_option, - psl_option, pilot_type_option, - job_type_option, pilot_url_opt, - python_option) + pilot_version_option = '--pilotversion 2' + if pilot_version: + pilot_version_option = '--pilotversion {0}'.format(pilot_version) + + wrapper_params = '-a {0} -s {1} -r {2} -q {3} {4} {5} {6} {7} {8} {9} {10}'.format(WORK_DIR, panda_site, panda_queue, + panda_queue, resource_type_option, + psl_option, pilot_type_option, + job_type_option, pilot_url_opt, + python_option, pilot_version_option) if submit_mode == 'PUSH': # job configuration files need to be copied, because k8s configmap mounts as read-only file system diff --git a/pandaharvester/harvestercommunicator/panda_communicator.py b/pandaharvester/harvestercommunicator/panda_communicator.py index b61d5bb5..9a8d0541 100644 --- a/pandaharvester/harvestercommunicator/panda_communicator.py +++ b/pandaharvester/harvestercommunicator/panda_communicator.py @@ -44,6 +44,27 @@ def __init__(self): self.useInspect = True else: self.verbose = False + if hasattr(harvester_config.pandacon, 'auth_type'): + self.auth_type = harvester_config.pandacon.auth_type + else: + self.auth_type = 'x509' + self.auth_token = None + self.auth_token_last_update = None + + # renew token + def renew_token(self): + if hasattr(harvester_config.pandacon, 'auth_token'): + if harvester_config.pandacon.auth_token.startswith('file:'): + if self.auth_token_last_update is not None and \ + datetime.datetime.utcnow() - self.auth_token_last_update < datetime.timedelta(minutes=60): + return + with open(harvester_config.pandacon.auth_token.split(':')[-1]) as f: + self.auth_token = f.read() + self.auth_token_last_update = datetime.datetime.utcnow() + else: + if self.auth_token_last_update is None: + self.auth_token = harvester_config.pandacon.auth_token + self.auth_token_last_update = datetime.datetime.utcnow() # POST with http def post(self, path, data): @@ -92,15 +113,22 @@ def post_ssl(self, path, data, cert=None, base_url=None): url = '{0}/{1}'.format(base_url, path) if self.verbose: tmpLog.debug('exec={0} URL={1} data={2}'.format(tmpExec, url, str(data))) - if cert is None: - cert = (harvester_config.pandacon.cert_file, - harvester_config.pandacon.key_file) + headers = {"Accept": "application/json", + "Connection": "close"} + if self.auth_type == 'oidc': + self.renew_token() + cert = None + headers['Authorization'] = 'Bearer {0}'.format(self.auth_token) + headers['Origin'] = harvester_config.pandacon.auth_origin + else: + if cert is None: + cert = (harvester_config.pandacon.cert_file, + harvester_config.pandacon.key_file) session = get_http_adapter_with_random_dns_resolution() sw = core_utils.get_stopwatch() res = session.post(url, data=data, - headers={"Accept": "application/json", - "Connection": "close"}, + headers=headers, timeout=harvester_config.pandacon.timeout, verify=harvester_config.pandacon.ca_cert, cert=cert) @@ -134,12 +162,21 @@ def put_ssl(self, path, files, cert=None, base_url=None): url = '{0}/{1}'.format(base_url, path) if self.verbose: 
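With `auth_type = 'oidc'` the communicator stops presenting a client certificate and instead sends the IAM token as a bearer header, re-reading a `file:` token at most once per hour. A standalone sketch of that caching logic; the class and its attribute names are illustrative, while the behaviour mirrors `renew_token()` and the header handling in `post_ssl()`/`put_ssl()`:

```python
import datetime

class TokenAuth(object):
    """Cache a file-based access token and expose it as request headers."""

    def __init__(self, auth_token_setting, auth_origin, max_age_minutes=60):
        self.auth_token_setting = auth_token_setting  # e.g. 'file:/path/to/token' or a literal token
        self.auth_origin = auth_origin
        self.max_age = datetime.timedelta(minutes=max_age_minutes)
        self.auth_token = None
        self.last_update = None

    def renew_token(self):
        now = datetime.datetime.utcnow()
        if self.auth_token_setting.startswith('file:'):
            if self.last_update is not None and now - self.last_update < self.max_age:
                return  # cached token is still fresh enough
            with open(self.auth_token_setting.split(':')[-1]) as f:
                self.auth_token = f.read()
            self.last_update = now
        elif self.last_update is None:
            self.auth_token = self.auth_token_setting  # literal token, read once
            self.last_update = now

    def headers(self):
        self.renew_token()
        return {'Authorization': 'Bearer {0}'.format(self.auth_token),
                'Origin': self.auth_origin}
```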
tmpLog.debug('exec={0} URL={1} files={2}'.format(tmpExec, url, files['file'][0])) - if cert is None: - cert = (harvester_config.pandacon.cert_file, - harvester_config.pandacon.key_file) + if self.auth_type == 'oidc': + self.renew_token() + cert = None + headers = dict() + headers['Authorization'] = 'Bearer {0}'.format(self.auth_token) + headers['Origin'] = harvester_config.pandacon.auth_origin + else: + headers = None + if cert is None: + cert = (harvester_config.pandacon.cert_file, + harvester_config.pandacon.key_file) session = get_http_adapter_with_random_dns_resolution() res = session.post(url, files=files, + headers=headers, timeout=harvester_config.pandacon.timeout, verify=harvester_config.pandacon.ca_cert, cert=cert) diff --git a/pandaharvester/harvesterconfig/harvester_config.py b/pandaharvester/harvesterconfig/harvester_config.py index f27ad5ed..5dc3daf8 100644 --- a/pandaharvester/harvesterconfig/harvester_config.py +++ b/pandaharvester/harvesterconfig/harvester_config.py @@ -1,6 +1,7 @@ import re import os import sys +import six import json from future.utils import iteritems @@ -26,10 +27,22 @@ def __init__(self): pass +# load configmap +config_map_data = {} +if 'PANDA_HOME' in os.environ: + config_map_name = 'panda_harvester_configmap.json' + config_map_path = os.path.join(os.environ['PANDA_HOME'], 'etc/configmap', config_map_name) + if os.path.exists(config_map_path): + with open(config_map_path) as f: + config_map_data = json.load(f) + # loop over all sections for tmpSection in tmpConf.sections(): # read section tmpDict = getattr(tmpConf, tmpSection) + # load configmap + if tmpSection in config_map_data: + tmpDict.update(config_map_data[tmpSection]) # make section class tmpSelf = _SectionClass() # update module dict @@ -37,14 +50,16 @@ def __init__(self): # expand all values for tmpKey, tmpVal in iteritems(tmpDict): # use env vars - if tmpVal.startswith('$'): + if isinstance(tmpVal, str) and tmpVal.startswith('$'): tmpMatch = re.search('\$\{*([^\}]+)\}*', tmpVal) envName = tmpMatch.group(1) if envName not in os.environ: raise KeyError('{0} in the cfg is an undefined environment variable.'.format(envName)) tmpVal = os.environ[envName] # convert string to bool/int - if tmpVal == 'True': + if not isinstance(tmpVal, six.string_types): + pass + elif tmpVal == 'True': tmpVal = True elif tmpVal == 'False': tmpVal = False diff --git a/pandaharvester/harvestercore/command_spec.py b/pandaharvester/harvestercore/command_spec.py index ac18f30a..fe6253d0 100644 --- a/pandaharvester/harvestercore/command_spec.py +++ b/pandaharvester/harvestercore/command_spec.py @@ -18,11 +18,14 @@ class CommandSpec(SpecBase): COM_reportWorkerStats = 'REPORT_WORKER_STATS' COM_setNWorkers = 'SET_N_WORKERS_JOBTYPE' COM_killWorkers = 'KILL_WORKERS' + COM_syncWorkersKill = 'SYNC_WORKERS_KILL' + # mapping between command and receiver receiver_map = { COM_reportWorkerStats: 'propagator', COM_setNWorkers: 'submitter', COM_killWorkers: 'sweeper', + COM_syncWorkersKill: 'sweeper' } # constructor diff --git a/pandaharvester/harvestercore/db_proxy.py b/pandaharvester/harvestercore/db_proxy.py index b038e7ab..1f6f0439 100644 --- a/pandaharvester/harvestercore/db_proxy.py +++ b/pandaharvester/harvestercore/db_proxy.py @@ -61,6 +61,7 @@ def __init__(self, thr_name=None, read_only=False): self.verbLog = None self.useInspect = False self.reconnectTimeout = 300 + self.read_only = read_only if hasattr(harvester_config.db, 'reconnectTimeout'): self.reconnectTimeout = harvester_config.db.reconnectTimeout if 
harvester_config.db.verbose: @@ -71,6 +72,17 @@ def __init__(self, thr_name=None, read_only=False): self.thrName = currentThr.ident if hasattr(harvester_config.db, 'useInspect') and harvester_config.db.useInspect is True: self.useInspect = True + # connect DB + self._connect_db() + self.lockDB = False + # using application side lock if DB doesn't have a mechanism for exclusive access + if harvester_config.db.engine == 'mariadb': + self.usingAppLock = False + else: + self.usingAppLock = True + + # connect DB + def _connect_db(self): if harvester_config.db.engine == 'mariadb': if hasattr(harvester_config.db, 'host'): host = harvester_config.db.host @@ -116,7 +128,7 @@ def fetchall(self): self.cur = self.con.cursor(named_tuple=True, buffered=True) else: import sqlite3 - if read_only: + if self.read_only: fd = os.open(harvester_config.db.database_filename, os.O_RDONLY) database_filename = '/dev/fd/{0}'.format(fd) else: @@ -134,12 +146,6 @@ def fetchall(self): self.cur.execute('PRAGMA journal_mode = WAL') # read to avoid database lock self.cur.fetchone() - self.lockDB = False - # using application side lock if DB doesn't have a mechanism for exclusive access - if harvester_config.db.engine == 'mariadb': - self.usingAppLock = False - else: - self.usingAppLock = True # exception handler for type of DBs def _handle_exception(self, exc): @@ -160,8 +166,19 @@ def _handle_exception(self, exc): try_timestamp = time.time() n_retry = 1 while time.time() - try_timestamp < self.reconnectTimeout: + # close DB cursor + try: + self.cur.close() + except Exception as e: + tmpLog.error('failed to close cursor: {0}'.format(e)) + # close DB connection try: - self.__init__() + self.con.close() + except Exception as e: + tmpLog.error('failed to close connection: {0}'.format(e)) + # restart the proxy instance + try: + self._connect_db() tmpLog.info('renewed connection') break except Exception as e: @@ -3499,11 +3516,11 @@ def get_worker_stats_bulk(self, active_ups_queues): return {} # send kill command to workers associated to a job - def kill_workers_with_job(self, panda_id): + def mark_workers_to_kill_by_pandaid(self, panda_id): try: # get logger tmpLog = core_utils.make_logger(_logger, 'PandaID={0}'.format(panda_id), - method_name='kill_workers_with_job') + method_name='mark_workers_to_kill_by_pandaid') tmpLog.debug('start') # sql to set killTime sqlL = "UPDATE {0} SET killTime=:setTime ".format(workTableName) @@ -3541,26 +3558,27 @@ def kill_workers_with_job(self, panda_id): # return return None - # send kill command to a worker - def kill_worker(self, worker_id): + # send kill command to workers + def mark_workers_to_kill_by_workerids(self, worker_ids): try: # get logger - tmpLog = core_utils.make_logger(_logger, 'workerID={0}'.format(worker_id), - method_name='kill_worker') + tmpLog = core_utils.make_logger(_logger, method_name='mark_workers_to_kill_by_workerids') tmpLog.debug('start') # sql to set killTime sqlL = "UPDATE {0} SET killTime=:setTime ".format(workTableName) sqlL += "WHERE workerID=:workerID AND killTime IS NULL AND NOT status IN (:st1,:st2,:st3) " # set an older time to trigger sweeper setTime = datetime.datetime.utcnow() - datetime.timedelta(hours=6) - # set killTime - varMap = dict() - varMap[':workerID'] = worker_id - varMap[':setTime'] = setTime - varMap[':st1'] = WorkSpec.ST_finished - varMap[':st2'] = WorkSpec.ST_failed - varMap[':st3'] = WorkSpec.ST_cancelled - self.execute(sqlL, varMap) + varMaps = [] + for worker_id in worker_ids: + varMap = dict() + varMap[':workerID'] = worker_id + 
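`_handle_exception` now closes the broken cursor and connection separately and re-opens only the connection via the new `_connect_db()`, instead of re-running `__init__`. A sketch of that recovery loop; `proxy` is assumed to expose `cur`, `con` and `_connect_db()` as introduced in this patch:

```python
import logging
import time

logger = logging.getLogger('db_proxy')

def reconnect_with_timeout(proxy, reconnect_timeout=300, retry_interval=1):
    """Close the broken cursor/connection and re-open the DB within a time budget."""
    deadline = time.time() + reconnect_timeout
    # close cursor and connection separately; either one may already be unusable
    for attr in ('cur', 'con'):
        try:
            getattr(proxy, attr).close()
        except Exception as exc:
            logger.error('failed to close %s: %s', attr, exc)
    n_retry = 1
    while time.time() < deadline:
        try:
            proxy._connect_db()  # re-open only the connection, keeping the proxy state
            logger.info('renewed connection')
            return True
        except Exception as exc:
            logger.error('reconnect attempt %s failed: %s', n_retry, exc)
            n_retry += 1
            time.sleep(retry_interval)
    return False
```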
varMap[':setTime'] = setTime + varMap[':st1'] = WorkSpec.ST_finished + varMap[':st2'] = WorkSpec.ST_failed + varMap[':st3'] = WorkSpec.ST_cancelled + varMaps.append(varMap) + self.executemany(sqlL, varMaps) nRow = self.cur.rowcount # commit self.commit() @@ -5498,21 +5516,23 @@ def get_workers_from_ids(self, ids): return {} # send kill command to workers by query - def kill_workers_by_query(self, params): + def mark_workers_to_kill_by_query(self, params): try: # get logger - tmpLog = core_utils.make_logger(_logger, method_name='kill_workers_by_query') + tmpLog = core_utils.make_logger(_logger, method_name='mark_workers_to_kill_by_query') tmpLog.debug('start') + # sql to set killTime sqlL = "UPDATE {0} SET killTime=:setTime ".format(workTableName) sqlL += "WHERE workerID=:workerID AND killTime IS NULL AND NOT status IN (:st1,:st2,:st3) " + # sql to get workers constraints_query_string_list = [] tmp_varMap = {} constraint_map = {'status': params.get('status', [WorkSpec.ST_submitted]), - 'computingSite': params.get('computingSite', []), - 'computingElement': params.get('computingElement', []), - 'submissionHost': params.get('submissionHost', [])} + 'computingSite': params.get('computingSite', []), + 'computingElement': params.get('computingElement', []), + 'submissionHost': params.get('submissionHost', [])} tmpLog.debug('query {0}'.format(constraint_map)) for attribute, match_list in iteritems(constraint_map): if match_list == 'ALL': @@ -5521,13 +5541,14 @@ def kill_workers_by_query(self, params): tmpLog.debug('{0} constraint is not specified in the query. Skipped'.format(attribute)) return 0 else: - one_param_list = [ ':param_{0}_{1}'.format(attribute, v_i) for v_i in range(len(match_list)) ] + one_param_list = [':param_{0}_{1}'.format(attribute, v_i) for v_i in range(len(match_list))] tmp_varMap.update(zip(one_param_list, match_list)) params_string = '(' + ','.join(one_param_list) + ')' constraints_query_string_list.append('{0} IN {1}'.format(attribute, params_string)) - constranits_query_string = ' AND '.join(constraints_query_string_list) + constraints_query_string = ' AND '.join(constraints_query_string_list) sqlW = "SELECT workerID FROM {0} ".format(workTableName) - sqlW += "WHERE {0} ".format(constranits_query_string) + sqlW += "WHERE {0} ".format(constraints_query_string) + # set an older time to trigger sweeper setTime = datetime.datetime.utcnow() - datetime.timedelta(hours=6) # get workers @@ -5546,6 +5567,7 @@ def kill_workers_by_query(self, params): varMap[':st3'] = WorkSpec.ST_cancelled self.execute(sqlL, varMap) nRow += self.cur.rowcount + # commit self.commit() tmpLog.debug('set killTime to {0} workers'.format(nRow)) diff --git a/pandaharvester/harvestercredmanager/iam_token_cred_manager.py b/pandaharvester/harvestercredmanager/iam_token_cred_manager.py new file mode 100644 index 00000000..1102bb90 --- /dev/null +++ b/pandaharvester/harvestercredmanager/iam_token_cred_manager.py @@ -0,0 +1,162 @@ +import os +import json +import re +import traceback + +from .base_cred_manager import BaseCredManager +from pandaharvester.harvestercore import core_utils +from pandaharvester.harvestermisc.info_utils import PandaQueuesDict +from pandaharvester.harvestermisc.token_utils import endpoint_to_filename, WLCG_scopes, IssuerBroker + +# logger +_logger = core_utils.setup_logger('iam_token_cred_manager') + +# allowed target types +ALL_TARGET_TYPES = ['ce'] + +# default port for CEs +default_port_map = { + 'htcondor-ce': 9619, + } + +# credential manager with IAM token +class 
IamTokenCredManager(BaseCredManager): + # constructor + def __init__(self, **kwarg): + BaseCredManager.__init__(self, **kwarg) + # make logger + tmp_log = self.make_logger(_logger, method_name='__init__') + # attributes + if hasattr(self, 'inFile'): + # parse inFile setup configuration + try: + with open(self.inFile) as f: + self.setupMap = json.load(f) + except Exception as e: + tmp_log.error('Error with inFile. {0}: {1}'.format(e.__class__.__name__, e)) + self.setupMap = {} + raise + else: + # set up with direct attributes + self.setupMap = dict(vars(self)) + # validate setupMap + try: + self.client_cred_file = self.setupMap['client_cred_file'] + with open(self.client_cred_file) as f: + client_cred_dict = json.load(f) + self.issuer = client_cred_dict['issuer'] + self.client_id = client_cred_dict['client_id'] + self.client_secret = client_cred_dict['client_secret'] + self.target_type = self.setupMap['target_type'] + self.out_dir = self.setupMap['out_dir'] + self.lifetime = self.setupMap.get('lifetime') + self.target_list_file = self.setupMap.get('target_list_file') + except KeyError as e: + tmp_log.error('Missing attributes in setup. {0}'.format(traceback.format_exc())) + raise + else: + if self.target_type not in ALL_TARGET_TYPES: + tmp_log.error('Unsupported target_type: {0}'.format(self.target_type)) + raise Exception('Unsupported target_type') + # initialize + self.targets_dict = dict() + # handle targets + self._handle_target_types() + # issuer broker + self.issuer_broker = IssuerBroker(self.issuer, self.client_id, self.client_secret, + name=self.setup_name) + + def _handle_target_types(self): + # make logger + tmp_log = self.make_logger(_logger, method_name='_handle_target_types') + try: + self.panda_queues_dict = PandaQueuesDict() + except Exception as e: + tmp_log.error('Problem calling PandaQueuesDict. {0}'.format(traceback.format_exc())) + raise + if self.target_type == 'ce': + try: + # retrieve CEs from CRIC + for site, val in self.panda_queues_dict.items(): + if val.get('status') == 'offline': + # do not generate token for offline PQs, but for online, brokeroff, pause, ... + continue + ce_q_list = val.get('queues') + if ce_q_list: + # loop over all ce queues + for ce_q in ce_q_list: + # ce_status = ce_q.get('ce_status') + # if not ce_status or ce_status == 'DISABLED': + # # skip disabled ce queues + # continue + ce_endpoint = ce_q.get('ce_endpoint') + ce_hostname = re.sub(':\w*', '', ce_endpoint) + ce_flavour = ce_q.get('ce_flavour') + ce_flavour_str = str(ce_flavour).lower() + ce_endpoint_modified = ce_endpoint + if ce_endpoint == ce_hostname: + # no port, add default port + if ce_flavour_str in default_port_map: + default_port = default_port_map[ce_flavour_str] + ce_endpoint_modified = '{0}:{1}'.format(ce_hostname, default_port) + if ce_endpoint_modified and ce_flavour: + target_attr_dict = { + 'ce_flavour': ce_flavour, + } + self.targets_dict[ce_endpoint_modified] = target_attr_dict + else: + # do not generate token if no queues of CE + continue + except Exception as e: + tmp_log.error('Problem retrieving CEs from CRIC. {0}'.format(traceback.format_exc())) + raise + # retrieve CEs from local file + if self.target_list_file: + try: + with open(self.target_list_file) as f: + for target_str in f.readlines(): + if target_str: + target = target_str.rstrip() + target_attr_dict = { + 'ce_flavour': None, + } + self.targets_dict[target] = target_attr_dict + except Exception as e: + tmp_log.error('Problem retrieving CEs from local file. 
{0}'.format(traceback.format_exc())) + raise + # scope for CE + self.scope = WLCG_scopes.COMPUTE_ALL + + # check proxy + def check_credential(self): + # make logger + # same update period as credmanager agent + return False + + # renew proxy + def renew_credential(self): + # make logger + tmp_log = self.make_logger(_logger, 'config={0}'.format(self.setup_name), method_name='renew_credential') + # go + all_ok = True + all_err_str = '' + for target in self.targets_dict: + try: + # get access token of target + access_token = self.issuer_broker.get_access_token(aud=target, scope=self.scope) + # write to file + token_filename = endpoint_to_filename(target) + token_path = os.path.join(self.out_dir, token_filename) + with open(token_path, 'w') as f: + f.write(access_token) + except Exception as e: + err_str = 'Problem getting token for {0}. {1}'.format(target, traceback.format_exc()) + tmp_log.error(err_str) + all_ok = False + all_err_str = 'failed to get some tokens. Check the plugin log for details ' + continue + else: + tmp_log.debug('got token for {0} at {1}'.format(target, token_path)) + tmp_log.debug('done') + # return + return all_ok, all_err_str diff --git a/pandaharvester/harvesterfifo/mysql_fifo.py b/pandaharvester/harvesterfifo/mysql_fifo.py index f763059e..dcba48ad 100644 --- a/pandaharvester/harvesterfifo/mysql_fifo.py +++ b/pandaharvester/harvesterfifo/mysql_fifo.py @@ -19,6 +19,19 @@ def __init__(self, **kwarg): self.reconnectTimeout = harvester_config.db.reconnectTimeout PluginBase.__init__(self, **kwarg) self.tableName = '{title}_FIFO'.format(title=self.titleName) + # get connection, cursor and error types + self._connect_db() + # create table for fifo + try: + self._make_table() + # self._make_index() + self.commit() + except Exception as _e: + self.rollback() + raise _e + + # get connection, cursor and error types + def _connect_db(self): # DB access attribues if hasattr(self, 'db_host'): db_host = self.db_host @@ -46,7 +59,6 @@ def __init__(self, **kwarg): db_schema = self.db_schema else: db_schema = harvester_config.fifo.db_schema - # get connection, cursor and error types try: import MySQLdb import MySQLdb.cursors @@ -61,28 +73,10 @@ def __init__(self, **kwarg): self.cur = self.con.cursor(buffered=True) self.OperationalError = mysql.connector.errors.OperationalError else: - class MyCursor (MySQLdb.cursors.Cursor): - def fetchone(self): - tmpRet = MySQLdb.cursors.Cursor.fetchone(self) - if tmpRet is None: - return None - return tmpRet - def fetchall(self): - tmpRets = MySQLdb.cursors.Cursor.fetchall(self) - return tmpRets self.con = MySQLdb.connect(user=db_user, passwd=db_password, - db=db_schema, host=db_host, port=db_port, - cursorclass=MyCursor) + db=db_schema, host=db_host, port=db_port) self.cur = self.con.cursor() self.OperationalError = MySQLdb.OperationalError - # create table for fifo - try: - self._make_table() - # self._make_index() - self.commit() - except Exception as _e: - self.rollback() - raise _e # decorator exception handler for type of DBs def _handle_exception(method): @@ -100,8 +94,19 @@ def _wrapped_method(self, *args, **kwargs): try_timestamp = time.time() n_retry = 1 while time.time() - try_timestamp < self.reconnectTimeout: + # close DB cursor + try: + self.cur.close() + except Exception as e: + pass + # close DB connection + try: + self.con.close() + except Exception as e: + pass + # restart the proxy instance try: - self.__init__() + self._connect_db() return except Exception as _e: exc = _e @@ -278,13 +283,17 @@ def _peek(self, mode='first', 
id=None, skip_item=False): 'idtemp': sql_peek_by_id_temp, } sql_peek = mode_sql_map[mode] - if mode in ('id', 'idtemp'): - params = (id,) - self.execute(sql_peek, params) - else: - self.execute(sql_peek) - res = self.cur.fetchall() - self.commit() + try: + if mode in ('id', 'idtemp'): + params = (id,) + self.execute(sql_peek, params) + else: + self.execute(sql_peek) + res = self.cur.fetchall() + self.commit() + except Exception as _e: + self.rollback() + raise _e if len(res) > 0: if skip_item: id, score = res[0] @@ -326,7 +335,11 @@ def _update(self, id, item=None, score=None, temporary=None, cond_score=None): params.append(id) if cond_score_str: params.append(score) - self.execute(sql_update, params) + try: + self.execute(sql_update, params) + except Exception as _e: + self.rollback() + raise _e n_row = self.cur.rowcount if n_row == 1: return True @@ -338,8 +351,12 @@ def size(self): sql_size = ( 'SELECT COUNT(id) FROM {table_name}' ).format(table_name=self.tableName) - self.execute(sql_size) - res = self.cur.fetchall() + try: + self.execute(sql_size) + res = self.cur.fetchall() + except Exception as _e: + self.rollback() + raise _e if len(res) > 0: return res[0][0] return None @@ -463,9 +480,13 @@ def peekmany(self, mode='first', minscore=None, maxscore=None, count=None, skip_ ).format(columns=columns_str, table_name=self.tableName, minscore_str=minscore_str, maxscore_str=maxscore_str, rank=mode_rank_map[mode], count_str=count_str) - self.execute(sql_peek_many) - res = self.cur.fetchall() - self.commit() + try: + self.execute(sql_peek_many) + res = self.cur.fetchall() + self.commit() + except Exception as _e: + self.rollback() + raise _e ret_list = [] for _rec in res: if skip_item: @@ -485,7 +506,11 @@ def clear(self): 'DROP TABLE IF EXISTS {table_name} ' ).format(table_name=self.tableName) # self.execute(sql_clear_index) - self.execute(sql_clear_table) + try: + self.execute(sql_clear_table) + except Exception as _e: + self.rollback() + raise _e self.__init__() # delete objects by list of id @@ -495,9 +520,13 @@ def delete(self, ids): placeholders_str = ','.join([' %s'] * len(ids)) sql_delete = sql_delete_template.format( table_name=self.tableName, placeholders=placeholders_str) - self.execute(sql_delete, ids) - n_row = self.cur.rowcount - self.commit() + try: + self.execute(sql_delete, ids) + n_row = self.cur.rowcount + self.commit() + except Exception as _e: + self.rollback() + raise _e return n_row else: raise TypeError('ids should be list or tuple') diff --git a/pandaharvester/harvestermisc/k8s_utils.py b/pandaharvester/harvestermisc/k8s_utils.py index 744d031e..9e6b92eb 100644 --- a/pandaharvester/harvestermisc/k8s_utils.py +++ b/pandaharvester/harvestermisc/k8s_utils.py @@ -41,7 +41,7 @@ def read_yaml_file(self, yaml_file): return yaml_content def create_job_from_yaml(self, yaml_content, work_spec, prod_source_label, pilot_type, pilot_url_str, - pilot_python_option, container_image, executable, args, + pilot_python_option, pilot_version, container_image, executable, args, cert, max_time=None): tmp_log = core_utils.make_logger(base_logger, 'queue_name={0}'.format(self.queue_name), @@ -165,6 +165,7 @@ def create_job_from_yaml(self, yaml_content, work_spec, prod_source_label, pilot {'name': 'pilotType', 'value': pilot_type}, {'name': 'pilotUrlOpt', 'value': pilot_url_str}, {'name': 'pythonOption', 'value': pilot_python_option}, + {'name': 'pilotVersion', 'value': pilot_version}, {'name': 'jobType', 'value': work_spec.jobType}, {'name': 'proxySecretPath', 'value': cert}, {'name': 
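The mysql_fifo changes wrap every execute/fetch in try/except and roll the transaction back before re-raising, so the reconnect decorator sees a clean connection on retry. A minimal sketch of that wrapper; `fifo` stands for the plugin instance with its `execute()`, `cur`, `commit()` and `rollback()` members:

```python
def run_query(fifo, sql, params=None):
    """Execute one statement on the FIFO table; roll back and re-raise on failure."""
    try:
        if params is None:
            fifo.execute(sql)
        else:
            fifo.execute(sql, params)
        res = fifo.cur.fetchall()
        fifo.commit()
    except Exception:
        fifo.rollback()  # leave the connection clean for the reconnect decorator
        raise
    return res
```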
'workerID', 'value': str(work_spec.workerID)}, diff --git a/pandaharvester/harvestermisc/token_utils.py b/pandaharvester/harvestermisc/token_utils.py new file mode 100644 index 00000000..7ac11ff3 --- /dev/null +++ b/pandaharvester/harvestermisc/token_utils.py @@ -0,0 +1,68 @@ +import copy +import hashlib +import json +import time + +import six +import requests + + +def _md5sum(data): + """ + get md5sum hexadecimal string of data + """ + hash = hashlib.md5() + hash.update(six.b(data)) + hash_hex = hash.hexdigest() + return hash_hex + +def endpoint_to_filename(endpoint): + """ + get token file name according to service (CE, storage, etc.) endpoint + currently directly take its md5sum hash as file name + """ + filename = _md5sum(endpoint) + return filename + + +class WLCG_scopes(object): + COMPUTE_ALL = 'compute.read compute.modify compute.create compute.cancel' + + +class IssuerBroker(object): + """ + Talk to token issuer with client credentials flow + """ + + def __init__(self, issuer, client_id, client_secret, name='unknown'): + self.issuer = issuer + self.client_id = client_id + self.client_secret = client_secret + self.name = name + self.timeout = 3 + # derived attributes + self.token_request_url = '{0}/token'.format(self.issuer) + self._base_post_data = { + 'grant_type': 'client_credentials', + 'client_id': self.client_id, + 'client_secret': self.client_secret, + } + + def _post(self, **kwarg): + data_dict = copy.deepcopy(self._base_post_data) + data_dict.update(kwarg) + data = copy.deepcopy(data_dict) + resp = requests.post(self.token_request_url, data=data, timeout=self.timeout) + return resp + + def get_access_token(self, aud=None, scope=None): + resp = self._post(audience=aud, scope=scope) + if resp.status_code == requests.codes.ok: + try: + resp_dict = json.loads(resp.text) + except Exception as e: + raise + token = resp_dict['access_token'] + return token + else: + resp.raise_for_status() diff --git a/pandaharvester/harvestersubmitter/htcondor_submitter.py b/pandaharvester/harvestersubmitter/htcondor_submitter.py index 49c1917c..97b240f6 100644 --- a/pandaharvester/harvestersubmitter/htcondor_submitter.py +++ b/pandaharvester/harvestersubmitter/htcondor_submitter.py @@ -15,6 +15,7 @@ from pandaharvester.harvestercore import core_utils from pandaharvester.harvestercore.plugin_base import PluginBase from pandaharvester.harvestermisc.info_utils import PandaQueuesDict +from pandaharvester.harvestermisc.token_utils import endpoint_to_filename from pandaharvester.harvestermisc.htcondor_utils import get_job_id_tuple_from_batchid from pandaharvester.harvestermisc.htcondor_utils import CondorJobSubmit from pandaharvester.harvestersubmitter import submitter_common @@ -315,7 +316,7 @@ def submit_bag_of_workers(data_list): def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name, executable_file, x509_user_proxy, log_subdir=None, ce_info_dict=dict(), batch_log_dict=dict(), pilot_url=None, special_par='', harvester_queue_config=None, is_unified_queue=False, - pilot_version='unknown', python_version='unknown', **kwarg): + pilot_version='unknown', python_version='unknown', token_dir=None, **kwarg): # make logger tmpLog = core_utils.make_logger(baseLogger, 'workerID={0}'.format(workspec.workerID), method_name='make_a_jdl') @@ -345,12 +346,14 @@ def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name, e tmpLog.debug('job attributes override by AGIS special_par: {0}={1}'.format(attr, str(_match.group(1)))) # derived job attributes n_node = 
_div_round_up(n_core_total, n_core_per_node) + request_ram_bytes = request_ram * 2**20 request_ram_per_core = _div_round_up(request_ram * n_node, n_core_total) + request_ram_bytes_per_core = _div_round_up(request_ram_bytes * n_node, n_core_total) request_cputime = request_walltime * n_core_total request_walltime_minute = _div_round_up(request_walltime, 60) request_cputime_minute = _div_round_up(request_cputime, 60) # decide prodSourceLabel - pilot_opt_dict = submitter_common.get_complicated_pilot_options(workspec.pilotType, pilot_url=pilot_url) + pilot_opt_dict = submitter_common.get_complicated_pilot_options(workspec.pilotType, pilot_url, pilot_version) if pilot_opt_dict is None: prod_source_label = harvester_queue_config.get_source_label(workspec.jobType) pilot_type_opt = workspec.pilotType @@ -359,6 +362,15 @@ def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name, e prod_source_label = pilot_opt_dict['prod_source_label'] pilot_type_opt = pilot_opt_dict['pilot_type_opt'] pilot_url_str = pilot_opt_dict['pilot_url_str'] + # get token filename according to CE + token_filename = None + if token_dir is not None and ce_info_dict.get('ce_endpoint'): + token_filename = endpoint_to_filename(ce_info_dict['ce_endpoint']) + token_path = None + if token_dir is not None and token_filename is not None: + token_path = os.path.join(token_dir, token_filename) + else: + tmpLog.warning('token_path is None: site={0}, token_dir={1} , token_filename={2}'.format(panda_queue_name, token_dir, token_filename)) # open tmpfile as submit description file tmpFile = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='_submit.sdf', dir=workspec.get_access_point()) # placeholder map @@ -369,7 +381,9 @@ def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name, e 'nCoreTotal': n_core_total, 'nNode': n_node, 'requestRam': request_ram, + 'requestRamBytes': request_ram_bytes, 'requestRamPerCore': request_ram_per_core, + 'requestRamBytesPerCore': request_ram_bytes_per_core, 'requestDisk': request_disk, 'requestWalltime': request_walltime, 'requestWalltimeMinute': request_walltime_minute, @@ -402,6 +416,9 @@ def make_a_jdl(workspec, template, n_core_per_node, log_dir, panda_queue_name, e 'submissionHost': workspec.submissionHost, 'submissionHostShort': workspec.submissionHost.split('.')[0], 'ceARCGridType': ce_info_dict.get('ce_arc_grid_type', 'nordugrid'), + 'tokenDir': token_dir, + 'tokenFilename': token_filename, + 'tokenPath': token_path, } # fill in template string jdl_str = template.format(**placeholder_map) @@ -466,6 +483,16 @@ def __init__(self, **kwarg): self.x509UserProxyAnalysis except AttributeError: self.x509UserProxyAnalysis = os.getenv('X509_USER_PROXY_ANAL') + # Default token directory for a queue + try: + self.tokenDir + except AttributeError: + self.tokenDir = None + # token directory for analysis jobs in grandly unified queues + try: + self.tokenDirAnalysis + except AttributeError: + self.tokenDirAnalysis = None # ATLAS AGIS try: self.useAtlasAGIS = bool(self.useAtlasAGIS) @@ -528,6 +555,11 @@ def __init__(self, **kwarg): self.minBulkToRamdomizedSchedd except AttributeError: self.minBulkToRamdomizedSchedd = 20 + # try to use analysis credentials first + try: + self.useAnalysisCredentials + except AttributeError: + self.useAnalysisCredentials = False # record of information of CE statistics self.ceStatsLock = threading.Lock() self.ceStats = dict() @@ -668,18 +700,25 @@ def _handle_one_worker(workspec, to_submit=to_submit_any): # make logger tmpLog = 
core_utils.make_logger(baseLogger, 'site={0} workerID={1}'.format(self.queueName, workspec.workerID), method_name='_handle_one_worker') - def _choose_proxy(workspec): + def _choose_credential(workspec): """ - Choose the proxy based on the job type + Choose the credential based on the job type """ job_type = workspec.jobType proxy = self.x509UserProxy - if is_grandly_unified_queue and job_type in ('user', 'panda', 'analysis') and self.x509UserProxyAnalysis: - tmpLog.debug('Taking analysis proxy') - proxy = self.x509UserProxyAnalysis + token_dir = self.tokenDir + if (is_grandly_unified_queue and job_type in ('user', 'panda', 'analysis')) or self.useAnalysisCredentials: + if self.x509UserProxyAnalysis: + tmpLog.debug('Taking analysis proxy') + proxy = self.x509UserProxyAnalysis + if self.tokenDirAnalysis: + tmpLog.debug('Taking analysis token_dir') + token_dir = self.tokenDirAnalysis else: tmpLog.debug('Taking default proxy') - return proxy + if self.tokenDir: + tmpLog.debug('Taking default token_dir') + return proxy, token_dir # initialize ce_info_dict = dict() batch_log_dict = dict() @@ -823,7 +862,7 @@ def _choose_proxy(workspec): tmpLog.debug('Done jobspec attribute setting') # choose the x509 certificate based on the type of job (analysis or production) - proxy = _choose_proxy(workspec) + proxy, token_dir = _choose_credential(workspec) # set data dict data.update({ @@ -847,6 +886,7 @@ def _choose_proxy(workspec): 'pilot_url': pilot_url, 'pilot_version': pilot_version, 'python_version': python_version, + 'token_dir': token_dir, }) return data diff --git a/pandaharvester/harvestersubmitter/k8s_submitter.py b/pandaharvester/harvestersubmitter/k8s_submitter.py index dc4629fa..8afcf0f9 100644 --- a/pandaharvester/harvestersubmitter/k8s_submitter.py +++ b/pandaharvester/harvestersubmitter/k8s_submitter.py @@ -220,7 +220,8 @@ def submit_k8s_worker(self, work_spec): python_version = str(this_panda_queue_dict.get('python_version', '2')) # prod_source_label = harvester_queue_config.get_source_label(work_spec.jobType) - pilot_opt_dict = submitter_common.get_complicated_pilot_options(work_spec.pilotType) + pilot_opt_dict = submitter_common.get_complicated_pilot_options(work_spec.pilotType, pilot_url, + pilot_version) if pilot_opt_dict is None: prod_source_label = harvester_queue_config.get_source_label(work_spec.jobType) pilot_type = work_spec.pilotType @@ -235,7 +236,7 @@ def submit_k8s_worker(self, work_spec): # submit the worker rsp, yaml_content_final = self.k8s_client.create_job_from_yaml(yaml_content, work_spec, prod_source_label, pilot_type, pilot_url_str, - pilot_python_option, + pilot_python_option, pilot_version, container_image, executable, args, cert, max_time=max_time) except Exception as _e: diff --git a/pandaharvester/harvestersubmitter/submitter_common.py b/pandaharvester/harvestersubmitter/submitter_common.py index fd0da984..8acc3059 100644 --- a/pandaharvester/harvestersubmitter/submitter_common.py +++ b/pandaharvester/harvestersubmitter/submitter_common.py @@ -1,11 +1,16 @@ # Map "pilotType" (defined in harvester) to prodSourceLabel and pilotType option (defined in pilot, -i option) # and piloturl (pilot option --piloturl) for pilot 2 -def get_complicated_pilot_options(pilot_type, pilot_url=None): +def get_complicated_pilot_options(pilot_type, pilot_url=None, pilot_version=""): + # for pilot 3 + is_pilot3 = True if pilot_version.startswith('3') else False + # map + # 211012 currently only RC and PT may run pilot 3 pt_psl_map = { 'RC': { 'prod_source_label': 'rc_test2', 
'pilot_type_opt': 'RC', - 'pilot_url_str': '--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev.tar.gz', + 'pilot_url_str': '--piloturl http://cern.ch/atlas-panda-pilot/pilot3-dev.tar.gz' if is_pilot3 \ + else '--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev.tar.gz', }, 'ALRB': { 'prod_source_label': 'rc_alrb', @@ -15,7 +20,8 @@ def get_complicated_pilot_options(pilot_type, pilot_url=None): 'PT': { 'prod_source_label': 'ptest', 'pilot_type_opt': 'PR', - 'pilot_url_str': '--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev2.tar.gz', + 'pilot_url_str': '--piloturl http://cern.ch/atlas-panda-pilot/pilot3-dev2.tar.gz' if is_pilot3 \ + else '--piloturl http://cern.ch/atlas-panda-pilot/pilot2-dev2.tar.gz', }, } pilot_opt_dict = pt_psl_map.get(pilot_type, None) @@ -23,7 +29,6 @@ def get_complicated_pilot_options(pilot_type, pilot_url=None): pilot_opt_dict['pilot_url_str'] = '--piloturl {0}'.format(pilot_url) return pilot_opt_dict - # get special flag of pilot wrapper about python version of pilot, and whether to run with python 3 if python version is "3" def get_python_version_option(python_version, prod_source_label): option = '' diff --git a/pandaharvester/harvestersweeper/htcondor_sweeper.py b/pandaharvester/harvestersweeper/htcondor_sweeper.py index 7cc30daa..30e0f710 100644 --- a/pandaharvester/harvestersweeper/htcondor_sweeper.py +++ b/pandaharvester/harvestersweeper/htcondor_sweeper.py @@ -103,10 +103,10 @@ def kill_workers(self, workspec_list): # Fill return list for workspec in workspec_list: if workspec.batchID is None: - ret = (True, 'worker withoug batchID; skipped') + ret = (True, 'worker without batchID; skipped') else: ret = all_job_ret_map.get(condor_job_id_from_workspec(workspec), - (False, 'batch job unfound in return map')) + (False, 'batch job not found in return map')) retList.append(ret) tmpLog.debug('done') # Return diff --git a/pandaharvester/panda_pkg_info.py b/pandaharvester/panda_pkg_info.py index 5b79ccf1..7420bd36 100644 --- a/pandaharvester/panda_pkg_info.py +++ b/pandaharvester/panda_pkg_info.py @@ -1 +1 @@ -release_version = "0.2.8" +release_version = "0.2.10" diff --git a/setup.py b/setup.py index 92ea5f28..08d9ea54 100644 --- a/setup.py +++ b/setup.py @@ -37,6 +37,7 @@ 'psutil >= 5.4.8', 'scandir; python_version < "3.5"', 'panda-pilot >= 2.7.2.1', + 'six', ], # optional pip dependencies diff --git a/templates/init.d/panda_harvester-uwsgi.rpmnew.template b/templates/init.d/panda_harvester-uwsgi.rpmnew.template index 16b476b7..b2aab55b 100755 --- a/templates/init.d/panda_harvester-uwsgi.rpmnew.template +++ b/templates/init.d/panda_harvester-uwsgi.rpmnew.template @@ -16,15 +16,18 @@ ARGV="$@" set -a #======= START CONFIGURATION SECTION ========================== # user and group to run harvester uWSGI -userName="#FIXME" -groupName="#FIXME" +# FIXME +userName=atlpan +# FIXME +groupName=zp # setup python and virtual env -VIRTUAL_ENV=/#FIXME +# FIXME +VIRTUAL_ENV=/opt/harvester # set log directory -#LOG_DIR=${VIRTUAL_ENV}/var/log/panda -LOG_DIR=/#FIXME +# FIXME +LOG_DIR=/var/log/panda # pid and lock files PIDFILE=${LOG_DIR}/panda_harvester.pid diff --git a/templates/panda_harvester.cfg.rpmnew.template b/templates/panda_harvester.cfg.rpmnew.template index 8b5ee369..ee976da2 100644 --- a/templates/panda_harvester.cfg.rpmnew.template +++ b/templates/panda_harvester.cfg.rpmnew.template @@ -122,15 +122,24 @@ nConnections = 5 # timeout timeout = 180 +# auth type x509 (default) or oidc +auth_type = x509 + # CA file ca_cert = /etc/pki/tls/certs/CERN-bundle.pem -# 
certificate +# certificate for x509 cert_file = FIXME -# key +# key for x509 key_file = FIXME +# token for oidc: either the bare token string or a filename (file:/path) +auth_token = FIXME + +# origin for oidc +auth_origin = FIXME + # base URL via http pandaURL = http://pandaserver.cern.ch:25080/server/panda
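
Note (not part of the patch): the new pieces introduced above are meant to work together: token_utils.IssuerBroker fetches an access token via the client-credentials flow, endpoint_to_filename() maps a CE endpoint to an md5-based file name, and htcondor_submitter resolves that file under the queue's tokenDir/tokenDirAnalysis and exposes it to the JDL template as {tokenPath}. The sketch below shows how an external credential manager might populate such a token directory; it is illustrative only, and the issuer URL, client_id, client_secret, token_dir and CE endpoint are hypothetical placeholders, not values defined anywhere in this patch.

    # Illustrative sketch only; all configuration values below are placeholders.
    import os

    from pandaharvester.harvestermisc.token_utils import (
        IssuerBroker, WLCG_scopes, endpoint_to_filename)

    # hypothetical configuration (assumption: not taken from this patch)
    issuer = 'https://my-iam.example.org'       # token issuer base URL
    client_id = 'harvester-client'              # registered client id
    client_secret = 'change-me'                 # registered client secret
    token_dir = '/data/harvester/tokens'        # same value as tokenDir in the queue config
    ce_endpoint = 'my-ce.example.org:9619'      # CE endpoint as listed for the queue

    # talk to the issuer with the client-credentials flow
    broker = IssuerBroker(issuer, client_id, client_secret, name='demo')

    # request a token restricted to the CE audience with the WLCG compute scopes
    token = broker.get_access_token(aud=ce_endpoint, scope=WLCG_scopes.COMPUTE_ALL)

    # store it under the md5-based file name that htcondor_submitter derives with
    # endpoint_to_filename() and substitutes into the JDL as {tokenPath}
    os.makedirs(token_dir, exist_ok=True)
    token_path = os.path.join(token_dir, endpoint_to_filename(ce_endpoint))
    with open(token_path, 'w') as f:
        f.write(token)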