diff --git a/Dockerfile b/Dockerfile index 08f35aa5..b884bf18 100644 --- a/Dockerfile +++ b/Dockerfile @@ -25,9 +25,15 @@ RUN yum upgrade -y && \ yum clean all && \ rm -rf /var/cache/yum -RUN yum install -y httpd.x86_64 conda gridsite mod_ssl.x86_64 httpd-devel.x86_64 gcc.x86_64 supervisor.noarch fetch-crl.noarch lcg-CA postgresql postgresql-contrib postgresql-static postgresql-libs postgresql-devel && \ - yum clean all && \ - rm -rf /var/cache/yum +# RUN yum install -y httpd.x86_64 conda gridsite mod_ssl.x86_64 httpd-devel.x86_64 gcc.x86_64 supervisor.noarch fetch-crl.noarch lcg-CA postgresql postgresql-contrib postgresql-static postgresql-libs postgresql-devel && \ +# yum clean all && \ +# rm -rf /var/cache/yum +RUN yum install -y httpd.x86_64 conda gridsite mod_ssl.x86_64 httpd-devel.x86_64 gcc.x86_64 supervisor.noarch fetch-crl.noarch lcg-CA redis && \ +yum clean all && \ +rm -rf /var/cache/yum + +RUN yum install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-7-x86_64/pgdg-redhat-repo-latest.noarch.rpm +RUN yum install -y postgresql14 # RUN curl http://repository.egi.eu/sw/production/cas/1/current/repo-files/EGI-trustanchors.repo -o /etc/yum.repos.d/EGI-trustanchors.repo RUN curl https://repository.egi.eu/sw/production/cas/1/current/repo-files/EGI-trustanchors.repo -o /etc/yum.repos.d/EGI-trustanchors.repo diff --git a/atlas/lib/idds/atlas/workflow/atlasdagwork.py b/atlas/lib/idds/atlas/workflow/atlasdagwork.py new file mode 100644 index 00000000..4fc3b5e6 --- /dev/null +++ b/atlas/lib/idds/atlas/workflow/atlasdagwork.py @@ -0,0 +1,498 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0OA +# +# Authors: +# - Wen Guan, , 2020 + +try: + import ConfigParser +except ImportError: + import configparser as ConfigParser + +import copy +import json +import os +import traceback +import uuid +import urllib +import ssl + +from idds.common import exceptions +from idds.common.constants import (TransformType, CollectionType, CollectionStatus, + ContentStatus, ContentType, + ProcessingStatus, WorkStatus) +from idds.workflow.work import Work +from idds.workflow.workflow import Condition +import logging + + +class DomaCondition(Condition): + def __init__(self, cond=None, current_work=None, true_work=None, false_work=None): + super(DomaCondition, self).__init__(cond=cond, current_work=current_work, + true_work=true_work, false_work=false_work) + + +class DomaLSSTWork(Work): + def __init__(self, executable=None, arguments=None, parameters=None, setup=None, + work_tag='lsst', exec_type='panda', sandbox=None, work_id=None, + primary_input_collection=None, other_input_collections=None, input_collections=None, + primary_output_collection=None, other_output_collections=None, + output_collections=None, log_collections=None, + logger=None, dependency_map=None, task_name=""): + """ + Init a work/task/transformation. + + :param setup: A string to setup the executable enviroment, it can be None. + :param executable: The executable. + :param arguments: The arguments. + :param parameters: A dict with arguments needed to be replaced. + :param work_type: The work type like data carousel, hyperparameteroptimization and so on. + :param exec_type: The exec type like 'local', 'remote'(with remote_package set), 'docker' and so on. + :param sandbox: The sandbox. + :param work_id: The work/task id. 
+ :param primary_input_collection: The primary input collection. + :param other_input_collections: List of the input collections. + :param output_collections: List of the output collections. + # :param workflow: The workflow the current work belongs to. + """ + + super(DomaLSSTWork, self).__init__(executable=executable, arguments=arguments, + parameters=parameters, setup=setup, work_type=TransformType.Processing, + work_tag=work_tag, exec_type=exec_type, sandbox=sandbox, work_id=work_id, + primary_input_collection=primary_input_collection, + other_input_collections=other_input_collections, + primary_output_collection=primary_output_collection, + other_output_collections=other_output_collections, + input_collections=input_collections, + output_collections=output_collections, + log_collections=log_collections, + logger=logger) + self.pandamonitor = None + self.dependency_map = dependency_map + self.logger.setLevel(logging.DEBUG) + self.task_name = task_name + + def my_condition(self): + if self.is_finished(): + return True + return False + + def jobs_to_idd_ds_status(self, jobstatus): + if jobstatus == 'finished': + return ContentStatus.Available + elif jobstatus == 'failed ': + return ContentStatus.Failed + else: + return ContentStatus.Processing + + def load_panda_config(self): + panda_config = ConfigParser.ConfigParser() + if os.environ.get('IDDS_PANDA_CONFIG', None): + configfile = os.environ['IDDS_PANDA_CONFIG'] + if panda_config.read(configfile) == [configfile]: + return panda_config + + configfiles = ['%s/etc/panda/panda.cfg' % os.environ.get('IDDS_HOME', ''), + '/etc/panda/panda.cfg', '/opt/idds/etc/panda/panda.cfg', + '%s/etc/panda/panda.cfg' % os.environ.get('VIRTUAL_ENV', '')] + for configfile in configfiles: + if panda_config.read(configfile) == [configfile]: + return panda_config + return panda_config + + def load_panda_monitor(self): + panda_config = self.load_panda_config() + self.logger.info("panda config: %s" % panda_config) + if panda_config.has_section('panda'): + if panda_config.has_option('panda', 'pandamonitor'): + pandamonitor = panda_config.get('panda', 'pandamonitor') + return pandamonitor + return None + + def poll_external_collection(self, coll): + try: + # if 'coll_metadata' in coll and 'is_open' in coll['coll_metadata'] and not coll['coll_metadata']['is_open']: + if 'status' in coll and coll['status'] in [CollectionStatus.Closed]: + return coll + else: + # client = self.get_rucio_client() + # did_meta = client.get_metadata(scope=coll['scope'], name=coll['name']) + if 'coll_metadata' not in coll: + coll['coll_metadata'] = {} + coll['coll_metadata']['bytes'] = 1 + # coll['coll_metadata']['total_files'] = 1 + coll['coll_metadata']['availability'] = 1 + coll['coll_metadata']['events'] = 1 + coll['coll_metadata']['is_open'] = True + coll['coll_metadata']['run_number'] = 1 + coll['coll_metadata']['did_type'] = 'DATASET' + coll['coll_metadata']['list_all_files'] = False + + if 'is_open' in coll['coll_metadata'] and not coll['coll_metadata']['is_open']: + coll_status = CollectionStatus.Closed + else: + coll_status = CollectionStatus.Open + coll['status'] = coll_status + coll['coll_type'] = CollectionType.Dataset + + return coll + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + raise exceptions.IDDSException('%s: %s' % (str(ex), traceback.format_exc())) + + def get_input_collections(self): + """ + *** Function called by Transformer agent. 
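The load_panda_config()/load_panda_monitor() helpers above search a fixed set of candidate locations for a panda.cfg with a [panda] section containing a pandamonitor option. A minimal sketch of that lookup, assuming the same candidate paths as the code above; the monitor URL in the comment is a placeholder, not a real endpoint.

```python
# Expected panda.cfg layout (URL is a placeholder):
#
#   [panda]
#   pandamonitor = https://panda-monitor.example.org

import configparser
import os


def find_panda_monitor():
    """Return the pandamonitor URL from the first readable panda.cfg, or None."""
    candidates = [os.environ.get('IDDS_PANDA_CONFIG', ''),
                  '%s/etc/panda/panda.cfg' % os.environ.get('IDDS_HOME', ''),
                  '/etc/panda/panda.cfg',
                  '/opt/idds/etc/panda/panda.cfg',
                  '%s/etc/panda/panda.cfg' % os.environ.get('VIRTUAL_ENV', '')]
    config = configparser.ConfigParser()
    for path in candidates:
        if path and config.read(path) == [path]:
            if config.has_section('panda') and config.has_option('panda', 'pandamonitor'):
                return config.get('panda', 'pandamonitor')
    return None
```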
+ """ + colls = [self._primary_input_collection] + self._other_input_collections + for coll_int_id in colls: + coll = self.collections[coll_int_id] + coll = self.poll_external_collection(coll) + self.collections[coll_int_id] = coll + return super(DomaLSSTWork, self).get_input_collections() + + def get_unsubmitted_inputs(self): + not_submitted_inputs = filter(lambda t: not t["submitted"], self.dependency_map) + tasks_to_check = [] + for job in not_submitted_inputs: + tasks_to_check.extend([(input["task"], input["inputname"]) for input in job["dependencies"] if not input["available"]]) + tasks_to_check_compact = {} + for task in tasks_to_check: + tasks_to_check_compact.setdefault(task[0], set()).add(task[1]) + return tasks_to_check_compact + + def set_dependency_input_available(self, taskname, inputname): + for job in self.dependency_map: + for dependency in job["dependencies"]: + if dependency["task"] == taskname and dependency["inputname"] == inputname: + dependency["available"] = True + + def update_dependencies(self): + tasks_to_check = self.get_unsubmitted_inputs() + for task, inputs in tasks_to_check.items(): + _, outputs = self.poll_panda_task(task_name=task) + for input in inputs: + if outputs.get(input, ContentStatus.Processing) == ContentStatus.Available: + self.set_dependency_input_available(task, input) + + def get_ready_inputs(self): + not_submitted_inputs = filter(lambda j: not j["submitted"], self.dependency_map) + files_to_submit = [] + for job in not_submitted_inputs: + unresolved_deps = [input for input in job["dependencies"] if not input["available"]] + if len(unresolved_deps) == 0: + files_to_submit.append(job["name"]) + return files_to_submit + + def check_dependencies(self): + self.update_dependencies() + return self.get_ready_inputs() + + def can_close(self): + not_submitted_inputs = list(filter(lambda t: not t["submitted"], self.dependency_map)) + if len(not_submitted_inputs) == 0: + return True + else: + return False + + def get_input_contents(self): + """ + Get all input contents from DDM. + """ + try: + files = self.check_dependencies() + ret_files = [] + coll = self.collections[self._primary_input_collection] + for file in files: + ret_file = {'coll_id': coll['coll_id'], + 'scope': coll['scope'], + 'name': file, # or a different file name from the dataset name + 'bytes': 1, + 'adler32': '12345678', + 'min_id': 0, + 'max_id': 1, + 'content_type': ContentType.File, + 'content_metadata': {'events': 1}} # here events is all events for eventservice, not used here. + ret_files.append(ret_file) + return ret_files + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + raise exceptions.IDDSException('%s: %s' % (str(ex), traceback.format_exc())) + + def get_mapped_inputs(self, mapped_input_output_maps): + ret = [] + for map_id in mapped_input_output_maps: + inputs = mapped_input_output_maps[map_id]['inputs'] + + # if 'primary' is not set, the first one is the primary input. + primary_input = inputs[0] + for ip in inputs: + if 'primary' in ip['content_metadata'] and ip['content_metadata']['primary']: + primary_input = ip + ret.append(primary_input) + return ret + + def get_new_input_output_maps(self, mapped_input_output_maps={}): + """ + *** Function called by Transformer agent. + New inputs which are not yet mapped to outputs. + + :param mapped_input_output_maps: Inputs that are already mapped. 
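The dependency handling above (get_unsubmitted_inputs, set_dependency_input_available, get_ready_inputs, can_close) assumes a dependency_map shaped like the sketch below: a list of pseudo-input jobs, each with a name, a submitted flag, and a list of dependencies pointing at an upstream task and one of its outputs. The task and file names here are illustrative only.

```python
# Illustrative dependency_map entries; task and file names are made up.
dependency_map = [
    {"name": "job_000001",
     "submitted": False,
     "dependencies": [
         {"task": "some_upstream_task", "inputname": "file_000042", "available": False},
     ]},
    {"name": "job_000002", "submitted": False, "dependencies": []},
]


def ready_inputs(dependency_map):
    """Names of unsubmitted jobs whose dependencies are all available."""
    return [job["name"]
            for job in dependency_map
            if not job["submitted"]
            and all(dep["available"] for dep in job["dependencies"])]


print(ready_inputs(dependency_map))  # ['job_000002']
```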
+ """ + inputs = self.get_input_contents() + mapped_inputs = self.get_mapped_inputs(mapped_input_output_maps) + mapped_inputs_scope_name = [ip['name'] for ip in mapped_inputs] + + new_inputs = [] + new_input_output_maps = {} + for ip in inputs: + ip_scope_name = ip['name'] + if ip_scope_name not in mapped_inputs_scope_name: + new_inputs.append(ip) + + # to avoid cheking new inputs if there are no new inputs anymore + if not new_inputs and self.collections[self._primary_input_collection]['status'] in [CollectionStatus.Closed]: + self.set_has_new_inputs(False) + else: + mapped_keys = mapped_input_output_maps.keys() + if mapped_keys: + next_key = max(mapped_keys) + 1 + else: + next_key = 1 + for ip in new_inputs: + out_ip = copy.deepcopy(ip) + out_ip['coll_id'] = self.collections[self._primary_output_collection]['coll_id'] + new_input_output_maps[next_key] = {'inputs': [ip], + 'outputs': [out_ip]} + next_key += 1 + self.logger.debug("get_new_input_output_maps, new_input_output_maps: %s" % str(new_input_output_maps)) + + for index, _inputs in new_input_output_maps.items(): + if len(_inputs['inputs']) > 0: + for item in self.dependency_map: + if item["name"] == _inputs['inputs'][0]["name"]: + item["submitted"] = True # 0 is used due to a single file pseudo input + + if self.can_close(): + self.collections[self._primary_input_collection]['coll_metadata']['is_open'] = False + self.collections[self._primary_input_collection]['status'] = CollectionStatus.Closed + + return new_input_output_maps + + def get_processing(self, input_output_maps): + """ + *** Function called by Transformer agent. + + If there is already an active processing for this work, will do nothing. + If there is no active processings, create_processing will be called. + """ + if self.active_processings: + return self.processings[self.active_processings[0]] + else: + return None + + def create_processing(self, input_output_maps): + """ + *** Function called by Transformer agent. + + :param input_output_maps: new maps from inputs to outputs. + """ + in_files = [] + for map_id in input_output_maps: + # one map is a job which transform the inputs to outputs. 
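get_new_input_output_maps() above keys each new map with the next integer after the largest already-mapped key and pairs every new input with a deep copy of itself pointing at the output collection. A compact sketch of that bookkeeping, using a made-up coll_id.

```python
import copy


def build_new_maps(new_inputs, mapped_input_output_maps, output_coll_id):
    """Assign the next free integer map_id to each new input (sketch)."""
    next_key = max(mapped_input_output_maps.keys(), default=0) + 1
    new_maps = {}
    for ip in new_inputs:
        out_ip = copy.deepcopy(ip)
        out_ip['coll_id'] = output_coll_id  # the output copy points at the output collection
        new_maps[next_key] = {'inputs': [ip], 'outputs': [out_ip]}
        next_key += 1
    return new_maps


maps = build_new_maps([{'name': 'job_000002', 'coll_id': 1}], {1: {}, 2: {}}, output_coll_id=99)
print(sorted(maps.keys()))  # [3]
```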
+ inputs = input_output_maps[map_id]['inputs'] + # outputs = input_output_maps[map_id]['outputs'] + for ip in inputs: + in_files.append(ip['name']) + + taskParamMap = {} + taskParamMap['vo'] = 'wlcg' + taskParamMap['site'] = 'BNL_OSG_1' + taskParamMap['workingGroup'] = 'lsst' + taskParamMap['nFilesPerJob'] = 1 + taskParamMap['nFiles'] = len(in_files) + taskParamMap['noInput'] = True + taskParamMap['pfnList'] = in_files + taskParamMap['taskName'] = self.task_name + taskParamMap['userName'] = 'Siarhei Padolski' + taskParamMap['taskPriority'] = 900 + taskParamMap['architecture'] = '' + taskParamMap['transUses'] = '' + taskParamMap['transHome'] = None + taskParamMap['transPath'] = 'https://atlpan.web.cern.ch/atlpan/bash-c' + taskParamMap['processingType'] = 'testidds' + taskParamMap['prodSourceLabel'] = 'test' + taskParamMap['taskType'] = 'test' + taskParamMap['coreCount'] = 1 + taskParamMap['skipScout'] = True + taskParamMap['cloud'] = 'US' + taskParamMap['jobParameters'] = [ + {'type': 'constant', + 'value': "echo ${IN/L}", # noqa: E501 + }, + ] + + proc = {'processing_metadata': {'internal_id': str(uuid.uuid1()), + 'task_id': None, + 'task_param': taskParamMap}} + self.add_processing_to_processings(proc) + self.active_processings.append(proc['processing_metadata']['internal_id']) + return proc + + def submit_panda_task(self, processing): + try: + from pandaclient import Client + + task_param = processing['processing_metadata']['task_param'] + return_code = Client.insertTaskParams(task_param, verbose=True) + if return_code[0] == 0: + return return_code[1][1] + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + # raise exceptions.AgentPluginError('%s: %s' % (str(ex), traceback.format_exc())) + return None + + def submit_processing(self, processing): + """ + *** Function called by Carrier agent. 
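create_processing() and submit_panda_task() above build a PanDA task parameter map and hand it to pandaclient's Client.insertTaskParams. A trimmed, hedged sketch of that flow: the site, user and task names are placeholders, pandaclient is assumed to be installed, and the return-value handling mirrors what the code above assumes (a (status, result) pair whose second element carries the JEDI task id).

```python
# Hedged sketch only: field values are placeholders, not a working task definition.
task_param_map = {
    'vo': 'wlcg',
    'site': 'SOME_SITE',
    'workingGroup': 'lsst',
    'nFilesPerJob': 1,
    'nFiles': 2,
    'noInput': True,
    'pfnList': ['job_000001', 'job_000002'],
    'taskName': 'example_task',
    'userName': 'Some User',
    'taskPriority': 900,
    'processingType': 'testidds',
    'prodSourceLabel': 'test',
    'taskType': 'test',
    'coreCount': 1,
    'skipScout': True,
    'cloud': 'US',
    'transPath': 'https://atlpan.web.cern.ch/atlpan/bash-c',
    'jobParameters': [{'type': 'constant', 'value': 'echo ${IN/L}'}],
}


def submit_task(task_param_map):
    """Submit the map to JEDI; returns the task id on success, None otherwise."""
    from pandaclient import Client
    status, result = Client.insertTaskParams(task_param_map, verbose=True)
    if status == 0:
        return result[1]  # the code above reads the task id from the second element
    return None
```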
+ """ + if 'task_id' in processing['processing_metadata'] and processing['processing_metadata']['task_id']: + pass + else: + task_id = self.submit_panda_task(processing) + processing['processing_metadata']['task_id'] = task_id + + def download_payload_json(self, task_url): + response = None + try: + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + req = urllib.request.Request(task_url) + response = urllib.request.urlopen(req, timeout=180, context=ctx).read() + response = json.loads(response) + except Exception or urllib.request.error as e: + raise e + return response + + def poll_panda_task(self, processing=None, task_name=None): + try: + if not self.pandamonitor: + self.pandamonitor = self.load_panda_monitor() + self.logger.info("panda server: %s" % self.pandamonitor) + + task_id = None + task_info = None + if processing: + task_id = processing['processing_metadata']['task_id'] + task_url = self.pandamonitor + '/task/?json&jeditaskid=' + str(task_id) + task_info = self.download_payload_json(task_url) + elif task_name: + task_url = self.pandamonitor + '/tasks/?taskname=' + str(task_name) + "&json" + self.logger.debug("poll_panda_task, task_url: %s" % str(task_url)) + self.logger.debug("poll_panda_task, task_url: %s" % str(self.download_payload_json(task_url))) + task_json = self.download_payload_json(task_url) + if len(task_json) > 0: + task_info = task_json[0] + else: + return "No status", {} + if not task_id: + task_id = task_info.get('jeditaskid', None) + if not task_id: + return "No status", {} + + jobs_url = self.pandamonitor + '/jobs/?json&datasets=yes&jeditaskid=' + str(task_id) + jobs_list = self.download_payload_json(jobs_url) + outputs_status = {} + for job_info in jobs_list['jobs']: + if 'jobstatus' in job_info and 'datasets' in job_info and len(job_info['datasets']) > 0: + output_index = job_info['datasets'][0]['lfn'].split(':')[1] + status = self.jobs_to_idd_ds_status(job_info['jobstatus']) + outputs_status[output_index] = status + + task_status = None + self.logger.debug("poll_panda_task, task_info: %s" % str(task_info)) + if task_info.get("task", None) is not None: + task_status = task_info["task"]["status"] + return task_status, outputs_status + except Exception as ex: + msg = "Failed to check the panda task(%s) status: %s" % (str(task_id), str(ex)) + raise exceptions.IDDSException(msg) + + def poll_processing_updates(self, processing, input_output_maps): + """ + *** Function called by Carrier agent. 
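download_payload_json() and poll_panda_task() above poll the PanDA monitor over HTTPS with certificate verification disabled and decode the JSON payload. A minimal standalone sketch of that request pattern; the monitor host below is a placeholder.

```python
import json
import ssl
import urllib.request


def download_payload_json(task_url, timeout=180):
    """Fetch and decode a JSON payload from the PanDA monitor (verification disabled)."""
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    req = urllib.request.Request(task_url)
    payload = urllib.request.urlopen(req, timeout=timeout, context=ctx).read()
    return json.loads(payload)


# URL patterns used above; the host is a placeholder.
pandamonitor = 'https://panda-monitor.example.org'
task_by_id = pandamonitor + '/task/?json&jeditaskid=' + str(12345)
task_by_name = pandamonitor + '/tasks/?taskname=' + 'example_task' + '&json'
jobs_of_task = pandamonitor + '/jobs/?json&datasets=yes&jeditaskid=' + str(12345)
```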
+ """ + updated_contents = [] + update_processing = {} + self.logger.debug("poll_processing_updates, input_output_maps: %s" % str(input_output_maps)) + + if processing: + task_status, outputs_status = self.poll_panda_task(processing=processing) + + self.logger.debug("poll_processing_updates, outputs_status: %s" % str(outputs_status)) + self.logger.debug("poll_processing_updates, task_status: %s" % str(task_status)) + + content_substatus = {'finished': 0, 'unfinished': 0} + for map_id in input_output_maps: + outputs = input_output_maps[map_id]['outputs'] + for content in outputs: + key = content['name'] + if key in outputs_status: + if content.get('substatus', ContentStatus.New) != outputs_status[key]: + updated_content = {'content_id': content['content_id'], + 'substatus': outputs_status[key]} + updated_contents.append(updated_content) + content['substatus'] = outputs_status[key] + if content['substatus'] == ContentStatus.Available: + content_substatus['finished'] += 1 + else: + content_substatus['unfinished'] += 1 + + if task_status and task_status == 'done' and content_substatus['finished'] > 0 and content_substatus['unfinished'] == 0: + update_processing = {'processing_id': processing['processing_id'], + 'parameters': {'status': ProcessingStatus.Finished}} + + self.logger.debug("poll_processing_updates, update_processing: %s" % str(update_processing)) + self.logger.debug("poll_processing_updates, updated_contents: %s" % str(updated_contents)) + return update_processing, updated_contents, {} + + def get_status_statistics(self, registered_input_output_maps): + status_statistics = {} + for map_id in registered_input_output_maps: + outputs = registered_input_output_maps[map_id]['outputs'] + + for content in outputs: + if content['status'].name not in status_statistics: + status_statistics[content['status'].name] = 0 + status_statistics[content['status'].name] += 1 + self.status_statistics = status_statistics + self.logger.debug("registered_input_output_maps, status_statistics: %s" % str(status_statistics)) + return status_statistics + + def syn_work_status(self, registered_input_output_maps): + self.get_status_statistics(registered_input_output_maps) + self.logger.debug("syn_work_status, self.active_processings: %s" % str(self.active_processings)) + self.logger.debug("syn_work_status, self.has_new_inputs(): %s" % str(self.has_new_inputs())) + self.logger.debug("syn_work_status, coll_metadata_is_open: %s" % str(self.collections[self.primary_input_collection]['coll_metadata']['is_open'])) + self.logger.debug("syn_work_status, primary_input_collection_status: %s" % str(self.collections[self.primary_input_collection]['status'])) + + if self.is_processings_terminated() and not self.has_new_inputs(): + keys = self.status_statistics.keys() + if ContentStatus.New.name in keys or ContentStatus.Processing.name in keys: + pass + else: + if len(keys) == 1: + if ContentStatus.Available.name in keys: + self.status = WorkStatus.Finished + else: + self.status = WorkStatus.Failed + else: + self.status = WorkStatus.SubFinished diff --git a/atlas/lib/idds/atlas/workflow/atlaslocalpandawork.py b/atlas/lib/idds/atlas/workflow/atlaslocalpandawork.py new file mode 100644 index 00000000..5f9fbf56 --- /dev/null +++ b/atlas/lib/idds/atlas/workflow/atlaslocalpandawork.py @@ -0,0 +1,412 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0OA +# +# Authors: +# - Wen Guan, , 2020 - 2021 + + +import json +import os +import re +import traceback + +# from rucio.client.client import Client as RucioClient +from rucio.common.exception import (CannotAuthenticate as RucioCannotAuthenticate) + +from idds.common import exceptions +from idds.common.constants import (ProcessingStatus, WorkStatus) +from idds.common.utils import extract_scope_atlas +# from idds.workflow.work import Work, Processing +# from idds.workflow.workflow import Condition +from .atlaspandawork import ATLASPandaWork + + +class ATLASLocalPandaWork(ATLASPandaWork): + def __init__(self, task_parameters=None, + work_tag='atlas', exec_type='panda', work_id=None, + primary_input_collection=None, other_input_collections=None, + input_collections=None, + primary_output_collection=None, other_output_collections=None, + output_collections=None, log_collections=None, + logger=None, + num_retries=5, + ): + + self.work_dir = "/tmp" + self.output_files = [] + + super(ATLASLocalPandaWork, self).__init__(task_parameters=task_parameters, + work_tag=work_tag, + exec_type=exec_type, + work_id=work_id, + primary_input_collection=primary_input_collection, + other_input_collections=other_input_collections, + input_collections=input_collections, + primary_output_collection=primary_output_collection, + other_output_collections=other_output_collections, + output_collections=output_collections, + log_collections=log_collections, + logger=logger, + num_retries=num_retries) + + def set_agent_attributes(self, attrs, req_attributes=None): + if self.class_name not in attrs or 'life_time' not in attrs[self.class_name] or int(attrs[self.class_name]['life_time']) <= 0: + attrs['life_time'] = None + super(ATLASLocalPandaWork, self).set_agent_attributes(attrs) + if self.agent_attributes and 'num_retries' in self.agent_attributes and self.agent_attributes['num_retries']: + self.num_retries = int(self.agent_attributes['num_retries']) + if self.agent_attributes and 'work_dir' in self.agent_attributes and self.agent_attributes['work_dir']: + self.work_dir = self.agent_attributes['work_dir'] + + def parse_task_parameters(self, task_parameters): + super(ATLASLocalPandaWork, self).parse_task_parameters(task_parameters) + + try: + if self.task_parameters and 'jobParameters' in self.task_parameters: + jobParameters = self.task_parameters['jobParameters'] + for jobP in jobParameters: + if type(jobP) in [dict]: + if 'dataset' in jobP and 'param_type' in jobP: + if jobP['param_type'] == 'output' and 'value' in jobP: + output_c = jobP['dataset'] + scope, name = extract_scope_atlas(output_c, scopes=[]) + # output_coll = {'scope': scope, 'name': name} + output_f = jobP['value'] + output_file = {'scope': scope, 'name': name, 'file': output_f} + self.output_files.append(output_file) + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + # raise exceptions.IDDSException('%s: %s' % (str(ex), traceback.format_exc())) + self.add_errors(str(ex)) + + def renew_parameters_from_attributes(self): + super(ATLASLocalPandaWork, self).renew_parameters_from_attributes() + if not self.task_parameters: + return + + try: + if 'taskName' in self.task_parameters: + self.task_name = self.task_parameters['taskName'] + self.task_name = self.renew_parameter(self.task_name) + self.set_work_name(self.task_name) + + if 'prodSourceLabel' in self.task_parameters: + self.task_type = 
self.task_parameters['prodSourceLabel'] + + if 'jobParameters' in self.task_parameters: + jobParameters = self.task_parameters['jobParameters'] + for jobP in jobParameters: + if type(jobP) in [dict]: + for key in jobP: + if jobP[key] and type(jobP[key]) in [str]: + jobP[key] = self.renew_parameter(jobP[key]) + for coll_id in self.collections: + coll_name = self.collections[coll_id].name + self.collections[coll_id].name = self.renew_parameter(coll_name) + + output_files = self.output_files + self.output_files = [] + for output_file in output_files: + output_file['name'] = self.renew_parameter(output_file['name']) + output_file['file'] = self.renew_parameter(output_file['file']) + self.output_files.append(output_file) + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + # raise exceptions.IDDSException('%s: %s' % (str(ex), traceback.format_exc())) + self.add_errors(str(ex)) + + def set_output_data(self, data): + self.output_data = data + if data and type(data) in [dict]: + for key in data: + new_key = "user_" + str(key) + setattr(self, new_key, data[key]) + + def match_pattern_file(self, pattern, lfn): + pattern1 = "\\$[_a-zA-Z0-9]+" + pattern2 = "\\$\\{[_a-zA-Z0-9\\/]+\\}" + while True: + m = re.search(pattern1, pattern) + if m: + pattern = pattern.replace(m.group(0), "*") + else: + break + while True: + m = re.search(pattern2, pattern) + if m: + pattern = pattern.replace(m.group(0), "*") + else: + break + + pattern = pattern.replace(".", "\\.") + pattern = pattern.replace("*", ".*") + pattern = pattern + "$" + + m = re.search(pattern, lfn) + if m: + return True + return False + + def ping_output_files(self): + try: + rucio_client = self.get_rucio_client() + for output_i in range(len(self.output_files)): + output_file = self.output_files[output_i] + files = rucio_client.list_files(scope=output_file['scope'], + name=output_file['name']) + files = [f for f in files] + self.logger.debug("ping_output_files found files for dataset(%s:%s): %s" % (output_file['scope'], + output_file['name'], + str([f['name'] for f in files]))) + for f in files: + if self.match_pattern_file(output_file['file'], f['name']): + self.output_files[output_i]['lfn'] = f['name'] + return True + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + # raise exceptions.IDDSException('%s: %s' % (str(ex), traceback.format_exc())) + return False + + def download_output_files_rucio(self, file_items): + try: + ret = {} + self.logger.debug("download_output_files_rucio: %s" % str(file_items)) + if file_items: + client = self.get_rucio_download_client() + outputs = client.download_dids(file_items) + for output in outputs: + if 'dest_file_paths' in output and output['dest_file_paths']: + ret[output['name']] = output['dest_file_paths'][0] + return ret + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + # raise exceptions.IDDSException('%s: %s' % (str(ex), traceback.format_exc())) + return {} + + def get_download_dir(self, processing): + req_dir = 'request_%s_%s/transform_%s' % (processing['request_id'], + processing['workload_id'], + processing['transform_id']) + d_dir = os.path.join(self.work_dir, req_dir) + if not os.path.exists(d_dir): + os.makedirs(d_dir) + return d_dir + + def download_output_files(self, processing): + try: + failed_items = [] + file_items = [] + for output_i in range(len(self.output_files)): + if 'lfn' in self.output_files[output_i]: + self.logger.debug("download_output_files, Processing 
(%s) lfn for %s: %s" % (processing['processing_id'], + self.output_files[output_i]['file'], + self.output_files[output_i]['lfn'])) + file_item = {'did': "%s:%s" % (self.output_files[output_i]['scope'], self.output_files[output_i]['lfn']), + 'base_dir': self.get_download_dir(processing), + 'no_subdir': True} + file_items.append(file_item) + else: + self.logger.warn("download_output_files, Processing (%s) lfn for %s not found" % (processing['processing_id'], + self.output_files[output_i]['file'])) + + pfn_items = self.download_output_files_rucio(file_items) + for output_i in range(len(self.output_files)): + if 'lfn' in self.output_files[output_i]: + if self.output_files[output_i]['lfn'] in pfn_items: + pfn = pfn_items[self.output_files[output_i]['lfn']] + self.output_files[output_i]['pfn'] = pfn + self.logger.info("download_output_files, Processing (%s) pfn for %s: %s" % (processing['processing_id'], + self.output_files[output_i]['file'], + self.output_files[output_i]['pfn'])) + else: + self.logger.info("download_output_files, Processing (%s) pfn cannot be found for %s" % (processing['processing_id'], + self.output_files[output_i]['file'])) + failed_items.append(self.output_files[output_i]) + return failed_items + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + # raise exceptions.IDDSException('%s: %s' % (str(ex), traceback.format_exc())) + return [] + + def parse_output_file(self, pfn): + try: + if not os.path.exists(pfn): + self.logger.warn("%s doesn't exist" % pfn) + return {} + else: + with open(pfn, 'r') as f: + data = f.read() + outputs = json.loads(data) + return outputs + except Exception as ex: + # self.logger.error(ex) + # self.logger.error(traceback.format_exc()) + raise exceptions.IDDSException('%s: %s' % (str(ex), traceback.format_exc())) + return {} + + def parse_output_files(self, processing): + try: + output_data = {} + for output_i in range(len(self.output_files)): + if 'pfn' in self.output_files[output_i]: + data = self.parse_output_file(self.output_files[output_i]['pfn']) + if type(data) in [dict]: + output_data.update(data) + return True, output_data + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + # raise exceptions.IDDSException('%s: %s' % (str(ex), traceback.format_exc())) + return False, output_data + + def process_outputs(self, processing): + ping_status = self.ping_output_files() + self.logger.debug("ping_output_files(Processing_id: %s), status: %s" % (processing['processing_id'], ping_status)) + + failed_items = self.download_output_files(processing) + self.logger.debug("download_output_files(Processing_id: %s), failed_items: %s" % (processing['processing_id'], str(failed_items))) + + parse_status, output_data = self.parse_output_files(processing) + self.logger.debug("parse_output_files(Processing_id: %s), parse_status: %s, output_data: %s" % (processing['processing_id'], parse_status, str(output_data))) + + if ping_status and not failed_items and parse_status: + return True, output_data + return False, output_data + + def get_rucio_download_client(self): + try: + from rucio.client.downloadclient import DownloadClient + client = DownloadClient() + except RucioCannotAuthenticate as error: + self.logger.error(error) + self.logger.error(traceback.format_exc()) + raise exceptions.IDDSException('%s: %s' % (str(error), traceback.format_exc())) + return client + + def poll_panda_task_output(self, processing=None, input_output_maps=None, log_prefix=''): + task_id = None + try: + 
from pandaclient import Client + + if processing: + output_metadata = {} + proc = processing['processing_metadata']['processing'] + task_id = proc.workload_id + if task_id is None: + task_id = self.get_panda_task_id(processing) + + if task_id: + # ret_ids = Client.getPandaIDsWithTaskID(task_id, verbose=False) + task_info = Client.getJediTaskDetails({'jediTaskID': task_id}, True, True, verbose=False) + self.logger.info(log_prefix + "poll_panda_task, task_info: %s" % str(task_info)) + if task_info[0] != 0: + self.logger.warn(log_prefix + "poll_panda_task %s, error getting task status, task_info: %s" % (task_id, str(task_info))) + return ProcessingStatus.Submitting, [], {}, {} + + task_info = task_info[1] + + processing_status = self.get_processing_status_from_panda_status(task_info["status"]) + + if processing_status in [ProcessingStatus.SubFinished, ProcessingStatus.Finished]: + output_status, output_metadata = self.process_outputs(processing) + if not output_status: + err = "Failed to process processing(processing_id: %s, task_id: %s) outputs" % (processing['processing_id'], task_id) + self.logger.error(log_prefix + err) + self.add_errors(err) + processing_status = ProcessingStatus.Failed + + return processing_status, [], {}, output_metadata + else: + return ProcessingStatus.Failed, [], {}, output_metadata + except Exception as ex: + msg = "Failed to check the processing (%s) status: %s" % (str(processing['processing_id']), str(ex)) + self.logger.error(log_prefix + msg) + self.logger.error(log_prefix + ex) + self.logger.error(traceback.format_exc()) + # raise exceptions.IDDSException(msg) + return ProcessingStatus.Submitting, [], {}, {} + + def poll_processing_updates(self, processing, input_output_maps, log_prefix=''): + """ + *** Function called by Carrier agent. 
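poll_panda_task_output() above asks pandaclient for the JEDI task details and maps the returned status onto a ProcessingStatus before post-processing the outputs. A hedged sketch of just the status lookup; the mapping at the end is illustrative and much smaller than the real get_processing_status_from_panda_status() table in ATLASPandaWork.

```python
def get_jedi_task_status(task_id):
    """Return the JEDI task status string, or None if the lookup failed (sketch)."""
    from pandaclient import Client

    ret = Client.getJediTaskDetails({'jediTaskID': task_id}, True, True, verbose=False)
    if ret[0] != 0:        # a non-zero first element means the call failed
        return None
    task_info = ret[1]     # dict with at least a 'status' field
    return task_info.get('status')


# Illustrative mapping only; the real work class covers many more states.
panda_to_processing = {'done': 'Finished', 'finished': 'SubFinished', 'failed': 'Failed'}
```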
+ """ + updated_contents = [] + update_contents_full = [] + parameters = {} + # self.logger.debug("poll_processing_updates, input_output_maps: %s" % str(input_output_maps)) + + if processing: + proc = processing['processing_metadata']['processing'] + + processing_status, poll_updated_contents, new_input_output_maps, output_metadata = self.poll_panda_task_output(processing=processing, + input_output_maps=input_output_maps, + log_prefix=log_prefix) + self.logger.debug(log_prefix + "poll_processing_updates, output_metadata: %s" % str(output_metadata)) + + if poll_updated_contents: + proc.has_new_updates() + for content in poll_updated_contents: + updated_content = {'content_id': content['content_id'], + 'status': content['status'], + 'substatus': content['substatus'], + 'content_metadata': content['content_metadata']} + updated_contents.append(updated_content) + + if output_metadata: + parameters = {'output_metadata': output_metadata} + + return processing_status, updated_contents, new_input_output_maps, update_contents_full, parameters + + def syn_work_status(self, registered_input_output_maps, all_updates_flushed=True, output_statistics={}, to_release_input_contents=[]): + super(ATLASLocalPandaWork, self).syn_work_status(registered_input_output_maps, all_updates_flushed, output_statistics, to_release_input_contents) + # self.get_status_statistics(registered_input_output_maps) + self.status_statistics = output_statistics + + self.logger.debug("syn_work_status, self.active_processings: %s" % str(self.active_processings)) + self.logger.debug("syn_work_status, self.has_new_inputs(): %s" % str(self.has_new_inputs)) + self.logger.debug("syn_work_status, coll_metadata_is_open: %s" % + str(self.collections[self._primary_input_collection].coll_metadata['is_open'])) + self.logger.debug("syn_work_status, primary_input_collection_status: %s" % + str(self.collections[self._primary_input_collection].status)) + + self.logger.debug("syn_work_status(%s): is_processings_terminated: %s" % (str(self.get_processing_ids()), str(self.is_processings_terminated()))) + self.logger.debug("syn_work_status(%s): is_input_collections_closed: %s" % (str(self.get_processing_ids()), str(self.is_input_collections_closed()))) + self.logger.debug("syn_work_status(%s): has_new_inputs: %s" % (str(self.get_processing_ids()), str(self.has_new_inputs))) + self.logger.debug("syn_work_status(%s): has_to_release_inputs: %s" % (str(self.get_processing_ids()), str(self.has_to_release_inputs()))) + self.logger.debug("syn_work_status(%s): to_release_input_contents: %s" % (str(self.get_processing_ids()), str(to_release_input_contents))) + + # if self.is_processings_terminated() and self.is_input_collections_closed() and not self.has_new_inputs and not self.has_to_release_inputs() and not to_release_input_contents: + if self.is_processings_terminated(): + # if not self.is_all_outputs_flushed(registered_input_output_maps): + if not all_updates_flushed: + self.logger.warn("The work processings %s is terminated. but not all outputs are flushed. 
Wait to flush the outputs then finish the transform" % str(self.get_processing_ids())) + return + + if self.is_processings_finished(): + self.status = WorkStatus.Finished + elif self.is_processings_subfinished(): + self.status = WorkStatus.SubFinished + elif self.is_processings_failed(): + self.status = WorkStatus.Failed + elif self.is_processings_expired(): + self.status = WorkStatus.Expired + elif self.is_processings_cancelled(): + self.status = WorkStatus.Cancelled + elif self.is_processings_suspended(): + self.status = WorkStatus.Suspended + elif self.is_processings_running(): + self.status = WorkStatus.Running + else: + self.status = WorkStatus.Transforming + + if self.is_processings_terminated() or self.is_processings_running() or self.is_processings_started(): + self.started = True diff --git a/environment.yml b/environment.yml new file mode 100644 index 00000000..7a678431 --- /dev/null +++ b/environment.yml @@ -0,0 +1,20 @@ +name: iDDS +dependencies: +- python==3.6 +- pip +- pip: + - requests # requests + - SQLAlchemy # db orm + - urllib3 # url connections + - retrying # retrying behavior + - mysqlclient # mysql python client + #- web.py # web service + - futures # multiple process/threads + - stomp.py # Messaging broker client + - cx-Oracle # Oralce client + - unittest2 # unit test tool + - pep8 # checks for PEP8 code style compliance + - flake8 # Wrapper around PyFlakes&pep8 + - pytest # python testing tool + - nose # nose test tools + diff --git a/main/lib/idds/agents/clerk/clerk.py b/main/lib/idds/agents/clerk/clerk.py index a68374ee..181db816 100644 --- a/main/lib/idds/agents/clerk/clerk.py +++ b/main/lib/idds/agents/clerk/clerk.py @@ -508,16 +508,16 @@ def handle_update_request_real(self, req, event): req_status = RequestStatus.Transforming if wf.is_terminated(): - if wf.is_finished(): + if wf.is_finished(synchronize=False): req_status = RequestStatus.Finished else: if to_abort and not to_abort_transform_id: req_status = RequestStatus.Cancelled - elif wf.is_expired(): + elif wf.is_expired(synchronize=False): req_status = RequestStatus.Expired - elif wf.is_subfinished(): + elif wf.is_subfinished(synchronize=False): req_status = RequestStatus.SubFinished - elif wf.is_failed(): + elif wf.is_failed(synchronize=False): req_status = RequestStatus.Failed else: req_status = RequestStatus.Failed diff --git a/main/lib/idds/tests/core_tests.py b/main/lib/idds/tests/core_tests.py index 9066f9a1..71070660 100644 --- a/main/lib/idds/tests/core_tests.py +++ b/main/lib/idds/tests/core_tests.py @@ -1,7 +1,7 @@ import sys import datetime -from idds.common.utils import json_dumps # noqa F401 +from idds.common.utils import json_dumps, setup_logging # noqa F401 from idds.common.constants import ContentStatus, ContentType, ContentRelationType, ContentLocking # noqa F401 from idds.core.requests import get_requests # noqa F401 from idds.core.messages import retrieve_messages # noqa F401 @@ -11,6 +11,11 @@ from idds.core import transforms as core_transforms # noqa F401 from idds.orm.contents import get_input_contents from idds.core.transforms import release_inputs_by_collection, release_inputs_by_collection_old # noqa F401 +from idds.workflowv2.workflow import Workflow # noqa F401 +from idds.workflowv2.work import Work # noqa F401 + + +setup_logging(__name__) def release_inputs_test(): @@ -102,6 +107,34 @@ def show_works(req): print(work_ids) +def print_workflow(workflow, layers=0): + prefix = " " * layers * 4 + for run in workflow.runs: + print(prefix + "run: " + str(run) + ", has_loop_condition: " 
+ str(workflow.runs[run].has_loop_condition())) + if workflow.runs[run].has_loop_condition(): + print(prefix + " Loop condition: %s" % json_dumps(workflow.runs[run].loop_condition, sort_keys=True, indent=4)) + for work_id in workflow.runs[run].works: + print(prefix + " " + str(work_id) + " " + str(type(workflow.runs[run].works[work_id]))) + if type(workflow.runs[run].works[work_id]) in [Workflow]: + print(prefix + " parent_num_run: " + workflow.runs[run].works[work_id].parent_num_run + ", num_run: " + str(workflow.runs[run].works[work_id].num_run)) + print_workflow(workflow.runs[run].works[work_id], layers=layers + 1) + # elif type(workflow.runs[run].works[work_id]) in [Work]: + else: + print(prefix + " " + workflow.runs[run].works[work_id].task_name + ", num_run: " + str(workflow.runs[run].works[work_id].num_run)) + + +def print_workflow_template(workflow, layers=0): + prefix = " " * layers * 4 + print(prefix + str(workflow.template.internal_id) + ", has_loop_condition: " + str(workflow.template.has_loop_condition())) + for work_id in workflow.template.works: + print(prefix + " " + str(work_id) + " " + str(type(workflow.template.works[work_id]))) + if type(workflow.template.works[work_id]) in [Workflow]: + print(prefix + " parent_num_run: " + str(workflow.template.works[work_id].parent_num_run) + ", num_run: " + str(workflow.template.works[work_id].num_run)) + print_workflow_template(workflow.template.works[work_id], layers=layers + 1) + else: + print(prefix + " " + workflow.template.works[work_id].task_name + ", num_run: " + str(workflow.template.works[work_id].num_run)) + + # 283511, 283517 # reqs = get_requests(request_id=599, with_detail=True, with_metadata=True) # reqs = get_requests(request_id=283511, with_request=True, with_detail=False, with_metadata=True) @@ -115,12 +148,15 @@ def show_works(req): # reqs = get_requests(request_id=370028, with_request=True, with_detail=False, with_metadata=True) # reqs = get_requests(request_id=370400, with_request=True, with_detail=False, with_metadata=True) # reqs = get_requests(request_id=371204, with_request=True, with_detail=False, with_metadata=True) -reqs = get_requests(request_id=372678, with_request=True, with_detail=False, with_metadata=True) +# reqs = get_requests(request_id=372678, with_request=True, with_detail=False, with_metadata=True) +# reqs = get_requests(request_id=373602, with_request=True, with_detail=False, with_metadata=True) +# reqs = get_requests(request_id=376086, with_request=True, with_detail=False, with_metadata=True) +reqs = get_requests(request_id=380474, with_request=True, with_detail=False, with_metadata=True) for req in reqs: # print(req['request_id']) - print(req) + # print(req) # print(rets) - print(json_dumps(req, sort_keys=True, indent=4)) + # print(json_dumps(req, sort_keys=True, indent=4)) # show_works(req) pass workflow = req['request_metadata']['workflow'] @@ -129,16 +165,28 @@ def show_works(req): # print(workflow.runs["1"]) # print(json_dumps(workflow.runs["1"], sort_keys=True, indent=4)) - print(workflow.runs["1"].works.keys()) - print(workflow.runs["1"].works["048a1811"]) + # print(workflow.runs["1"].works.keys()) + # print(workflow.runs["1"].has_loop_condition()) + # print(workflow.runs["1"].works["7aa1ec08"]) # print(json_dumps(workflow.runs["1"].works["048a1811"], indent=4)) - print(workflow.runs["1"].works["048a1811"].runs.keys()) - print(json_dumps(workflow.runs["1"].works["048a1811"].runs["2"], indent=4)) + # print(workflow.runs["1"].works["7aa1ec08"].runs.keys()) + # 
print(workflow.runs["1"].works["7aa1ec08"].runs["1"].has_loop_condition()) + # print(workflow.runs["1"].works["7aa1ec08"].runs["1"].works.keys()) + + # print(json_dumps(workflow.runs["1"].works["7aa1ec08"].runs["1"], indent=4)) if hasattr(workflow, 'get_relation_map'): # print(json_dumps(workflow.get_relation_map(), sort_keys=True, indent=4)) pass -sys.exit(0) + print("workflow") + print_workflow(workflow) + print("workflow template") + print_workflow_template(workflow) + + # workflow.sync_works() + +# sys.exit(0) + """ # reqs = get_requests() @@ -155,7 +203,7 @@ def show_works(req): """ -tfs = get_transforms(request_id=370028) +tfs = get_transforms(request_id=380474) # tfs = get_transforms(transform_id=350723) for tf in tfs: # print(tf) @@ -166,6 +214,7 @@ def show_works(req): print(tf['transform_metadata']['work_name']) print(tf['transform_metadata']['work'].num_run) print(tf['transform_metadata']['work'].task_name) + print(tf['transform_metadata']['work'].output_data) pass sys.exit(0) @@ -190,7 +239,7 @@ def show_works(req): sys.exit(0) """ -prs = get_processings(request_id=350723) +prs = get_processings(request_id=373602) # prs = get_processings(transform_id=350723) i = 0 for pr in prs: diff --git a/main/lib/idds/tests/retry_processing.py b/main/lib/idds/tests/retry_processing.py new file mode 100644 index 00000000..9072f7b3 --- /dev/null +++ b/main/lib/idds/tests/retry_processing.py @@ -0,0 +1,9 @@ +import sys + +from pandaclient import Client + +task_id = sys.argv[1] + +status, output = Client.retryTask(task_id) +print(status) +print(output) diff --git a/main/lib/idds/tests/test_migrate_requests.py b/main/lib/idds/tests/test_migrate_requests.py index 8d9ceb52..7f3e1d62 100644 --- a/main/lib/idds/tests/test_migrate_requests.py +++ b/main/lib/idds/tests/test_migrate_requests.py @@ -44,6 +44,7 @@ def migrate(): # old_request_id = 349 old_request_id = 2400 old_request_id = 371204 + old_request_id = 372930 # for old_request_id in [152]: # for old_request_id in [60]: # noqa E115 diff --git a/main/tools/env/setup_idds.sh b/main/tools/env/setup_idds.sh new file mode 100644 index 00000000..8629e1e1 --- /dev/null +++ b/main/tools/env/setup_idds.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +export IDDS_HOST=https://aipanda160.cern.ch:443/idds +export IDDS_LOCAL_CONFIG_ROOT=~/.idds +# export IDDS_CONFIG= +export IDDS_AUTH_TYPE=oidc +# export IDDS_OIDC_TOKEN=~/.idds/.token +export IDDS_VO=Rubin +export IDDS_AUTH_NO_VERIFY=1 + diff --git a/main/tools/k8s/install_k8s.sh b/main/tools/k8s/install_k8s.sh new file mode 100644 index 00000000..e374d069 --- /dev/null +++ b/main/tools/k8s/install_k8s.sh @@ -0,0 +1,211 @@ +#!/bin/bash + +mkdir /opt/k8s +cd /opt/k8s + +curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" + +# install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl +install -o root -g root -m 0755 kubectl /usr/bin/kubectl +kubectl version --client + +# kubectl config ~/.kube/config + +# iptables bridged traffic +cat /sys/class/dmi/id/product_uuid +lsmod | grep br_netfilter + +cat < 0 and self.num_finished_works + self.num_subfinished_works <= self.num_total_works) - def is_failed(self): + def is_failed(self, synchronize=True): """ *** Function called by Marshaller agent. """ @@ -1835,20 +1835,20 @@ def is_to_expire(self, expired_at=None, pending_time=None, request_id=None): return False - def is_expired(self): + def is_expired(self, synchronize=True): """ *** Function called by Marshaller agent. 
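The terminal-state checks above now accept a synchronize flag so callers that have just run sync_works() (for example the clerk, which now calls wf.is_finished(synchronize=False)) can skip re-synchronizing the workflow. A minimal sketch of the pattern under that assumption, with a stub sync_works and a simplified finished condition.

```python
class WorkflowSketch:
    """Minimal sketch of the synchronize=... pattern added above."""

    def __init__(self):
        self.num_total_works = 0
        self.num_finished_works = 0

    def sync_works(self, to_cancel=False):
        # stand-in for the real bookkeeping done by Workflow.sync_works()
        pass

    def is_finished(self, synchronize=True):
        if synchronize:  # callers that already synced pass synchronize=False
            self.sync_works()
        return (self.num_total_works > 0
                and self.num_finished_works == self.num_total_works)
```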
""" # return self.is_terminated() and (self.num_expired_works > 0) return self.is_terminated() and self.expired - def is_cancelled(self): + def is_cancelled(self, synchronize=True): """ *** Function called by Marshaller agent. """ return self.is_terminated() and (self.num_cancelled_works > 0) - def is_suspended(self): + def is_suspended(self, synchronize=True): """ *** Function called by Marshaller agent. """ @@ -2167,39 +2167,39 @@ def is_to_expire(self, expired_at=None, pending_time=None, request_id=None): return self.runs[str(self.num_run)].is_to_expire(expired_at=expired_at, pending_time=pending_time, request_id=request_id) return False - def is_terminated(self): + def is_terminated(self, synchronize=True): if self.runs: if self.runs[str(self.num_run)].is_terminated(): if not self.runs[str(self.num_run)].has_loop_condition() or not self.runs[str(self.num_run)].get_loop_condition_status(): return True return False - def is_finished(self): + def is_finished(self, synchronize=True): if self.is_terminated(): return self.runs[str(self.num_run)].is_finished() return False - def is_subfinished(self): + def is_subfinished(self, synchronize=True): if self.is_terminated(): return self.runs[str(self.num_run)].is_subfinished() return False - def is_failed(self): + def is_failed(self, synchronize=True): if self.is_terminated(): return self.runs[str(self.num_run)].is_failed() return False - def is_expired(self): + def is_expired(self, synchronize=True): if self.is_terminated(): return self.runs[str(self.num_run)].is_expired() return False - def is_cancelled(self): + def is_cancelled(self, synchronize=True): if self.is_terminated(): return self.runs[str(self.num_run)].is_cancelled() return False - def is_suspended(self): + def is_suspended(self, synchronize=True): if self.is_terminated(): return self.runs[str(self.num_run)].is_suspended() return False diff --git a/workflow/lib/idds/workflowv2/workflow.py b/workflow/lib/idds/workflowv2/workflow.py index 7d421b4e..057e06bd 100644 --- a/workflow/lib/idds/workflowv2/workflow.py +++ b/workflow/lib/idds/workflowv2/workflow.py @@ -1151,9 +1151,16 @@ def get_works(self): return self.works def get_combined_num_run(self): - if self.parent_num_run: - return str(self.parent_num_run) + "_" + str(self.num_run) - return str(self.num_run) + if self.parent_num_run and len(str(self.parent_num_run)): + if self.num_run and len(str(self.num_run)): + return str(self.parent_num_run) + "_" + str(self.num_run) + else: + return str(self.parent_num_run) + else: + if self.num_run and len(str(self.num_run)): + return str(self.num_run) + else: + return "" def get_new_work_to_run(self, work_id, new_parameters=None): # 1. initialize works @@ -1417,16 +1424,18 @@ def get_loop_condition_status(self): def __str__(self): return str(json_dumps(self)) - def get_new_works(self): + def get_new_works(self, synchronize=True): """ *** Function called by Marshaller agent. 
new works to be ready to start """ + self.logger.info("%s get_new_works" % self.get_internal_id()) if self.to_cancel: return [] - self.sync_works(to_cancel=self.to_cancel) + if synchronize: + self.sync_works(to_cancel=self.to_cancel) works = [] if self.submitting_works: @@ -1434,24 +1443,30 @@ def get_new_works(self): return works if self.to_start_works: + to_start_works = self.to_start_works.copy() init_works = self.init_works - to_start_works = self.to_start_works - work_id = to_start_works.pop(0) - self.to_start_works = to_start_works - self.get_new_work_to_run(work_id) - if not init_works: - init_works.append(work_id) - self.init_works = init_works + starting_works = [] + for work_id in to_start_works: + if not self.works[work_id].has_dependency(): + starting_works.append(work_id) + self.get_new_work_to_run(work_id) + if not init_works: + init_works.append(work_id) + self.init_works = init_works + for work_id in starting_works: + if work_id in self.to_start_works: + self.to_start_works.remove(work_id) for k in self.new_to_run_works: if isinstance(self.works[k], Work): self.works[k] = self.get_new_parameters_for_work(self.works[k]) works.append(self.works[k]) if isinstance(self.works[k], Workflow): - works = works + self.works[k].get_new_works() + works = works + self.works[k].get_new_works(synchronize=False) for k in self.current_running_works: if isinstance(self.works[k], Workflow): - works = works + self.works[k].get_new_works() + works = works + self.works[k].get_new_works(synchronize=False) + self.logger.info("%s get_new_works done" % self.get_internal_id()) return works def get_current_works(self): @@ -1469,20 +1484,23 @@ def get_current_works(self): works = works + self.works[k].get_current_works() return works - def get_all_works(self): + def get_all_works(self, synchronize=True): """ *** Function called by Marshaller agent. 
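get_new_works() above no longer pops a single entry from to_start_works; it walks a copy of the list, starts only entries whose has_dependency() is False, and removes just those from to_start_works. A condensed sketch of that gating, using a stub work object in place of the real Work class.

```python
class StubWork:
    def __init__(self, name, has_dep):
        self.name = name
        self._has_dep = has_dep

    def has_dependency(self):
        return self._has_dep


def start_ready_works(works, to_start_works):
    """Start dependency-free works and drop them from to_start_works (sketch)."""
    started = []
    for work_id in to_start_works.copy():  # iterate a copy, mutate the original list
        if not works[work_id].has_dependency():
            started.append(work_id)
            to_start_works.remove(work_id)
    return started


works = {"a": StubWork("a", has_dep=False), "b": StubWork("b", has_dep=True)}
to_start = ["a", "b"]
print(start_ready_works(works, to_start), to_start)  # ['a'] ['b']
```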
Current running works """ - self.sync_works(to_cancel=self.to_cancel) + self.logger.info("%s get_all_works" % self.get_internal_id()) + if synchronize: + self.sync_works(to_cancel=self.to_cancel) works = [] for k in self.works: if isinstance(self.works[k], Work): works.append(self.works[k]) if isinstance(self.works[k], Workflow): - works = works + self.works[k].get_all_works() + works = works + self.works[k].get_all_works(synchronize=False) + self.logger.info("%s get_all_works done" % self.get_internal_id()) return works def get_primary_initial_collection(self): @@ -1602,7 +1620,7 @@ def first_initialize(self): def sync_works(self, to_cancel=False): if to_cancel: self.to_cancel = to_cancel - self.log_debug("synchroning works") + self.log_debug("%s synchroning works" % self.get_internal_id()) self.first_initialize() self.refresh_works() @@ -1678,7 +1696,7 @@ def sync_works(self, to_cancel=False): # # if it's a loop workflow, to generate new loop # if isinstance(work, Workflow): # work.sync_works() - log_str = "num_total_works: %s" % self.num_total_works + log_str = "%s num_total_works: %s" % (self.get_internal_id(), self.num_total_works) log_str += ", num_finished_works: %s" % self.num_finished_works log_str += ", num_subfinished_works: %s" % self.num_subfinished_works log_str += ", num_failed_works: %s" % self.num_failed_works @@ -1690,7 +1708,7 @@ def sync_works(self, to_cancel=False): self.log_debug(log_str) self.refresh_works() - self.log_debug("synchronized works") + self.log_debug("%s synchronized works" % self.get_internal_id()) def resume_works(self): self.to_cancel = False @@ -1835,7 +1853,7 @@ def is_terminated(self, synchronize=True, new=False): if synchronize: self.sync_works(to_cancel=self.to_cancel) if new: - if (self.to_cancel) and len(self.current_running_works) == 0 and self.num_total_works > 0: + if (self.to_cancel) or (len(self.new_to_run_works) == 0 and len(self.current_running_works) == 0 and self.num_total_works > 0): return True else: if (self.to_cancel or len(self.new_to_run_works) == 0) and len(self.current_running_works) == 0: @@ -2021,6 +2039,7 @@ def metadata(self, value): self.template.parent_num_run = self.parent_num_run self.runs[run_id] = self.template.copy() self.runs[run_id].metadata = runs[run_id] + self.runs[run_id].parent_num_run = self.parent_num_run # self.add_metadata_item('runs', ) @property @@ -2120,7 +2139,10 @@ def transforming(self, value): if str(self.num_run) not in self.runs: self.template.parent_num_run = self.parent_num_run self.runs[str(self.num_run)] = self.template.copy() - self.runs[str(self.num_run)].num_run = self.num_run + if self.runs[str(self.num_run)].has_loop_condition(): + self.runs[str(self.num_run)].num_run = self.num_run + self.runs[str(self.num_run)].parent_num_run = self.parent_num_run + # self.runs[str(self.num_run)].parent_num_run = self.get_combined_num_run() if self.runs[str(self.num_run)].has_loop_condition(): if self._num_run > 1: @@ -2135,6 +2157,9 @@ def submitted(self): def submitted(self, value): pass + def has_dependency(self): + return False + def set_workload_id(self, workload_id): if self.runs: self.runs[str(self.num_run)].workload_id = workload_id @@ -2189,13 +2214,17 @@ def sync_global_parameters(self, global_parameters, sliced_global_parameters=Non return self.runs[str(self.num_run)].sync_global_parameters(global_parameters, sliced_global_parameters) return self.template.sync_global_parameters(global_parameters, sliced_global_parameters) - def get_new_works(self): + def get_new_works(self, synchronize=True): 
+ self.logger.info("%s get_new_works" % self.get_internal_id()) self.log_debug("synchronizing works") - self.sync_works(to_cancel=self.to_cancel) + if synchronize: + self.sync_works(to_cancel=self.to_cancel) self.log_debug("synchronized works") + works = [] if self.runs: - return self.runs[str(self.num_run)].get_new_works() - return [] + works = self.runs[str(self.num_run)].get_new_works(synchronize=False) + self.logger.info("%s get_new_works done" % self.get_internal_id()) + return works def get_current_works(self): self.sync_works(to_cancel=self.to_cancel) @@ -2203,11 +2232,15 @@ def get_current_works(self): return self.runs[str(self.num_run)].get_current_works() return [] - def get_all_works(self): - self.sync_works(to_cancel=self.to_cancel) + def get_all_works(self, synchronize=True): + self.logger.info("%s get_all_works" % self.get_internal_id()) + if synchronize: + self.sync_works(to_cancel=self.to_cancel) + works = [] if self.runs: - return self.runs[str(self.num_run)].get_all_works() - return [] + works = self.runs[str(self.num_run)].get_all_works(synchronize=False) + self.logger.info("%s get_all_works done" % self.get_internal_id()) + return works def get_primary_initial_collection(self): if self.runs: @@ -2311,7 +2344,9 @@ def sync_works(self, to_cancel=False): if str(self.num_run) not in self.runs: self.template.parent_num_run = self.parent_num_run self.runs[str(self.num_run)] = self.template.copy() - self.runs[str(self.num_run)].num_run = self.num_run + if self.runs[str(self.num_run)].has_loop_condition(): + self.runs[str(self.num_run)].num_run = self.num_run + # self.runs[str(self.num_run)].parent_num_run = self.get_combined_num_run() if self.runs[str(self.num_run)].has_loop_condition(): if self.num_run > 1: @@ -2321,23 +2356,26 @@ def sync_works(self, to_cancel=False): self.runs[str(self.num_run)].sync_works(to_cancel=to_cancel) if self.runs[str(self.num_run)].is_terminated(synchronize=False, new=True): + self.logger.info("%s num_run %s is_terminated" % (self.get_internal_id(), self.num_run)) if to_cancel: self.logger.info("num_run %s, to cancel" % self.num_run) else: if self.runs[str(self.num_run)].has_loop_condition(): if self.runs[str(self.num_run)].get_loop_condition_status(): - self.logger.info("num_run %s get_loop_condition_status %s, start next run" % (self.num_run, self.runs[str(self.num_run)].get_loop_condition_status())) + self.logger.info("%s num_run %s get_loop_condition_status %s, start next run" % (self.get_internal_id(), self.num_run, self.runs[str(self.num_run)].get_loop_condition_status())) self._num_run += 1 self.template.parent_num_run = self.parent_num_run self.runs[str(self.num_run)] = self.template.copy() - self.runs[str(self.num_run)].num_run = self.num_run + if self.runs[str(self.num_run)].has_loop_condition(): + self.runs[str(self.num_run)].num_run = self.num_run + p_metadata = self.runs[str(self.num_run - 1)].get_metadata_item('parameter_links') self.runs[str(self.num_run)].add_metadata_item('parameter_links', p_metadata) self.runs[str(self.num_run)].global_parameters = self.runs[str(self.num_run - 1)].global_parameters else: - self.logger.info("num_run %s get_loop_condition_status %s, terminated loop" % (self.num_run, self.runs[str(self.num_run)].get_loop_condition_status())) + self.logger.info("%s num_run %s get_loop_condition_status %s, terminated loop" % (self.get_internal_id(), self.num_run, self.runs[str(self.num_run)].get_loop_condition_status())) self.refresh_works() def get_relation_map(self):
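The sync_works() changes at the end of the diff advance a looping (sub)workflow to its next run by copying the template and carrying the previous run's parameter_links metadata and global parameters forward. A stripped-down sketch of that hand-over under simplifying assumptions: plain attributes stand in for the metadata items the real code moves with get_metadata_item/add_metadata_item, and copy.deepcopy stands in for the template's copy() method.

```python
import copy
from types import SimpleNamespace


def start_next_loop_run(runs, template, num_run, parent_num_run):
    """Create run num_run+1 from the template and carry loop state forward (sketch)."""
    prev, new = str(num_run), str(num_run + 1)
    next_run = copy.deepcopy(template)        # the real code uses template.copy()
    next_run.parent_num_run = parent_num_run
    next_run.num_run = num_run + 1
    # hand over the loop bookkeeping from the run that just terminated
    next_run.parameter_links = runs[prev].parameter_links
    next_run.global_parameters = runs[prev].global_parameters
    runs[new] = next_run
    return num_run + 1


template = SimpleNamespace(parent_num_run=None, num_run=None,
                           parameter_links=None, global_parameters=None)
runs = {"1": SimpleNamespace(parameter_links={"link": "a->b"}, global_parameters={"x": 1})}
start_next_loop_run(runs, template, num_run=1, parent_num_run=None)
print(sorted(runs.keys()))  # ['1', '2']
```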