From e66f81d90f0ace694008ea96ca5b46f9d3afa0bd Mon Sep 17 00:00:00 2001 From: Aleksandr Alekseev Date: Tue, 20 Sep 2022 11:11:07 +0200 Subject: [PATCH] Updated panda client for tasks --- core/oauth/utils.py | 11 ++- core/panda_client/urls.py | 7 +- core/panda_client/utils.py | 67 +++++++++++++++-- core/panda_client/views.py | 132 ++++------------------------------ core/templates/taskInfo.html | 29 +++++++- core/views.py | 136 +++++++++++++++++++++++++++-------- 6 files changed, 224 insertions(+), 158 deletions(-) diff --git a/core/oauth/utils.py b/core/oauth/utils.py index 87219932..428bb183 100644 --- a/core/oauth/utils.py +++ b/core/oauth/utils.py @@ -80,4 +80,13 @@ def deny_rights(request, rtype): return False - return True \ No newline at end of file + return True + +def get_auth_provider(request): + user = request.user + + if user.is_authenticated and user.social_auth is not None: + auth_provider = (request.user.social_auth.get()).provider + else: + auth_provider = None + return auth_provider \ No newline at end of file diff --git a/core/panda_client/urls.py b/core/panda_client/urls.py index 921f2fef..c87e978f 100644 --- a/core/panda_client/urls.py +++ b/core/panda_client/urls.py @@ -4,11 +4,6 @@ from core.panda_client import views as panda_client -app_name = "bigpandamon" - urlpatterns = [ - re_path(r'^panda_client/get_pandaserver_attr/$', panda_client.get_pandaserver_attr, name='get_pandaserver_attr'), - re_path(r'^panda_client/setNumSlots/$', panda_client.setNumSlots, name='setNumSlots'), - # re_path(r'^killTask/(?P.*)/$', panda_client.killTask, name='killTask') - # re_path(r'^finishTask/(?P.*)/$', panda_client.finishTask, name='finishTask') + re_path(r'^panda_client/$', panda_client.client, name='panda_client'), ] diff --git a/core/panda_client/utils.py b/core/panda_client/utils.py index 486a38c6..172bd1c7 100644 --- a/core/panda_client/utils.py +++ b/core/panda_client/utils.py @@ -1,12 +1,14 @@ import time - +from requests import post +from core.oauth.utils import get_auth_provider +baseURL = 'https://pandaserver.cern.ch/server/panda' def get_auth_indigoiam(request): header = {} organisation = 'atlas' - user = request.user - if user.is_authenticated and user.social_auth is not None: - auth_provider = (request.user.social_auth.get()).provider + auth_provider = get_auth_provider(request) + + if auth_provider: social = request.user.social_auth.get(provider=auth_provider) if (auth_provider == 'indigoiam'): if (social.extra_data['auth_time'] + social.extra_data['expires_in'] - 10) <= int(time.time()): @@ -23,6 +25,63 @@ def get_auth_indigoiam(request): return header +def kill_task(auth, jeditaskid): + """Kill a task + + request parameters: + jediTaskID: jediTaskID of the task to be killed + +""" + if jeditaskid is not None: + data = {} + + data['jediTaskID'] = jeditaskid + data['properErrorCode'] = True + + url = baseURL + '/killTask' + + try: + resp = post(url, headers=auth, data=data) + resp = resp.text + except Exception as ex: + resp = "ERROR killTask: %s %s" % (ex, resp.status_code) + else: + resp = 'Jeditaskid is not defined' + + return resp + + +def finish_task(auth, jeditaskid, soft=True): + """Finish a task + + request parameters: + jediTaskID: jediTaskID of the task to be finished + soft: If True, new jobs are not generated and the task is + finihsed once all remaining jobs are done. + If False, all remaining jobs are killed and then the + task is finished + """ + if jeditaskid is not None: + + data = {} + + data['jediTaskID'] = jeditaskid + data['properErrorCode'] = True + data['soft'] = soft + + + url = baseURL + '/finishTask' + + try: + resp = post(url, headers=auth, data=data) + resp = resp.text + except Exception as ex: + resp = "ERROR finishTask: %s %s" % (ex, resp.status_code) + else: + resp = 'Jeditaskid is not defined' + + return resp + ### TODO change it later # def pandaclient_initialization(request): # user = request.user diff --git a/core/panda_client/views.py b/core/panda_client/views.py index 51c00385..e7afcd73 100644 --- a/core/panda_client/views.py +++ b/core/panda_client/views.py @@ -3,9 +3,10 @@ from requests import get, post from django.http import HttpResponse from django.views.decorators.cache import never_cache +from django.views.decorators.csrf import csrf_exempt from core.oauth.utils import login_customrequired -from core.panda_client.utils import get_auth_indigoiam +from core.panda_client.utils import get_auth_indigoiam, kill_task, finish_task from core.views import initRequest baseURL = 'https://pandaserver.cern.ch/server/panda' @@ -27,128 +28,25 @@ def get_pandaserver_attr(request): resp = auth['detail'] return HttpResponse(resp, content_type='application/json') - -# kill task -@login_customrequired -@never_cache -def killTask(request): - """Kill a task - - request parameters: - jediTaskID: jediTaskID of the task to be killed - """ - valid, response = initRequest(request) - if not valid: - return response - - auth = get_auth_indigoiam(request) - - if auth is not None and ('Authorization' in auth and 'Origin' in auth): - if 'jeditaskid' in request.session['requestParams'] and request.session['requestParams']['jeditaskid'] is not None: - data = {} - - data['jediTaskID'] = request.session['requestParams']['jeditaskid'] - data['properErrorCode'] = True - - url = baseURL + '/killTask' - - try: - resp = post(url, headers=auth, data=data) - except Exception as ex: - resp = "ERROR killTasl: %s %s" % (ex, resp.status_code) - else: - resp = 'jeditaskid is not defined' - else: - resp = auth['detail'] - - return HttpResponse(resp, content_type='application/json') - -@login_customrequired -@never_cache -def finishTask(request): - """Finish a task - - request parameters: - jediTaskID: jediTaskID of the task to be finished - soft: If True, new jobs are not generated and the task is - finihsed once all remaining jobs are done. - If False, all remaining jobs are killed and then the - task is finished - """ +@csrf_exempt +def client(request): valid, response = initRequest(request) if not valid: return response - auth = get_auth_indigoiam(request) - + info = {} if auth is not None and ('Authorization' in auth and 'Origin' in auth): - if 'jeditaskid' in request.session['requestParams'] and request.session['requestParams']['jeditaskid'] is not None: - - data = {} - data['jediTaskID'] = request.session['requestParams']['jeditaskid'] - - data['properErrorCode'] = True - - if 'soft' in request.session['requestParams'] and bool(request.session['requestParams']['soft']) == True: - data['soft'] = True + if len(request.session['requestParams']) > 0: + data = request.session['requestParams'] + if data['action'] == 'finishtask' and ('task' in data and data['task'] is not None): + info = finish_task(auth=auth, jeditaskid=data['task']) + elif data['action'] == 'killtask' and ('task' in data and data['task'] is not None): + info = kill_task(auth=auth, jeditaskid=data['task']) else: - data['soft'] = False - - url = baseURL + '/finishTask' - - try: - resp = post(url, headers=auth, data=data) - except Exception as ex: - resp = "ERROR finishTask: %s %s" % (ex, resp.status_code) + info = 'Operation error' else: - resp = 'jeditaskid is not defined' + info = 'Request body is empty' else: - resp = auth['detail'] - - return HttpResponse(resp, content_type='application/json') - -@login_customrequired -@never_cache -def setNumSlots(request): - """Finish a task - - request parameters: - pandaQueueName: string - numSlots: int - gshare: string - resourceType: string - validPeriod: int (number of days) - """ - valid, response = initRequest(request) - if not valid: - return response - - auth = get_auth_indigoiam(request) - details = {} - if auth is not None and ('Authorization' in auth and 'Origin' in auth): - data = {} - if 'pandaqueuename' in request.session['requestParams'] and request.session['requestParams']['pandaqueuename'] is not None: - data['pandaQueueName'] = request.session['requestParams']['pandaqueuename'] - if 'gshare' in request.session['requestParams'] and request.session['requestParams']['gshare'] is not None: - data['gshare'] = request.session['requestParams']['gshare'] - if 'resourcetype' in request.session['requestParams'] and request.session['requestParams']['resourcetype'] is not None: - data['resourceType'] = request.session['requestParams']['resourcetype'] - if 'numslots' in request.session['requestParams'] and request.session['requestParams']['numslots'] is not None: - data['numSlots'] = request.session['requestParams']['numslots'] - if 'validperiod' in request.session['requestParams'] and request.session['requestParams']['validperiod'] is not None: - data['validPeriod'] = request.session['requestParams']['validperiod'] - url = baseURL + '/setNumSlotsForWP' - try: - resp = post(url, headers=auth, data=data) - details['code'] = resp.status_code - details['text'] = resp.text - details['auth_details'] = auth - details['data'] = data - except Exception as ex: - resp = "ERROR setNumSlots: %s %s" % (ex, resp.status_code) - details['message'] = resp - else: - resp = auth['detail'] - details['message'] = resp + info = auth['detail'] - return HttpResponse(json.dumps(details), content_type='application/json') \ No newline at end of file + return HttpResponse(info, content_type='text/html') diff --git a/core/templates/taskInfo.html b/core/templates/taskInfo.html index e0b1e028..73f50780 100644 --- a/core/templates/taskInfo.html +++ b/core/templates/taskInfo.html @@ -313,8 +313,13 @@
Logged status: {{ task.errordialog|safe }}
{% with 'done finished failed ready broken aborted defined' as finalstatelist %} {% if not task.status in finalstatelist %}
+ {% if 'indigoiam' in authtype %} + Finish (PanDA client) + Abort (PanDA client) + {% else %} Finish Abort + {% endif %}
{% endif %} {% endwith %} @@ -1134,6 +1139,28 @@
Warning! {{ warning.memoryleaksuspicion.message }}
} } +function killtasks_token(taskID, action) { + var message = ''; + if (action === 0) + message = 'Are you sure you want to FINISH this task?' + else + message = 'Are you sure you want to ABORT this task?' + if (confirm(message)) { + $.ajax({ + url: {% url 'panda_client' %}, + data: {task: taskID, action: action}, + async: false, + type: "POST", + success: function(msg){ + alert(msg); + } + }) + } else { + // Do nothing! + } +} + + function buildDatasetsTable(data) { $('#datasetstable').dataTable({ @@ -1175,11 +1202,11 @@
Warning! {{ warning.memoryleaksuspicion.message }}
let str = ''; if (data > 0) { str += '' + full['nfiles'] + ''; + if (full['nfilesmissing'] > 0) {str += ' +' + full['nfilesmissing'] + ' lost'; } } else { str += data; } - if (full['nfilesmissing'] > 0) {str += ' +' + full['nfilesmissing'] + ' lost'; } return str } }, diff --git a/core/views.py b/core/views.py index 28112ca4..6c7d5851 100644 --- a/core/views.py +++ b/core/views.py @@ -71,7 +71,7 @@ from core.libs.UserProfilePlot import UserProfilePlot from core.libs.TasksErrorCodesAnalyser import TasksErrorCodesAnalyser -from core.oauth.utils import login_customrequired +from core.oauth.utils import login_customrequired, get_auth_provider from core.utils import is_json_request, extensibleURL, complete_request, is_wildcards, removeParam from core.libs.dropalgorithm import insert_dropped_jobs_to_tmp_table, drop_job_retries @@ -95,7 +95,7 @@ from core.libs.bpuser import get_relevant_links, filterErrorData from core.libs.user import prepare_user_dash_plots, get_panda_user_stats, humanize_metrics from core.libs.elasticsearch import create_esatlas_connection, get_payloadlog -from core.libs.sqlcustom import escape_input, preprocess_wild_card_string, filter_dict_by_wildcards +from core.libs.sqlcustom import escape_input, preprocess_wild_card_string from core.libs.datetimestrings import datetime_handler, parse_datetime from core.libs.jobconsumers import reconstruct_job_consumers from core.libs.DateEncoder import DateEncoder @@ -645,7 +645,7 @@ def setupView(request, opmode='', hours=0, limit=-99, querytype='job', wildCardE query['jobsetid__gte'] = plo query['jobsetid__lte'] = phi elif param == 'user' or param == 'username' or param == 'produsername': - if querytype == 'job' and not is_wildcards(request.session['requestParams'][param]): + if querytype == 'job': query['produsername__icontains'] = request.session['requestParams'][param].strip() elif param in ('project',) and querytype == 'task': val = request.session['requestParams'][param] @@ -849,11 +849,7 @@ def setupView(request, opmode='', hours=0, limit=-99, querytype='job', wildCardE if param == 'site': pqs = get_panda_queues() - pqs = filter_dict_by_wildcards(pqs, 'gocname', request.session['requestParams'][param]) - if 'computingsite__in' in query: - query['computingsite__in'] = list(set(query['computingsite__in']).intersection(set([pq for pq in pqs.keys()]))) - else: - query['computingsite__in'] = [pq for pq in pqs.keys()] + query['computingsite__in'] = [info['nickname'] for pq, info in pqs.items() if info['gocname'] == request.session['requestParams'][param]] for field in Jobsactive4._meta.get_fields(): if param == field.name: @@ -868,16 +864,6 @@ def setupView(request, opmode='', hours=0, limit=-99, querytype='job', wildCardE query['%s__range' % param] = (int(leftlimit) * 1000, int(rightlimit) * 1000 - 1) else: query[param] = int(request.session['requestParams'][param]) - elif param == 'computingsite': - if is_wildcards(request.session['requestParams'][param]): - pqs = get_panda_queues() - pqs = filter_dict_by_wildcards(pqs, 'nickname', request.session['requestParams'][param]) - if 'computingsite__in' in query: - query['computingsite__in'] = list(set(query['computingsite__in']).intersection(set([pq for pq in pqs.keys()]))) - else: - query['computingsite__in'] = [pq for pq in pqs.keys()] - else: - query[param] = request.session['requestParams'][param] elif param == 'specialhandling' and not '*' in request.session['requestParams'][param]: query['specialhandling__contains'] = request.session['requestParams'][param] elif param == 'prodsourcelabel': @@ -977,12 +963,13 @@ def setupView(request, opmode='', hours=0, limit=-99, querytype='job', wildCardE query[param] = request.session['requestParams'][param] if 'region' in request.session['requestParams']: - pqs = get_panda_queues() - pqs = filter_dict_by_wildcards(pqs, 'cloud', request.session['requestParams']['region']) - if 'computingsite__in' in query: - query['computingsite__in'] = list(set(query['computingsite__in']).intersection(set([pq for pq in pqs.keys()]))) - else: - query['computingsite__in'] = [pq for pq in pqs.keys()] + region = request.session['requestParams']['region'] + pq_clouds = get_pq_clouds() + siteListForRegion = [] + for sn, rn in pq_clouds.items(): + if rn == region: + siteListForRegion.append(str(sn)) + query['computingsite__in'] = siteListForRegion if opmode in ['analysis', 'production'] and querytype == 'job': if opmode.startswith('analy'): @@ -999,16 +986,16 @@ def setupView(request, opmode='', hours=0, limit=-99, querytype='job', wildCardE extraQueryString = '' # wild cards handling - wildSearchFieldsToProcess = set() wildSearchFields = (set(wildSearchFields) & set(list(request.session['requestParams'].keys()))) # filter out fields that already in query dict + wildSearchFields1 = set() for currenfField in wildSearchFields: if not (currenfField.lower() == 'transformation'): if not ((currenfField.lower() == 'cloud') & ( any(card.lower() == 'all' for card in request.session['requestParams'][currenfField].split('|')))): if not any(currenfField in key for key, value in query.items()) and currenfField not in extraQueryFields: - wildSearchFieldsToProcess.add(currenfField) - wildSearchFields = wildSearchFieldsToProcess + wildSearchFields1.add(currenfField) + wildSearchFields = wildSearchFields1 for i_field, field_name in enumerate(wildSearchFields, start=1): extraQueryString += '(' @@ -1558,9 +1545,9 @@ def jobList(request, mode=None, param=None): if 'taskid' in request.session['requestParams'] and '|' not in request.session['requestParams']['taskid']: taskname = get_task_name_by_taskid(request.session['requestParams']['taskid']) - if 'produsername' in request.session['requestParams'] and not is_wildcards(request.session['requestParams']['produsername']): + if 'produsername' in request.session['requestParams']: user = request.session['requestParams']['produsername'] - elif 'user' in request.session['requestParams'] and not is_wildcards(request.session['requestParams']['user']): + elif 'user' in request.session['requestParams']: user = request.session['requestParams']['user'] else: user = None @@ -4802,6 +4789,91 @@ def killtasks(request): response = HttpResponse(dump, content_type='application/json') return response +@never_cache +def killtasks_token(request): + valid, response = initRequest(request) + if not valid: + return response + + from requests import get, post + + taskid = -1 + action = -1 + + if 'task' in request.session['requestParams']: + taskid = int(request.session['requestParams']['task']) + if 'action' in request.session['requestParams']: + action = int(request.session['requestParams']['action']) + + id_token = None + token_type = None + access_token = None + + username = None + fullname = None + organisation = 'atlas' + + auth_provider = None + + user = request.user + + if user.is_authenticated and user.social_auth is not None: + + auth_provider = (request.user.social_auth.get()).provider + social = request.user.social_auth.get(provider=auth_provider) + + if (auth_provider == 'indigoiam'): + if (social.extra_data['auth_time'] + social.extra_data['expires_in'] - 10) <= int(time.time()): + resp = {"detail": "id token is expired"} + dump = json.dumps(resp, cls=DateEncoder) + response = HttpResponse(dump, content_type='application/json') + return response + else: + token_type = social.extra_data['token_type'] + access_token = social.extra_data['access_token'] + id_token = social.extra_data['id_token'] + + os.environ['PANDA_AUTH_ID_TOKEN'] = id_token + os.environ['PANDA_AUTH'] = 'oidc' + os.environ['PANDA_AUTH_VO'] = organisation + else: + return None + + resp = get('https://atlas-auth.web.cern.ch/api/tokens/access', data={"grant_type": "access_token"}, + headers={'Authorization': '%s %s' % (token_type, access_token)}) + + header = {} + header['Authorization'] = 'Bearer {0}'.format(id_token) + header['Origin'] = 'atlas' + resp1 = post('https://pandaserver.cern.ch/server/panda/getAttr', headers=header) + + if resp.ok: + user_tokens = json.loads(resp.text) + # + # from pandaclient import Client + # c=Client() + + print('completed') + # c = Client() + # c.show_tasks() + # from core.panda_client.utils import kill_task, show_tasks + # show_tasks() + + resp = None + + if resp and len(resp.data) > 0: + try: + pass + except: + pass + else: + pass + + dump = json.dumps(resp, cls=DateEncoder) + response = HttpResponse(dump, content_type='application/json') + return response + + def getTaskScoutingInfo(tasks, nmax): taskslToBeDisplayed = tasks[:nmax] @@ -5449,6 +5521,8 @@ def taskInfo(request, jeditaskid=0): # Here we try to get cached data. We get any cached data is available data = getCacheEntry(request, "taskInfo", skipCentralRefresh=True) + ### Get the current AUTH type + auth = get_auth_provider(request) # temporarily turn off caching # data = None @@ -5467,6 +5541,9 @@ def taskInfo(request, jeditaskid=0): except: doRefresh = True + if 'authtype' in data and data['authtype'] != auth: + doRefresh = True + # We still want to refresh tasks if request came from central crawler and task not in the frozen state if (('REMOTE_ADDR' in request.META) and (request.META['REMOTE_ADDR'] in settings.CACHING_CRAWLER_HOSTS) and data['task'] and data['task']['status'] not in ['broken', 'aborted']): @@ -5804,6 +5881,7 @@ def taskInfo(request, jeditaskid=0): 'eventservice': eventservice, 'built': datetime.now().strftime("%m-%d %H:%M:%S"), 'warning': warning, + 'authtype': auth } data.update(getContextVariables(request))