From cf5af80b6e459c5a7fb5558bf93ad9820075dddf Mon Sep 17 00:00:00 2001 From: mishaborodin Date: Fri, 22 Jul 2022 21:19:49 +0200 Subject: [PATCH] Task action refactoring --- atlas/JIRA/__init__.py | 0 atlas/JIRA/client.py | 143 ++++++ atlas/auth/websso/__init__.py | 0 atlas/auth/websso/ssocookies.py | 91 ++++ atlas/jedi/__init__.py | 0 atlas/jedi/client.py | 693 +++++++++++++++++++++++++++ atlas/prodtask/models.py | 78 ++- atlas/prodtask/task_actions.py | 35 ++ atlas/prodtask/tasks.py | 5 + atlas/settings/jediclient.py | 4 + atlas/settings/jiraclient.py | 54 +++ atlas/task_action/__init__.py | 0 atlas/task_action/task_management.py | 300 ++++++++++++ 13 files changed, 1395 insertions(+), 8 deletions(-) create mode 100644 atlas/JIRA/__init__.py create mode 100644 atlas/JIRA/client.py create mode 100644 atlas/auth/websso/__init__.py create mode 100644 atlas/auth/websso/ssocookies.py create mode 100644 atlas/jedi/__init__.py create mode 100644 atlas/jedi/client.py create mode 100644 atlas/settings/jediclient.py create mode 100644 atlas/settings/jiraclient.py create mode 100644 atlas/task_action/__init__.py create mode 100644 atlas/task_action/task_management.py diff --git a/atlas/JIRA/__init__.py b/atlas/JIRA/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/atlas/JIRA/client.py b/atlas/JIRA/client.py new file mode 100644 index 00000000..adad5af6 --- /dev/null +++ b/atlas/JIRA/client.py @@ -0,0 +1,143 @@ +import requests +import json +import copy +import logging + +from atlas.auth.websso.ssocookies import SSOCookies +from atlas.settings.jiraclient import JIRA_CONFIG +_logger = logging.getLogger('prodtaskwebui') + + +class JIRAClient(object): + def __init__(self, sso_cookies=None): + self.sso_cookies = sso_cookies + + def authorize(self): + try: + self.sso_cookies = SSOCookies( + JIRA_CONFIG['auth_url'], + pem_cert_file_path=JIRA_CONFIG['cert'], + pem_cert_key_path=JIRA_CONFIG['cert_key'] + ).get() + return self.sso_cookies + except Exception as ex: + raise Exception('JIRAClient: SSO authentication error: {0}'.format(str(ex))) + + def create_issue(self, summary, description): + if not self.sso_cookies: + raise Exception('JIRAClient: not authorized') + + issue = copy.deepcopy(JIRA_CONFIG['issue_template']) + issue['fields']['summary'] = issue['fields']['summary'] % summary + issue['fields']['description'] = issue['fields']['description'] % description + + headers = {'Content-type': 'application/json'} + + response = requests.post(JIRA_CONFIG['issue_url'], + data=json.dumps(issue), + headers=headers, + cookies=self.sso_cookies, + verify=JIRA_CONFIG['verify_ssl_certificates']) + + if response.status_code != requests.codes.created: + response.raise_for_status() + + result = json.loads(response.content) + + return result['key'] + + def delete_issue(self, issue_key, delete_sub_issues=True): + if not self.sso_cookies: + raise Exception('JIRAClient: not authorized') + + issue_url = '{0}{1}?deleteSubtasks={2}'.format( + JIRA_CONFIG['issue_url'], + issue_key, + str(delete_sub_issues).lower() + ) + + response = requests.delete(issue_url, + cookies=self.sso_cookies, + verify=JIRA_CONFIG['verify_ssl_certificates']) + + if response.status_code != requests.codes.no_content: + response.raise_for_status() + + return True + + def create_sub_issue(self, parent_issue_key, summary, description): + if not self.sso_cookies: + raise Exception('JIRAClient: not authorized') + + issue = copy.deepcopy(JIRA_CONFIG['sub_issue_template']) + issue['fields']['summary'] = issue['fields']['summary'] % 
summary + issue['fields']['description'] = issue['fields']['description'] % description + issue['fields']['parent']['key'] = issue['fields']['parent']['key'] % parent_issue_key + + headers = {'Content-type': 'application/json'} + + response = requests.post(JIRA_CONFIG['issue_url'], + data=json.dumps(issue), + headers=headers, + cookies=self.sso_cookies, + verify=JIRA_CONFIG['verify_ssl_certificates']) + + if response.status_code != requests.codes.created: + response.raise_for_status() + + result = json.loads(response.content) + + return result['key'] + + def log_exception(self, issue_key, exception, log_msg=None): + try: + if not log_msg: + log_msg = '{0}: {1}'.format(type(exception).__name__, str(exception)) + _logger.exception(log_msg) + self.add_issue_comment(issue_key, log_msg) + except Exception as ex: + if _logger: + _logger.exception('log_exception failed: {0}'.format(str(ex))) + + def add_issue_comment(self, issue_key, comment_body): + if not self.sso_cookies: + raise Exception('JIRAClient: not authorized') + + comment = JIRA_CONFIG['issue_comment_template'].copy() + comment['body'] = comment['body'] % comment_body + + headers = {'Content-type': 'application/json'} + comment_url = '{0}{1}/comment'.format(JIRA_CONFIG['issue_url'], issue_key) + + response = requests.post(comment_url, + data=json.dumps(comment), + headers=headers, + cookies=self.sso_cookies, + verify=JIRA_CONFIG['verify_ssl_certificates']) + + if response.status_code != requests.codes.created: + response.raise_for_status() + + return True + + def close_issue(self, issue_key, comment): + if not self.sso_cookies: + raise Exception('JIRAClient: not authorized') + + issue_close_request = copy.deepcopy(JIRA_CONFIG['issue_close_template']) + issue_close_request['update']['comment'][0]['add']['body'] = \ + issue_close_request['update']['comment'][0]['add']['body'] % comment + + headers = {'Content-type': 'application/json'} + transitions_url = '{0}{1}/transitions'.format(JIRA_CONFIG['issue_url'], issue_key) + + response = requests.post(transitions_url, + data=json.dumps(issue_close_request), + headers=headers, + cookies=self.sso_cookies, + verify=JIRA_CONFIG['verify_ssl_certificates']) + + if response.status_code != requests.codes.no_content: + response.raise_for_status() + + return True \ No newline at end of file diff --git a/atlas/auth/websso/__init__.py b/atlas/auth/websso/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/atlas/auth/websso/ssocookies.py b/atlas/auth/websso/ssocookies.py new file mode 100644 index 00000000..253be2b7 --- /dev/null +++ b/atlas/auth/websso/ssocookies.py @@ -0,0 +1,91 @@ +__version__ = '0.5.5' + +import os.path +import io +import re +import urllib.parse +import pycurl +import html.parser +import base64 +import getpass + + +# noinspection PyUnresolvedReferences, PyBroadException +class SSOCookies(object): + def __init__(self, url, pem_cert_file_path=None, pem_cert_key_path=None, encoding='utf-8'): + self.user_agent_cert = 'curl-sso-certificate/{0} (Mozilla)'.format(__version__) + self.adfs_ep = '/adfs/ls' + self.auth_error = 'HTTP Error 401.2 - Unauthorized' + self.encoding = encoding + + if not pem_cert_file_path or not pem_cert_key_path: + raise Exception('SSOCookies: certificate and/or private key file is not specified') + + if pem_cert_file_path: + if not os.path.isfile(pem_cert_file_path): + raise Exception('SSOCookies: certificate file {0} is not found'.format(pem_cert_file_path)) + if pem_cert_key_path: + if not os.path.isfile(pem_cert_key_path): + raise 
Exception('SSOCookies: key file {0} is not found'.format(pem_cert_key_path)) + + self.curl = pycurl.Curl() + self.curl.setopt(self.curl.COOKIEFILE, '') + self.curl.setopt(self.curl.USERAGENT, self.user_agent_cert) + self.curl.setopt(self.curl.SSLCERT, pem_cert_file_path) + self.curl.setopt(self.curl.SSLCERTTYPE, 'PEM') + self.curl.setopt(self.curl.SSLKEY, pem_cert_key_path) + self.curl.setopt(self.curl.SSLKEYTYPE, 'PEM') + self.curl.setopt(self.curl.FOLLOWLOCATION, 1) + self.curl.setopt(self.curl.UNRESTRICTED_AUTH, 1) + self.curl.setopt(self.curl.HEADER, 0) + self.curl.setopt(self.curl.SSL_VERIFYPEER, 0) + self.curl.setopt(self.curl.SSL_VERIFYHOST, 0) + self.curl.setopt(self.curl.URL, url) + + _, effective_url = self._request() + + if self.adfs_ep not in effective_url: + raise Exception('SSOCookies: the service does not support CERN SSO') + + self.curl.setopt(self.curl.URL, effective_url) + + response, effective_url = self._request() + + if self.auth_error in response: + raise Exception('SSOCookies: authentication error') + + result = re.search('form .+?action="([^"]+)"', response) + service_provider_url = result.groups()[0] + form_params = re.findall('input type="hidden" name="([^"]+)" value="([^"]+)"', response) + form_params = [(item[0], html.unescape(item[1])) for item in form_params] + + self.curl.setopt(self.curl.URL, service_provider_url) + self.curl.setopt(self.curl.POSTFIELDS, urllib.parse.urlencode(form_params)) + self.curl.setopt(self.curl.POST, 1) + + self._request() + + self.cookie_list = self.curl.getinfo(self.curl.INFO_COOKIELIST) + + def _request(self): + response = io.BytesIO() + self.curl.setopt(self.curl.WRITEFUNCTION, response.write) + self.curl.perform() + response = response.getvalue().decode(self.encoding) + effective_url = self.curl.getinfo(self.curl.EFFECTIVE_URL) + return response, effective_url + + def get(self): + cookies = {} + for item in self.cookie_list: + name = item.split('\t')[5] + value = item.split('\t')[6] + cookies.update({name: value}) + return cookies + + def extract_username(self): + try: + cookies = self.get() + return base64.b64decode(cookies['FedAuth']).split(',')[1].split('\\')[-1] + except Exception: + return getpass.getuser() diff --git a/atlas/jedi/__init__.py b/atlas/jedi/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/atlas/jedi/client.py b/atlas/jedi/client.py new file mode 100644 index 00000000..b00e6bbf --- /dev/null +++ b/atlas/jedi/client.py @@ -0,0 +1,693 @@ +import json +import logging +import gzip +import pickle +from copy import deepcopy +from abc import ABC, abstractmethod +import requests +from urllib.parse import urlencode +from ..settings import jediclient as jedi_settings + +_logger = logging.getLogger('prodtaskwebui') + + + +class JEDITaskActionInterface(ABC): + + @abstractmethod + def changeTaskPriority(self, jediTaskID, newPriority): + pass + + + @abstractmethod + def killTask(self, jediTaskID): + pass + + @abstractmethod + def finishTask(self, jediTaskID, soft): + pass + + @abstractmethod + def changeTaskRamCount(self, jediTaskID, ramCount): + pass + + @abstractmethod + def reassignTaskToSite(self, jediTaskID, site, mode): + pass + + @abstractmethod + def reassignTaskToCloud(self, jediTaskID, cloud, mode): + pass + + @abstractmethod + def reassignTaskToNucleus(self, jediTaskID, nucleus, mode): + pass + + @abstractmethod + def changeTaskWalltime(self, jediTaskID, wallTime): + pass + + @abstractmethod + def changeTaskCputime(self, jediTaskID, cpuTime): + pass + + @abstractmethod + def 
changeTaskSplitRule(self, jediTaskID, ruleName, ruleValue): + pass + + @abstractmethod + def changeTaskAttribute(self, jediTaskID, attrName, attrValue): + pass + + @abstractmethod + def retryTask(self, jediTaskID, verbose, noChildRetry, discardEvents, disable_staging_mode): + pass + + @abstractmethod + def reloadInput(self, jediTaskID, verbose): + pass + + @abstractmethod + def pauseTask(self, jediTaskID, verbose): + pass + + @abstractmethod + def resumeTask(self, jediTaskID, verbose): + pass + + @abstractmethod + def reassignShare(self, jedi_task_ids, share, reassign_running): + pass + + @abstractmethod + def triggerTaskBrokerage(self, jediTaskID): + pass + + @abstractmethod + def avalancheTask(self, jediTaskID, verbose): + pass + + @abstractmethod + def increaseAttemptNr(self, jediTaskID, increase): + pass + + @abstractmethod + def killUnfinishedJobs(self, jediTaskID, code, verbose, srvID, useMailAsID): + pass + + +class JEDIJobsActionInterface(ABC): + + @abstractmethod + def killJobs(self, ids, code, verbose, srvID, useMailAsID, keepUnmerged, jobSubStatus): + pass + + @abstractmethod + def reassignJobs(self, ids, forPending, firstSubmission): + pass + + +EC_Failed = 255 + +class JEDIClient(JEDITaskActionInterface, JEDIJobsActionInterface): + def __init__(self, base_url=jedi_settings.JEDI_BASE_URL, cert=jedi_settings.CERTIFICATE ): + """Initializes new instance of JEDI class + + :param cert: path to certificate or to proxy + :param base_url: JEDI REST API base url + """ + + self.cert = cert + self._base_url = base_url + #self._headers = {'Content-Type': 'application/json', 'Accept': 'application/json'} + self._headers = {'Content-Type': 'application/json'} + + def _form_url(self, command): + return self._base_url + '/' + command + + + def _post_command(self, command, data, convert_boolean=True): + url = self._form_url(command) + if convert_boolean: + data = self._convert_boolean_to_string(data) + response = requests.get(url, cert=self.cert, data=gzip.compress(json.dumps(data).encode('utf-8')), + headers=self._headers, verify='/etc/ssl/certs/CERN-bundle.pem') + if response.status_code != requests.codes.ok: + response.raise_for_status() + return self._jedi_output_distillation(response.content) + + # change task priority + def changeTaskPriority(self, jediTaskID, newPriority): + """Change task priority + args: + jediTaskID: jediTaskID of the task to change the priority + newPriority: new task priority + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + return code + 0: unknown task + 1: succeeded + None: database error + """ + + data = {'jediTaskID': jediTaskID, + 'newPriority': newPriority} + return self._post_command('changeTaskPriority', data) + + + # kill task + def killTask(self, jediTaskID): + """Kill a task + args: + jediTaskID: jediTaskID of the task to be killed + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + tuple of return code and diagnostic message + 0: request is registered + 1: server error + 2: task not found + 3: permission denied + 4: irrelevant task status + 100: non SSL connection + 101: irrelevant taskID + """ + + data = {'properErrorCode': True, 'jediTaskID': jediTaskID} + return self._post_command('killTask',data) + + + + + # finish task + def finishTask(self, jediTaskID, soft=False): + """Finish a task + args: + jediTaskID: jediTaskID of the task to be finished + soft: If True, new jobs are not generated and the task is + finihsed once all remaining jobs are 
done. + If False, all remaining jobs are killed and then the + task is finished + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + tuple of return code and diagnostic message + 0: request is registered + 1: server error + 2: task not found + 3: permission denied + 4: irrelevant task status + 100: non SSL connection + 101: irrelevant taskID + """ + + data = {'properErrorCode': True, 'jediTaskID': jediTaskID} + if soft: + data['soft'] = True + return self._post_command('finishTask',data) + + def changeTaskRamCount(self, jediTaskID, ramCount): + """Change task priority + args: + jediTaskID: jediTaskID of the task to change the priority + ramCount: new ramCount for the task + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + return code + 0: unknown task + 1: succeeded + None: database error + """ + data = {'jediTaskID': jediTaskID, + 'attrName': 'ramCount', + 'attrValue': ramCount} + return self._post_command('changeTaskAttributePanda',data) + + # reassign task to a site + def reassignTaskToSite(self, jediTaskID, site, mode=None): + """Reassign a task to a site. Existing jobs are killed and new jobs are generated at the site + args: + jediTaskID: jediTaskID of the task to be reassigned + site: the site name where the task is reassigned + mode: If soft, only defined/waiting/assigned/activated jobs are killed. If nokill, no jobs are killed. All jobs are killed by default. + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + tuple of return code and diagnostic message + 0: request is registered + 1: server error + 2: task not found + 3: permission denied + 4: irrelevant task status + 100: non SSL connection + 101: irrelevant taskID + """ + maxSite = 60 + if site is not None and len(site) > maxSite: + return EC_Failed, f'site parameter is too long > {maxSite}chars' + data = {'jediTaskID': jediTaskID, 'site': site} + if mode is not None: + data['mode'] = mode + return self._post_command('reassignTask', data) + + # reassign task to a cloud + def reassignTaskToCloud(self, jediTaskID, cloud, mode=None): + """Reassign a task to a cloud. Existing jobs are killed and new jobs are generated in the cloud + args: + jediTaskID: jediTaskID of the task to be reassigned + cloud: the cloud name where the task is reassigned + mode: If soft, only defined/waiting/assigned/activated jobs are killed. If nokill, no jobs are killed. All jobs are killed by default. + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + tuple of return code and diagnostic message + 0: request is registered + 1: server error + 2: task not found + 3: permission denied + 4: irrelevant task status + 100: non SSL connection + 101: irrelevant taskID + """ + + # execute + data = {'jediTaskID': jediTaskID, 'cloud': cloud} + if mode is not None: + data['mode'] = mode + return self._post_command('reassignTask',data) + + # reassign task to a nucleus + def reassignTaskToNucleus(self, jediTaskID, nucleus, mode=None): + """Reassign a task to a nucleus. Existing jobs are killed and new jobs are generated in the cloud + args: + jediTaskID: jediTaskID of the task to be reassigned + nucleus: the nucleus name where the task is reassigned + mode: If soft, only defined/waiting/assigned/activated jobs are killed. If nokill, no jobs are killed. All jobs are killed by default. 
+ returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + tuple of return code and diagnostic message + 0: request is registered + 1: server error + 2: task not found + 3: permission denied + 4: irrelevant task status + 100: non SSL connection + 101: irrelevant taskID + """ + data = {'jediTaskID': jediTaskID, 'nucleus': nucleus} + if mode is not None: + data['mode'] = mode + return self._post_command('reassignTask',data) + + # reassign jobs + def reassignJobs(self, ids, forPending=False, firstSubmission=None): + """Triggers reassignment of jobs. This is not effective if jobs were preassigned to sites before being submitted. + args: + ids: the list of taskIDs + forPending: set True if pending jobs are reassigned + firstSubmission: set True if first jobs are submitted for a task, or False if not + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + return code + True: request is processed + False: not processed + """ + # serialize + strIDs = pickle.dumps(ids, protocol=0).decode('utf-8') + data = {'ids': strIDs} + if forPending: + data['forPending'] = True + if firstSubmission is not None: + if firstSubmission: + data['firstSubmission'] = True + else: + data['firstSubmission'] = False + return self._post_command('reassignJobs',data) + + # change task walltime + def changeTaskWalltime(self, jediTaskID, wallTime): + """Change task priority + args: + jediTaskID: jediTaskID of the task to change the priority + wallTime: new walltime for the task + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + return code + 0: unknown task + 1: succeeded + None: database error + """ + # instantiate curl + data = {'jediTaskID': jediTaskID, + 'attrName': 'wallTime', + 'attrValue': wallTime} + return self._post_command('changeTaskAttributePanda',data) + + # change task cputime + def changeTaskCputime(self, jediTaskID, cpuTime): + """Change task cpuTime + args: + jediTaskID: jediTaskID of the task to change the priority + cpuTime: new cputime for the task + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + return code + 0: unknown task + 1: succeeded + None: database error + """ + # instantiate curl + data = {'jediTaskID': jediTaskID, + 'attrName': 'cpuTime', + 'attrValue': cpuTime} + return self._post_command('changeTaskAttributePanda',data) + + # change split rule for task + def changeTaskSplitRule(self, jediTaskID, ruleName, ruleValue): + """Change split rule fo task + args: + jediTaskID: jediTaskID of the task to change the rule + ruleName: rule name + ruleValue: new value for the rule + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + return: a tupple of return code and message + 0: unknown task + 1: succeeded + 2: disallowed to update the attribute + None: database error + """ + # instantiate curl + data = {'jediTaskID': jediTaskID, + 'attrName': ruleName, + 'attrValue': ruleValue} + return self._post_command('changeTaskSplitRulePanda',data) + + # change task attribute + def changeTaskAttribute(self, jediTaskID, attrName, attrValue): + """Change task attribute + args: + jediTaskID: jediTaskID of the task to change the attribute + attrName: attribute name + attrValue: new value for the attribute + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + return: a tupple of return code and message + 0: unknown 
task + 1: succeeded + 2: disallowed to update the attribute + None: database error + """ + data = {'jediTaskID': jediTaskID, + 'attrName': attrName, + 'attrValue': attrValue} + return self._post_command('changeTaskAttributePanda',data) + + + def retryTask(self, jediTaskID, verbose=False, noChildRetry=False, discardEvents=False, disable_staging_mode=False): + """Retry task + args: + jediTaskID: jediTaskID of the task to retry + noChildRetry: True not to retry child tasks + discardEvents: discard events + disable_staging_mode: disable staging mode + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + tuple of return code and diagnostic message + 0: request is registered + 1: server error + 2: task not found + 3: permission denied + 4: irrelevant task status + 100: non SSL connection + 101: irrelevant taskID + """ + # instantiate curl + data = {'jediTaskID': jediTaskID, 'properErrorCode': True} + if noChildRetry: + data['noChildRetry'] = True + if discardEvents: + data['discardEvents'] = True + if disable_staging_mode: + data['disable_staging_mode'] = True + return self._post_command('retryTask',data) + + # reload input + def reloadInput(self, jediTaskID, verbose=False): + """Retry task + args: + jediTaskID: jediTaskID of the task to retry + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + tuple of return code and diagnostic message + 0: request is registered + 1: server error + 2: task not found + 3: permission denied + 4: irrelevant task status + 100: non SSL connection + 101: irrelevant taskID + """ + data = {'jediTaskID': jediTaskID} + return self._post_command('reloadInput',data) + + # pause task + def pauseTask(self, jediTaskID, verbose=False): + """Pause task + args: + jediTaskID: jediTaskID of the task to pause + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + tuple of return code and diagnostic message + 0: request is registered + 1: server error + 2: task not found + 3: permission denied + 4: irrelevant task status + 100: non SSL connection + 101: irrelevant taskID + """ + data = {'jediTaskID': jediTaskID} + return self._post_command('pauseTask',data) + + + def resumeTask(self, jediTaskID, verbose=False): + """Resume task + args: + jediTaskID: jediTaskID of the task to release + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + tuple of return code and diagnostic message + 0: request is registered + 1: server error + 2: task not found + 3: permission denied + 4: irrelevant task status + 100: non SSL connection + 101: irrelevant taskID + """ + + data = {'jediTaskID': jediTaskID} + return self._post_command('resumeTask',data) + + def reassignShare(self, jedi_task_ids, share, reassign_running=False): + """ + args: + jedi_task_ids: task ids to act on + share: share to be applied to jeditaskids + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + return: a tuple of return code and message + 1: logical error + 0: success + None: database error + """ + jedi_task_ids_pickle = pickle.dumps(jedi_task_ids, protocol=0).decode('utf-8') + data = {'jedi_task_ids_pickle': jedi_task_ids_pickle, + 'share': share, + 'reassign_running': reassign_running} + return self._post_command('reassignShare',data, False) + + def triggerTaskBrokerage(self, jediTaskID): + """Trigger task brokerge + args: + jediTaskID: jediTaskID of the task to change the 
attribute + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + return: a tupple of return code and message + 0: unknown task + 1: succeeded + None: database error + """ + data = {'jediTaskID': jediTaskID, + 'diffValue': -12} + return self._post_command('changeTaskModTimePanda',data) + + + def avalancheTask(self, jediTaskID, verbose=False): + """force avalanche for task + args: + jediTaskID: jediTaskID of the task to avalanche + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + tuple of return code and diagnostic message + 0: request is registered + 1: server error + 2: task not found + 3: permission denied + 4: irrelevant task status + 100: non SSL connection + 101: irrelevant taskID + """ + data = {'jediTaskID': jediTaskID} + return self._post_command('avalancheTask',data) + + def increaseAttemptNr(self, jediTaskID, increase): + """Change task priority + args: + jediTaskID: jediTaskID of the task to increase attempt numbers + increase: increase for attempt numbers + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + return code + 0: succeeded + 1: unknown task + 2: invalid task status + 3: permission denied + 4: wrong parameter + None: database error + """ + data = {'jediTaskID': jediTaskID, + 'increasedNr': increase} + return self._post_command('increaseAttemptNrPanda',data) + + def killUnfinishedJobs(self, jediTaskID, code=None, verbose=False, srvID=None, useMailAsID=False): + """Kill unfinished jobs in a task. Normal users can kill only their own jobs. + People with production VOMS role can kill any jobs. + Running jobs are killed when next heartbeat comes from the pilot. + Set code=9 if running jobs need to be killed immediately. + args: + jediTaskID: the taskID of the task + code: specify why the jobs are killed + 2: expire + 3: aborted + 4: expire in waiting + 7: retry by server + 8: rebrokerage + 9: force kill + 50: kill by JEDI + 91: kill user jobs with prod role + verbose: set True to see what's going on + srvID: obsolete + useMailAsID: obsolete + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + the list of clouds (or Nones if tasks are not yet assigned) + """ + + data = {'jediTaskID': jediTaskID, 'code': code} + return self._post_command('killUnfinishedJobs',data) + + def killJobs(self, ids, code=None, verbose=False, srvID=None, useMailAsID=False, keepUnmerged=False, jobSubStatus=None): + """Kill jobs. Normal users can kill only their own jobs. + People with production VOMS role can kill any jobs. + Running jobs are killed when next heartbeat comes from the pilot. + Set code=9 if running jobs need to be killed immediately. + args: + ids: the list of PandaIDs + code: specify why the jobs are killed + 2: expire + 3: aborted + 4: expire in waiting + 7: retry by server + 8: rebrokerage + 9: force kill + 10: fast rebrokerage on overloaded PQs + 50: kill by JEDI + 91: kill user jobs with prod role + verbose: set True to see what's going on + srvID: obsolete + useMailAsID: obsolete + keepUnmerged: set True not to cancel unmerged jobs when pmerge is killed. 
+ jobSubStatus: set job sub status if any + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + the list of clouds (or Nones if tasks are not yet assigned) + """ + # serialize + strIDs = pickle.dumps(ids, protocol=0).decode('utf-8') + + data = {'ids': strIDs, 'code': code, 'useMailAsID': useMailAsID} + killOpts = '' + if keepUnmerged: + killOpts += 'keepUnmerged,' + if jobSubStatus is not None: + killOpts += 'jobSubStatus={0},'.format(jobSubStatus) + data['killOpts'] = killOpts[:-1] + return self._post_command('killJobs',data) + + @staticmethod + def _jedi_output_distillation(jedi_respond_raw): + jedi_respond = jedi_respond_raw + if type(jedi_respond_raw) is bytes: + jedi_respond = pickle.loads(jedi_respond_raw) + return_code = -1 + return_info = '' + if type(jedi_respond) is int: + return_code = jedi_respond + elif type(jedi_respond) is tuple: + return_code = jedi_respond[0] + return_info = jedi_respond[1] + return return_code, return_info + + @staticmethod + def _convert_boolean_to_string(data): + converted_data = deepcopy(data) + if type(converted_data) is dict: + for key in converted_data: + if type(converted_data[key]) is bool: + if converted_data[key]: + converted_data[key] = 'True' + else: + converted_data[key] = 'False' + return converted_data \ No newline at end of file diff --git a/atlas/prodtask/models.py b/atlas/prodtask/models.py index 3ed7974a..f366bd3e 100644 --- a/atlas/prodtask/models.py +++ b/atlas/prodtask/models.py @@ -1,5 +1,6 @@ import json from datetime import timedelta +from enum import Enum, auto from django.core.exceptions import ObjectDoesNotExist from django.db import models @@ -724,6 +725,28 @@ class Meta: +class TaskTemplate(models.Model): + id = models.DecimalField(decimal_places=0, max_digits=12, db_column='TASK_TEMPLATE_ID', primary_key=True) + step = models.ForeignKey(StepExecution, db_column='STEP_ID', on_delete=CASCADE) + request = models.ForeignKey(TRequest, db_column='PR_ID', on_delete=CASCADE) + parent_id = models.DecimalField(decimal_places=0, max_digits=12, db_column='PARENT_TID') + name = models.CharField(max_length=130, db_column='TASK_NAME') + timestamp = models.DateTimeField(db_column='TIMESTAMP') + template_type = models.CharField(max_length=128, db_column='TEMPLATE_TYPE', null=True) + task_template = models.JSONField(db_column='TEMPLATE') + task_error = models.CharField(max_length=4000, db_column='TRASK_ERROR', null=True) + build = models.CharField(max_length=200, db_column='TAG', null=True) + + + def save(self, *args, **kwargs): + self.timestamp = timezone.now() + super(TaskTemplate, self).save(*args, **kwargs) + + class Meta: + app_label = 'dev' + db_table = "T_TASK_TEMPLATE" + + class TTask(models.Model): id = models.DecimalField(decimal_places=0, max_digits=12, db_column='TASKID', primary_key=True) status = models.CharField(max_length=12, db_column='STATUS', null=True) @@ -787,13 +810,42 @@ class Meta: + class ProductionTask(models.Model): - STATUS_ORDER = ['total', 'waiting','staging','registered', 'assigning', 'submitting', 'ready', 'running', - 'paused', 'exhausted', 'done', 'finished', 'toretry', 'toabort', 'failed', 'broken', 'aborted', - 'obsolete'] - SYNC_STATUS = ['running','registered','paused','assigning','toabort','toretry','submitting','ready','exhausted','waiting', 'staging'] - RED_STATUS = ['failed','aborted','broken'] - NOT_RUNNING = RED_STATUS + ['finished','done','obsolete'] + + + class STATUS: + WAITING = 'waiting' + STAGING = 'staging' + REGISTERED = 'registered' + 
ASSIGNING = 'assigning' + SUBMITTING = 'submitting' + READY = 'ready' + RUNNING = 'running' + PAUSED = 'paused' + EXHAUSTED = 'exhausted' + DONE = 'done' + FINISHED = 'finished' + TORETRY = 'toretry' + TOABORT = 'toabort' + FAILED = 'failed' + BROKEN = 'broken' + ABORTED = 'aborted' + OBSOLETE = 'obsolete' + + + STATUS_ORDER = ['total', STATUS.WAITING, STATUS.STAGING, STATUS.REGISTERED, STATUS.ASSIGNING, STATUS.SUBMITTING, + STATUS.READY, STATUS.RUNNING, STATUS.PAUSED, STATUS.EXHAUSTED, STATUS.DONE, STATUS.FINISHED, + STATUS.TORETRY, STATUS.TOABORT, STATUS.FAILED, STATUS.BROKEN, STATUS.ABORTED, STATUS.OBSOLETE] + SYNC_STATUS = [STATUS.RUNNING, STATUS.REGISTERED, STATUS.PAUSED, STATUS.ASSIGNING, STATUS.TOABORT, STATUS.TORETRY, + STATUS.SUBMITTING, STATUS.READY, STATUS.EXHAUSTED, STATUS.WAITING, STATUS.STAGING] + RED_STATUS = [STATUS.FAILED, STATUS.ABORTED, STATUS.BROKEN] + NOT_RUNNING = RED_STATUS + [STATUS.FINISHED, STATUS.DONE, STATUS.OBSOLETE] + OBSOLETE_READY_STATUS = [STATUS.FINISHED, STATUS.DONE] + + + + id = models.DecimalField(decimal_places=0, max_digits=12, db_column='TASKID', primary_key=True) step = models.ForeignKey(StepExecution, db_column='STEP_ID', on_delete=CASCADE) request = models.ForeignKey(TRequest, db_column='PR_ID', on_delete=CASCADE) @@ -847,7 +899,7 @@ class ProductionTask(models.Model): def failure_rate(self): try: #rate = round(self.total_files_failed/self.total_files_tobeused*100,3); - rate = self.total_files_failed/self.total_files_tobeused*100; + rate = self.total_files_failed/self.total_files_tobeused*100 if rate == 0 or rate>=1: rate = int(rate) elif rate < .001: @@ -1086,6 +1138,15 @@ def remove_hashtag_from_task(task_id, hashtag): class StepAction(models.Model): + class STATUS: + ACTIVE = 'active' + FAILED = 'failed' + DONE = 'done' + EXECUTING = 'executing' + CANCELED = 'canceled' + + ACTIVE_STATUS = [STATUS.ACTIVE, STATUS.EXECUTING] + id = models.DecimalField(decimal_places=0, max_digits=12, db_column='STEP_ACTION_ID', primary_key=True) request = models.ForeignKey(TRequest, db_column='PR_ID', on_delete=CASCADE) step = models.DecimalField(decimal_places=0, max_digits=12, db_column='STEP_ID') @@ -1139,9 +1200,10 @@ class DatasetStaging(models.Model): ACTIVE_STATUS = ['queued','staging'] - class STATUS(): + class STATUS: QUEUED = 'queued' STAGING = 'staging' + CANCELED = 'canceled' id = models.DecimalField(decimal_places=0, max_digits=12, db_column='DATASET_STAGING_ID', primary_key=True) dataset = models.CharField(max_length=255, db_column='DATASET', null=True) diff --git a/atlas/prodtask/task_actions.py b/atlas/prodtask/task_actions.py index 514f202f..99c4ca12 100644 --- a/atlas/prodtask/task_actions.py +++ b/atlas/prodtask/task_actions.py @@ -1,4 +1,5 @@ import json +from functools import partial from django.utils import timezone from django.core.exceptions import ObjectDoesNotExist @@ -12,6 +13,7 @@ from atlas.prodtask.views import task_clone_with_skip_used from atlas.prodtask.ddm_api import DDM +from atlas.task_action.task_management import TaskActionExecutor _deft_client = deft.Client(auth_user=settings.DEFT_AUTH_USER, auth_key=settings.DEFT_AUTH_KEY,base_url=settings.BASE_DEFT_API_URL) @@ -89,6 +91,39 @@ def create_disable_idds_action(owner, task_id): return {'exception':'No staging rule is found'} +def do_new_action(owner, task_id, action, *args): + action_executor = TaskActionExecutor(owner) + action_translation = { + 'abort': action_executor.killTask, + 'finish': action_executor.finishTask, + 'change_priority': action_executor.changeTaskPriority, + 
'reassign_to_site': action_executor.reassignTaskToSite, + 'reassign_to_cloud': action_executor.reassignTaskToCloud, + 'reassign_to_nucleus': action_executor.reassignTaskToNucleus, + 'reassign_to_share': action_executor.reassignShare, + 'retry': action_executor.retryTask, + 'change_ram_count': action_executor.changeTaskRamCount, + 'change_wall_time': action_executor.changeTaskWalltime, + 'change_cpu_time': action_executor.changeTaskCputime, + 'increase_attempt_number': action_executor.increaseAttemptNr, + 'abort_unfinished_jobs': action_executor.killUnfinishedJobs, + 'delete_output': action_executor.clean_task_carriages, + 'kill_job': action_executor.kill_jobs_in_task, + 'obsolete': action_executor.obsolete_task, + 'change_core_count': partial(action_executor.changeTaskAttribute, attrName='coreCount'), + 'change_split_rule': action_executor.changeTaskSplitRule, + 'pause_task': action_executor.pauseTask, + 'resume_task': action_executor.resumeTask, + 'trigger_task': action_executor.triggerTaskBrokerage, + 'avalanche_task': action_executor.avalancheTask, + 'reload_input': action_executor.reloadInput, + 'retry_new': action_executor.retry_new, + 'set_hashtag': action_executor.set_hashtag, + 'remove_hashtag': action_executor.remove_hashtag, + 'sync_jedi': action_executor.sync_jedi, + 'disable_idds': action_executor.create_disable_idds_action + } + return action_translation[action](task_id, *args) def do_action(owner, task_id, action, *args): diff --git a/atlas/prodtask/tasks.py b/atlas/prodtask/tasks.py index 02813444..325e1dfa 100644 --- a/atlas/prodtask/tasks.py +++ b/atlas/prodtask/tasks.py @@ -11,6 +11,7 @@ from atlas.prodtask.hashtag import hashtag_request_to_tasks from atlas.prodtask.mcevgen import sync_cvmfs_db from atlas.prodtask.open_ended import check_open_ended +from atlas.prodtask.task_actions import do_new_action from atlas.prodtask.task_views import sync_old_tasks, check_merge_container from functools import wraps @@ -68,6 +69,10 @@ def cric_profile_sync(): sync_cric_deft() return None +@app.task +def async_tasks_action(username, task_ids, action, *args): + result = [do_new_action(username, x, action, *args) for x in task_ids] + return result @app.task(ignore_result=True) def find_DC_existsed_replica_tasks(): diff --git a/atlas/settings/jediclient.py b/atlas/settings/jediclient.py new file mode 100644 index 00000000..bc9c320f --- /dev/null +++ b/atlas/settings/jediclient.py @@ -0,0 +1,4 @@ +from .local import CERTIFICATE_PATH + +JEDI_BASE_URL = 'https://pandaserver.cern.ch:25443/server/panda' +CERTIFICATE = CERTIFICATE_PATH \ No newline at end of file diff --git a/atlas/settings/jiraclient.py b/atlas/settings/jiraclient.py new file mode 100644 index 00000000..2eb51d99 --- /dev/null +++ b/atlas/settings/jiraclient.py @@ -0,0 +1,54 @@ +from .local import CERT_PEM_PATH, CERT_KEY_PEM_PATH + +JIRA_CONFIG = { + 'auth_url': 'https://its.cern.ch/jira/loginCern.jsp', + 'issue_url': 'https://its.cern.ch/jira/rest/api/2/issue/', + 'cert': CERT_PEM_PATH, + 'cert_key': CERT_KEY_PEM_PATH, + 'verify_ssl_certificates': False, + 'issue_template': { + 'fields': { + 'project': { + 'key': 'ATLPSTASKS' + }, + 'issuetype': { + 'name': 'Information Request' + }, + 'summary': "%s", + 'description': "%s" + } + }, + 'sub_issue_template': { + 'fields': { + 'project': { + 'key': 'ATLPSTASKS' + }, + 'issuetype': { + 'name': 'Sub-task' + }, + 'summary': "%s", + 'description': "%s", + 'parent': { + 'key': "%s" + } + } + }, + 'issue_comment_template': { + 'body': "%s" + }, + 'issue_close_template': { + 'update': 
{ + 'comment': [ + {'add': {'body': "%s"}} + ] + }, + 'fields': { + 'resolution': { + 'name': 'None' + } + }, + 'transition': { + 'id': '2' + }, + } +} \ No newline at end of file diff --git a/atlas/task_action/__init__.py b/atlas/task_action/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/atlas/task_action/task_management.py b/atlas/task_action/task_management.py new file mode 100644 index 00000000..f32ca928 --- /dev/null +++ b/atlas/task_action/task_management.py @@ -0,0 +1,300 @@ +import json +import re +from abc import ABC, abstractmethod + +from atlas.JIRA.client import JIRAClient +from atlas.jedi.client import JEDIClient, JEDITaskActionInterface +import logging +from django.utils import timezone + +from atlas.prodtask.ddm_api import DDM +from atlas.prodtask.hashtag import add_or_get_request_hashtag +from atlas.prodtask.models import ProductionTask, TRequest, ActionStaging, StepAction +from atlas.prodtask.task_views import sync_deft_jedi_task + +logger = logging.getLogger('prodtaskwebui') +_jsonLogger = logging.getLogger('prodtask_ELK') +from dataclasses import dataclass + + +class DEFTAction(ABC): + + @abstractmethod + def create_disable_idds_action(self, task_id): + pass + + # @abstractmethod + # def increase_task_priority(self, task_id, delta): + # pass + # + # @abstractmethod + # def decrease_task_priority(self, task_id, delta): + # pass + + @abstractmethod + def obsolete_task(self, task_id): + pass + + @abstractmethod + def sync_jedi(self, task_id): + pass + + @abstractmethod + def set_hashtag(self, task_id, hashtag_name): + pass + + @abstractmethod + def remove_hashtag(self, task_id, hashtag_name): + pass + + @abstractmethod + def retry_new(self, task_id): + pass + + @abstractmethod + def clean_task_carriages(self, task_id, output_formats): + pass + + @abstractmethod + def kill_jobs_in_task(self, task_id, jobs_id_str, code, keepUnmerged): + pass + + +@dataclass +class TaskActionExecutor(JEDITaskActionInterface, DEFTAction): + + username: str + comment: str + + def __init__(self, username, comment=''): + self.jedi_client = JEDIClient() + self.username = username + self.jira_client = None + self.comment = comment + + def _log_production_task_action_message(self, production_request_id, task_id, action, return_code, return_message, *args): + _jsonLogger.info("Production task action", + extra={'task': str(task_id), 'prod_request': production_request_id,'user': self.username, 'action': action, 'params': json.dumps(args), + 'return_code': return_code,'return_message': return_message ,'comment': self.comment}) + # print({'task': str(task_id), 'prod_request': production_request_id,'user': self.username, 'action': action, 'params': json.dumps(args), + # 'return_code': return_code,'return_message': return_message}) + + def _log_analysis_task_action_message(self, task_id, action, return_code, return_message, *args): + _jsonLogger.info("Analysis task action", + extra={'task': str(task_id), 'user': self.username, 'comment': self.comment, + 'action': action, 'params': json.dumps(args), + 'return_code': return_code, 'return_message': return_message}) + # print({'task': str(task_id), 'user': self.username, + # 'action': action, 'params': json.dumps(args), + # 'return_code': return_code, 'return_message': return_message}) + + def _log_action_message(self, task_id, action, return_code, return_message, *args): + try: + + if ProductionTask.objects.filter(id=task_id).exists(): + task = ProductionTask.objects.get(id=task_id) + if task.request_id > 300: + production_request = 
task.request + # if not production_request.info_field('task_jira_es'): + # if not self.jira_client: + # self.jira_client = JIRAClient() + # production_request.set_info_field('task_jira_es', True) + # production_request.save() + self._log_production_task_action_message(production_request.reqid, task_id, action, return_code, return_message, *args) + return + self._log_analysis_task_action_message(task_id, action, return_code, return_message, *args) + except Exception as ex: + logger.error(f"Action logging problem: {ex}") + print(f"Action logging problem: {ex}") + + + def _jedi_decorator(func): + def inner(self, task_id, *args, **kwargs): + try: + result = func(self, task_id, *args, **kwargs) + logger.info(f"JEDI action {task_id} {func.__name__} with parameters {args} from {self.username} result {result}") + return_code, return_message = result + if type(return_code) is int and return_code == 0: + return_code = True + self._log_action_message(task_id, func.__name__, bool(return_code), return_message, *args) + return bool(return_code), return_message + except Exception as ex: + self._log_action_message(task_id, func.__name__, False, str(ex), *args) + return False, str(ex) + return inner + + def _action_logger(func): + def inner(self, task_id, *args, **kwargs): + try: + return_code, return_message = func(self, task_id, *args, **kwargs) + self._log_action_message(task_id, func.__name__, return_code, return_message, *args) + return return_code, return_message + except Exception as ex: + self._log_action_message(task_id, func.__name__, False, str(ex), *args) + return False, str(ex) + return inner + + _jedi_decorator = staticmethod(_jedi_decorator) + _action_logger = staticmethod(_action_logger) + + @_jedi_decorator + def changeTaskPriority(self, jediTaskID, newPriority): + return self.jedi_client.changeTaskPriority(jediTaskID, newPriority) + + @_jedi_decorator + def killTask(self, jediTaskID): + return self.jedi_client.killTask(jediTaskID) + + @_jedi_decorator + def finishTask(self, jediTaskID, soft=False): + return self.jedi_client.finishTask(jediTaskID, soft) + + @_jedi_decorator + def changeTaskRamCount(self, jediTaskID, ramCount): + return self.jedi_client.changeTaskRamCount(jediTaskID, ramCount) + + @_jedi_decorator + def reassignTaskToSite(self, jediTaskID, site, mode=None): + return self.jedi_client.reassignTaskToSite(jediTaskID, site, mode) + + @_jedi_decorator + def reassignTaskToCloud(self, jediTaskID, cloud, mode=None): + return self.jedi_client.reassignTaskToCloud(jediTaskID, cloud, mode) + + @_jedi_decorator + def reassignTaskToNucleus(self, jediTaskID, nucleus, mode=None): + return self.jedi_client.reassignTaskToNucleus( jediTaskID, nucleus, mode) + + + @_jedi_decorator + def changeTaskWalltime(self, jediTaskID, wallTime): + return self.jedi_client.changeTaskWalltime(jediTaskID, wallTime) + + @_jedi_decorator + def changeTaskCputime(self, jediTaskID, cpuTime): + return self.jedi_client.changeTaskCputime(jediTaskID, cpuTime) + + @_jedi_decorator + def changeTaskSplitRule(self, jediTaskID, ruleName, ruleValue): + return self.jedi_client.changeTaskSplitRule(jediTaskID, ruleName, ruleValue) + + @_jedi_decorator + def changeTaskAttribute(self, jediTaskID, attrName, attrValue): + return self.jedi_client.changeTaskAttribute(jediTaskID, attrName, attrValue) + + @_jedi_decorator + def retryTask(self, jediTaskID, verbose=False, noChildRetry=False, discardEvents=False, disable_staging_mode=False): + return self.jedi_client.retryTask(jediTaskID, verbose, noChildRetry, discardEvents, 
disable_staging_mode) + + @_jedi_decorator + def reloadInput(self, jediTaskID, verbose=False): + return self.jedi_client.reloadInput(jediTaskID, verbose) + + @_jedi_decorator + def pauseTask(self, jediTaskID, verbose=False): + return self.jedi_client.pauseTask(jediTaskID, verbose) + + @_jedi_decorator + def resumeTask(self, jediTaskID, verbose=False): + return self.jedi_client.resumeTask(jediTaskID, verbose) + + @_jedi_decorator + def reassignShare(self, jediTaskID, share, reassign_running=False): + return self.jedi_client.reassignShare([jediTaskID, ], share, reassign_running) + + @_jedi_decorator + def triggerTaskBrokerage(self, jediTaskID): + return self.jedi_client.triggerTaskBrokerage(jediTaskID) + + @_jedi_decorator + def avalancheTask(self, jediTaskID, verbose=False): + return self.jedi_client.avalancheTask(jediTaskID, verbose) + + @_jedi_decorator + def increaseAttemptNr(self, jediTaskID, increase): + return self.jedi_client.increaseAttemptNr(jediTaskID, increase) + + @_jedi_decorator + def killUnfinishedJobs(self, jediTaskID, code=None, verbose=False, srvID=None, useMailAsID=False): + return self.jedi_client.killUnfinishedJobs(jediTaskID, code, verbose, srvID, useMailAsID) + + @_action_logger + def create_disable_idds_action(self, task_id): + task = ProductionTask.objects.get(id=task_id) + if ActionStaging.objects.filter(task=task.id).exists(): + if task.total_files_finished > 0: + step = task.step + actions = StepAction.objects.filter(step=step.id, action=12, status__in=StepAction.ACTIVE_STATUS) + action_exists = False + for action in actions: + if action.get_config('task') == task_id: + action_exists = True + break + if not action_exists: + new_action = StepAction() + new_action.step = step.id + new_action.action = 12 + new_action.set_config({'task': int(task_id)}) + new_action.attempt = 0 + new_action.status = StepAction.STATUS.ACTIVE + new_action.request = step.request + new_action.create_time = timezone.now() + new_action.execution_time = timezone.now() + new_action.save() + return self.finishTask(task_id) + else: + if task.status == 'staging': + try: + dataset_stage = ActionStaging.objects.filter(task=task.id)[0].dataset_stage + ddm = DDM() + rule = ddm.get_rule(dataset_stage.rse) + if rule['locks_ok_cnt'] == 0: + ddm.delete_replication_rule(dataset_stage.rse) + else: + return self.resumeTask(task_id) + return True, '' + except Exception as e: + return False, str(e) + + return False, 'No staging rule is found' + + @_action_logger + def obsolete_task(self, task_id): + task = ProductionTask.objects.get(id=task_id) + if task.status in ProductionTask.OBSOLETE_READY_STATUS: + task.status = ProductionTask.STATUS.OBSOLETE + task.timestamp = timezone.now() + task.save() + return True, '' + + @_action_logger + def sync_jedi(self, task_id): + sync_deft_jedi_task(task_id) + return True, '' + + @_action_logger + def set_hashtag(self, task_id, hashtag_name): + task = ProductionTask.objects.get(id=task_id) + hashtag = add_or_get_request_hashtag(hashtag_name[0]) + task.set_hashtag(hashtag) + return True, '' + + @_action_logger + def remove_hashtag(self, task_id, hashtag_name): + task = ProductionTask.objects.get(id=task_id) + hashtag = add_or_get_request_hashtag(hashtag_name[0]) + task.remove_hashtag(hashtag) + return True, '' + + @_action_logger + def retry_new(self, task_id): + raise NotImplementedError("Not yet implemented") + + @_action_logger + def clean_task_carriages(self, task_id, output_formats): + raise NotImplementedError("Not yet implemented") + + @_jedi_decorator + def 
kill_jobs_in_task(self, task_id, jobs_id_str, code=None, keepUnmerged=False): + jobs_id = [int(x) for x in re.split(r"[^a-zA-Z0-9]", str(jobs_id_str)) if x] + return self.jedi_client.killJobs(jobs_id, code=code, keepUnmerged=keepUnmerged)
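Reviewer note, not part of the patch: a minimal usage sketch of the new action path introduced above, assuming the patched modules are importable in a configured Django/Celery environment. The user name, task IDs and parameter values are placeholders; do_new_action, async_tasks_action and TaskActionExecutor are the objects added by this change.

# Sketch only, under the assumptions stated above.
from atlas.prodtask.task_actions import do_new_action
from atlas.prodtask.tasks import async_tasks_action
from atlas.task_action.task_management import TaskActionExecutor

# Synchronous path: do_new_action() maps the action name onto a TaskActionExecutor
# method; the _jedi_decorator / _action_logger wrappers return a (success, message)
# pair and record the action via the 'prodtask_ELK' logger.
ok, message = do_new_action('someuser', 31415926, 'change_priority', 900)

# Asynchronous path: the new Celery task applies the same mapping to each task ID.
async_tasks_action.delay('someuser', [31415926, 31415927], 'finish')

# Direct use of the executor, e.g. to kill a task with an audit comment.
executor = TaskActionExecutor('someuser', comment='duplicate request')
ok, message = executor.killTask(31415926)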