From ce7fc944fb29bdfadebea0ce46232efa6375c97c Mon Sep 17 00:00:00 2001
From: Tatiana Korchuganova
Date: Tue, 18 Apr 2023 16:04:46 +0200
Subject: [PATCH 1/5] prmon | ATLASPANDA-804 add extra check of cmtconfig

---
 core/filebrowser/MemoryMonitorPlots.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/filebrowser/MemoryMonitorPlots.py b/core/filebrowser/MemoryMonitorPlots.py
index ec4fada5..3d629cde 100644
--- a/core/filebrowser/MemoryMonitorPlots.py
+++ b/core/filebrowser/MemoryMonitorPlots.py
@@ -71,7 +71,7 @@ def prMonPlots(request, pandaid=-1):
     if len(job) > 0:
         job = job[0]
-    if 'cmtconfig' in job and 'gpu' in job['cmtconfig']:
+    if 'cmtconfig' in job and job['cmtconfig'] and 'gpu' in job['cmtconfig']:
         processor_type = 'gpu'

     plots_list = [

From 46bcd497e0e6107a98db7e458b796a67f6e24c07 Mon Sep 17 00:00:00 2001
From: Tatiana Korchuganova
Date: Tue, 18 Apr 2023 17:45:39 +0200
Subject: [PATCH 2/5] buildmonitor | add retry mechanism for requests to art &
 add /art/overview/ to exceptions in ddosprotection

---
 core/buildmonitor/viewsartmonit.py   | 117 ++++++++++++++++-----------
 core/cachecontroller/mainmenurls.txt |   2 +-
 core/ddosprotection.py               |   2 +-
 3 files changed, 73 insertions(+), 48 deletions(-)

diff --git a/core/buildmonitor/viewsartmonit.py b/core/buildmonitor/viewsartmonit.py
index 6080718d..3b62ac34 100644
--- a/core/buildmonitor/viewsartmonit.py
+++ b/core/buildmonitor/viewsartmonit.py
@@ -4,9 +4,11 @@
 from django.core.cache import cache
 import requests
 import json, re, datetime
+import logging

 from django.views.decorators.cache import never_cache
 from core.libs.DateEncoder import DateEncoder

+_logger = logging.getLogger('bigpandamon')

 @never_cache
@@ -15,6 +17,8 @@ def artmonitviewDemo(request):
     # https://bigpanda.cern.ch/art/overview/?ntags=2020-01-14,2020-01-15&view=branches&json
     valid, response = initRequest(request)
+    if not valid:
+        return response

     ts_now = datetime.datetime.now()
     s_tstamp = ''
@@ -25,60 +29,88 @@ def artmonitviewDemo(request):
             s_tstamp = str(ts_f)
         else:
             s_tstamp = s_tstamp + ',' + str(ts_f)
-    # print('string of tstamps ',s_tstamp)
+
+    # getting branches
     url10 = "https://bigpanda.cern.ch/art/overview/?ntags=" + s_tstamp + "&view=branches&json"
-    # print('TRYING requests.get('+url10+')')
-    r = requests.get(url10, verify=False)
-    # pprint(r)
+    _logger.debug('getting branches from {}'.format(url10))
+    n_attempts = 3
+    is_success = False
+    i_attempt = 0
+    r = None
+    while i_attempt < n_attempts and not is_success:
+        r = requests.get(url10, verify=False)
+        if r.status_code == 200:
+            is_success = True
+        i_attempt += 1
+
+    if not is_success:
+        _logger.error("Internal Server Error! Failed to get ART test results for buildmonitor from {} with\n{}".format(
+            url10,
+            str(r.text)
+        ))
+        return render(
+            request,
+            'artmonitviewDemo.html',
+            {'viewParams': request.session['viewParams'],
+             'resltART': []},
+            content_type='text/html'
+        )
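[editor's note] The same fetch-and-check pattern appears twice in this view (branches above, per-branch tasks below), so a reusable helper could factor it out. A minimal sketch, assuming a linear backoff is acceptable; the helper name and parameters are illustrative, not part of this patch:

    import time
    import requests

    def get_with_retries(url, n_attempts=3, backoff_s=1.0, **kwargs):
        # return the first response with status 200, or the last response (or None)
        response = None
        for attempt in range(n_attempts):
            try:
                response = requests.get(url, **kwargs)
                if response.status_code == 200:
                    return response
            except requests.RequestException:
                pass  # keep the last response (or None) and retry
            time.sleep(backoff_s * (attempt + 1))  # wait a bit longer before each retry
        return response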
+
+    a0 = json.loads(r.text)
     branch_dict = a0.get('artpackages', {})
     branch_list = branch_dict.keys()
-    # print('Branch list:',branch_list)
+
+    # getting ART GRID test results per branch
+    _logger.debug('Branch list: {}'.format(branch_list))
     dict_result = {}
     for branch in branch_list:
         url11 = "https://bigpanda.cern.ch/art/tasks/?branch=" + branch + '&ntags=' + s_tstamp + '&json'
-        # print('TRYING requests.get('+url11+')')
-        r = requests.get(url11, verify=False)
-        # pprint(r)
-        a = json.loads(r.text)
-        tasks = a.get('arttasks', {})
-        # cache.set('art-monit-dict', dict_branch, 1800)
-        reslist = []
-        dict_branch = {}
-        for k, v in tasks.items():
-            if isinstance(v, dict):
-                for kx, ky in v.items():
-                    if kx == branch:
-                        if isinstance(ky, dict):
-                            for kxx, kyy in ky.items():
-                                if isinstance(kyy, dict):
-                                    for kxxx, kyyy in kyy.items():
-                                        # print('K ',kxx,kxxx)
-                                        if re.search(kxx, kxxx):
-                                            # pprint(kyyy)
-                                            a0_branch = dict_branch.get(kxxx, {'active': 0, 'succeeded': 0, 'failed': 0,
-                                                                               'finished': 0})
-                                            s_active = kyyy['active'] + a0_branch['active']
-                                            s_done = kyyy['succeeded'] + a0_branch['succeeded']
-                                            s_failed = kyyy['failed'] + a0_branch['failed']
-                                            s_finished = kyyy['finished'] + a0_branch['finished']
-                                            dict_branch[kxxx] = {'active': s_active, 'succeeded': s_done, 'failed': s_failed,
-                                                                 'finished': s_finished}
-                                            # cache.set('art-monit-dict', dict_branch, 1800)
-                                            reslist.append([s_active, s_done, s_failed, s_finished])
-        dict_result[branch] = dict_branch
+        _logger.debug('TRYING requests.get({})'.format(url11))
+        try:
+            r = requests.get(url11, verify=False)
+            r.raise_for_status()
+        except requests.RequestException as e:
+            _logger.exception("General Error\n{}".format(str(e)))
+            r = None
+        if r is not None:
+            a = json.loads(r.text)
+            tasks = a.get('arttasks', {})
+            reslist = []
+            dict_branch = {}
+            for k, v in tasks.items():
+                if isinstance(v, dict):
+                    for kx, ky in v.items():
+                        if kx == branch:
+                            if isinstance(ky, dict):
+                                for kxx, kyy in ky.items():
+                                    if isinstance(kyy, dict):
+                                        for kxxx, kyyy in kyy.items():
+                                            if re.search(kxx, kxxx):
+                                                a0_branch = dict_branch.get(
+                                                    kxxx,
+                                                    {'active': 0, 'succeeded': 0, 'failed': 0, 'finished': 0})
+                                                s_active = kyyy['active'] + a0_branch['active']
+                                                s_done = kyyy['succeeded'] + a0_branch['succeeded']
+                                                s_failed = kyyy['failed'] + a0_branch['failed']
+                                                s_finished = kyyy['finished'] + a0_branch['finished']
+                                                dict_branch[kxxx] = {
+                                                    'active': s_active,
+                                                    'succeeded': s_done,
+                                                    'failed': s_failed,
+                                                    'finished': s_finished
+                                                }
+                                                reslist.append([s_active, s_done, s_failed, s_finished])
+            dict_result[branch] = dict_branch
     cache.set('art-monit-dict', dict_result, 1800)
-    # dict_from_cache = cache.get('art-monit-dict')
-    # pprint('===============================')
+
     list2view = []
     for k46, v46 in dict_result.items():
         for kk, vv in v46.items():
             l1 = [k46]
             l1.append(kk)
             l1.extend([vv['active'], vv['succeeded'], vv['failed'], vv['finished']])
-#            print('L1 ',l1)
             list2view.append(l1)
-###########
+
     new_cur = connection.cursor()
     query = """
     select n.nname as \"BRANCH\", platf.pl,
@@ -120,13 +152,6 @@ def artmonitviewDemo(request):
             dict_loc_result[l_branch] = dict_inter

     cache.set('art-local-dict', dict_loc_result, 1800)
-#    pprint(dict_loc_result)
-#    for k47, v47 in dict_loc_result.items():
-#        print('L2',k47)
-#        pprint(v47)
-#        for
kk, vv in v47.items(): -# print('L2 ', k47, kk, vv.get('done','UNDEF'), vv.get('failed','UNDEF')) - data = {'viewParams': request.session['viewParams'], 'resltART': json.dumps(list2view, cls=DateEncoder)} return render(request, 'artmonitviewDemo.html', data, content_type='text/html') diff --git a/core/cachecontroller/mainmenurls.txt b/core/cachecontroller/mainmenurls.txt index 5c77146c..0af6a4b5 100644 --- a/core/cachecontroller/mainmenurls.txt +++ b/core/cachecontroller/mainmenurls.txt @@ -33,7 +33,7 @@ /bigpandamonitor/ /art/ /errorsscat/ -/artmonitview +/artmonitview/ /dash/region/ /dash/region/?jobtype=analy&splitby=jobtype /dash/region/?jobtype=prod&splitby=jobtype diff --git a/core/ddosprotection.py b/core/ddosprotection.py index 2ea4daac..81f5b01e 100644 --- a/core/ddosprotection.py +++ b/core/ddosprotection.py @@ -32,7 +32,7 @@ class DDOSMiddleware(object): ] excepted_views = [ '/grafana/img/', '/payloadlog/', '/statpixel/', '/idds/getiddsfortask/', '/api/dc/staginginfofortask/', - '/art/tasks/', + '/art/tasks/', '/art/overview/' ] blacklist = ['130.132.21.90', '192.170.227.149'] maxAllowedJSONRequstesParallel = 1 From aef6cb5fc70d2ac62af4df7c7d168ac88dc0a832 Mon Sep 17 00:00:00 2001 From: Tatiana Korchuganova Date: Tue, 18 Apr 2023 17:59:45 +0200 Subject: [PATCH 3/5] art | replace hardcoded atlas_panda* by settings.DB_SCHEMA* --- core/art/jobSubResults.py | 14 +++++++------- core/art/modelsART.py | 9 +++++---- core/art/views.py | 32 ++++++++++++++++---------------- 3 files changed, 28 insertions(+), 27 deletions(-) diff --git a/core/art/jobSubResults.py b/core/art/jobSubResults.py index 803b5e6e..5d5127e8 100644 --- a/core/art/jobSubResults.py +++ b/core/art/jobSubResults.py @@ -121,10 +121,10 @@ def lock_nqueuedjobs(cur, nrows): """ lock_time = datetime.now().strftime(settings.DATETIME_FORMAT) - lquery = """UPDATE atlas_pandabigmon.art_results_queue + lquery = """UPDATE {}.art_results_queue SET IS_LOCKED = 1, - LOCK_TIME = to_date('%s', 'YYYY-MM-DD HH24:MI:SS') - WHERE rownum <= %i AND IS_LOCKED = 0""" % (lock_time, nrows) + LOCK_TIME = to_date('{}', 'YYYY-MM-DD HH24:MI:SS') + WHERE rownum <= {} AND IS_LOCKED = 0""".format(settings.DB_SCHEMA, lock_time, nrows) try: cur.execute(lquery) except DatabaseError as e: @@ -141,9 +141,9 @@ def delete_queuedjobs(cur, lock_time): :return: """ - dquery = """DELETE FROM atlas_pandabigmon.art_results_queue + dquery = """DELETE FROM {}.art_results_queue WHERE IS_LOCKED = 1 - AND LOCK_TIME = to_date('%s', 'YYYY-MM-DD HH24:MI:SS')""" % (lock_time) + AND LOCK_TIME = to_date('{}', 'YYYY-MM-DD HH24:MI:SS')""".format(settings.DB_SCHEMA, lock_time) try: cur.execute(dquery) except DatabaseError as e: @@ -160,8 +160,8 @@ def clear_queue(cur): :return: """ - cquery = """DELETE FROM atlas_pandabigmon.art_results_queue - WHERE IS_LOCKED = 1""" + cquery = """DELETE FROM {}.art_results_queue + WHERE IS_LOCKED = 1""".format(settings.DB_SCHEMA) try: cur.execute(cquery) except DatabaseError as e: diff --git a/core/art/modelsART.py b/core/art/modelsART.py index 35d5503c..55ec0671 100644 --- a/core/art/modelsART.py +++ b/core/art/modelsART.py @@ -4,6 +4,7 @@ from __future__ import unicode_literals from django.db import models +from django.conf import settings class ARTResults(models.Model): @@ -20,7 +21,7 @@ class ARTResults(models.Model): lock_time = models.DateTimeField(null=True, db_column='lock_time', blank=True) class Meta: - db_table = u'"ATLAS_PANDABIGMON"."ART_RESULTS"' + db_table = f'"{settings.DB_SCHEMA}"."ART_RESULTS"' class ARTSubResult(models.Model): 
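[editor's note] One caveat with the settings-driven table names introduced in this patch: the f-string in Meta.db_table is evaluated once, when Django imports the models module, so DB_SCHEMA must be defined in settings before the app registry loads. A minimal sketch with a hypothetical stand-in model, not part of the patch:

    from django.conf import settings
    from django.db import models

    class ARTResultsDemo(models.Model):  # illustrative stand-in for ARTResults
        pandaid = models.BigIntegerField(db_column='pandaid', primary_key=True)

        class Meta:
            app_label = 'art'  # hypothetical app label
            # resolved at import time; later changes to settings.DB_SCHEMA have no effect
            db_table = f'"{settings.DB_SCHEMA}"."ART_RESULTS"'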
@@ -29,7 +30,7 @@ class ARTSubResult(models.Model): result = models.TextField(db_column='RESULT_JSON', blank=True) class Meta: - db_table = u'"ATLAS_PANDABIGMON"."ART_SUBRESULT"' + db_table = f'"{settings.DB_SCHEMA}"."ART_SUBRESULT"' class ARTResultsQueue(models.Model): @@ -38,7 +39,7 @@ class ARTResultsQueue(models.Model): is_locked = models.IntegerField(db_column='is_locked') lock_time = models.DateTimeField(null=True, db_column='lock_time', blank=True) class Meta: - db_table = u'"ATLAS_PANDABIGMON"."ART_RESULTS_QUEUE"' + db_table = f'"{settings.DB_SCHEMA}"."ART_RESULTS_QUEUE"' class ARTTests(models.Model): @@ -56,4 +57,4 @@ class ARTTests(models.Model): # subresult = models.OneToOneField('ARTSubResult', related_name='pandaid_sr', on_delete=models.DO_NOTHING, db_column='pandaid') class Meta: - db_table = u'"ATLAS_PANDABIGMON"."ART_TESTS"' + db_table = f'"{settings.DB_SCHEMA}"."ART_TESTS"' diff --git a/core/art/views.py b/core/art/views.py index d3ada4e6..9e00470e 100644 --- a/core/art/views.py +++ b/core/art/views.py @@ -146,8 +146,8 @@ def artOverview(request): # quering data from dedicated SQL function query_raw = """ SELECT package, branch, ntag, nightly_tag, status, result, pandaid, testname, attemptmark - FROM table(ATLAS_PANDABIGMON.ARTTESTS_LIGHT('{}','{}','{}')) - """.format(query['ntag_from'], query['ntag_to'], query['strcondition']) + FROM table({}.ARTTESTS_LIGHT('{}','{}','{}')) + """.format(settings.DB_SCHEMA, query['ntag_from'], query['ntag_to'], query['strcondition']) cur = connection.cursor() cur.execute(query_raw) tasks_raw = cur.fetchall() @@ -275,8 +275,8 @@ def artTasks(request): cur = connection.cursor() query_raw = """ SELECT package, branch, ntag, nightly_tag, pandaid, testname, taskid, status, result, attemptmark - FROM table(ATLAS_PANDABIGMON.ARTTESTS_LIGHT('{}','{}','{}')) - """.format(query['ntag_from'], query['ntag_to'], query['strcondition']) + FROM table({}.ARTTESTS_LIGHT('{}','{}','{}')) + """.format(settings.DB_SCHEMA, query['ntag_from'], query['ntag_to'], query['strcondition']) cur.execute(query_raw) tasks_raw = cur.fetchall() cur.close() @@ -442,8 +442,8 @@ def artJobs(request): c.attemptmark, c.inputfileid, c.extrainfo - FROM table(ATLAS_PANDABIGMON.ARTTESTS('{}','{}','{}')) c - """.format(query['ntag_from'], query['ntag_to'], query['strcondition']) + FROM table({}.ARTTESTS('{}','{}','{}')) c + """.format(settings.DB_SCHEMA, query['ntag_from'], query['ntag_to'], query['strcondition']) cur.execute(query_raw) jobs = cur.fetchall() cur.close() @@ -754,8 +754,8 @@ def artStability(request): c.pandaid, c.result, c.attemptmark - FROM table(ATLAS_PANDABIGMON.ARTTESTS_LIGHT('{}','{}','{}')) c - """.format(query['ntag_from'], query['ntag_to'], query['strcondition']) + FROM table({}.ARTTESTS_LIGHT('{}','{}','{}')) c + """.format(settings.DB_SCHEMA, query['ntag_from'], query['ntag_to'], query['strcondition']) cur.execute(query_raw) jobs = cur.fetchall() cur.close() @@ -900,9 +900,9 @@ def artErrors(request): c.status, c.pandaid, c.result - FROM table(ATLAS_PANDABIGMON.ARTTESTS_LIGHT('{}','{}','{}')) c + FROM table({}.ARTTESTS_LIGHT('{}','{}','{}')) c WHERE c.attemptmark = 0 - """.format(query['ntag_from'], query['ntag_to'], query['strcondition']) + """.format(settings.DB_SCHEMA, query['ntag_from'], query['ntag_to'], query['strcondition']) cur.execute(query_raw) jobs = cur.fetchall() cur.close() @@ -992,15 +992,15 @@ def updateARTJobList(request): # Adding to ART_RESULTS_QUEUE jobs with not loaded result json yet cur = connection.cursor() - cur.execute("""INSERT 
INTO atlas_pandabigmon.art_results_queue + cur.execute("""INSERT INTO {0}.art_results_queue (pandaid, IS_LOCKED, LOCK_TIME) - SELECT pandaid, 0, NULL FROM table(ATLAS_PANDABIGMON.ARTTESTS_LIGHT('{}','{}','{}')) + SELECT pandaid, 0, NULL FROM table({0}.ARTTESTS_LIGHT('{1}','{2}','{3}')) WHERE pandaid is not NULL and attemptmark = 0 and result is NULL and status in ('finished', 'failed') - and pandaid not in (select pandaid from atlas_pandabigmon.art_results_queue) - """.format(query['ntag_from'], query['ntag_to'], query['strcondition'])) + and pandaid not in (select pandaid from {0}.art_results_queue) + """.format(settings.DB_SCHEMA, query['ntag_from'], query['ntag_to'], query['strcondition'])) cur.close() data = { @@ -1316,9 +1316,9 @@ def sendArtReport(request): cur = connection.cursor() query_raw = """ SELECT taskid, package, branch, ntag, nightly_tag, testname, status, result - FROM table(ATLAS_PANDABIGMON.ARTTESTS_LIGHT('{}','{}','{}')) + FROM table({}.ARTTESTS_LIGHT('{}','{}','{}')) WHERE attemptmark = 0 - """.format(query['ntag_from'], query['ntag_to'], query['strcondition']) + """.format(settings.DB_SCHEMA, query['ntag_from'], query['ntag_to'], query['strcondition']) cur.execute(query_raw) jobs = cur.fetchall() cur.close() From a6d0f457f7232c80c0ae6dd7d7aec3f4e4662b46 Mon Sep 17 00:00:00 2001 From: Tatiana Korchuganova Date: Wed, 19 Apr 2023 16:15:34 +0200 Subject: [PATCH 4/5] globalshares | replace hardcoded atlas_panda* by settings.DB_SCHEMA* --- core/errorsscattering/views.py | 14 +- core/globalshares/models.py | 38 ++++ core/globalshares/utils.py | 30 +++ core/globalshares/views.py | 322 +++++++++++++++------------------ core/libs/dropalgorithm.py | 15 +- 5 files changed, 233 insertions(+), 186 deletions(-) create mode 100644 core/globalshares/models.py diff --git a/core/errorsscattering/views.py b/core/errorsscattering/views.py index 0f7c0081..b67bf803 100644 --- a/core/errorsscattering/views.py +++ b/core/errorsscattering/views.py @@ -382,7 +382,7 @@ def errorsScatteringDetailed(request, cloud, reqid): sum(case when jobstatus = 'finished' then 1 else 0 end) as finishedc, sum(case when jobstatus in ('finished', 'failed') then 1 else 0 end) as allc, computingsite, reqid, jeditaskid - from atlas_panda.jobsarchived4 where jeditaskid in ( + from {5}.jobsarchived4 where jeditaskid in ( select id from {0} where transactionkey={1}) and modificationtime > to_date('{2}', 'YYYY-MM-DD HH24:MI:SS') and {3} group by computingsite, jeditaskid, reqid union @@ -390,13 +390,21 @@ def errorsScatteringDetailed(request, cloud, reqid): sum(case when jobstatus = 'finished' then 1 else 0 end) as finishedc, sum(case when jobstatus in ('finished', 'failed') then 1 else 0 end) as allc, computingsite, reqid, jeditaskid - from atlas_pandaarch.jobsarchived where jeditaskid in ( + from {6}.jobsarchived where jeditaskid in ( select id from {0} where transactionkey={1}) and modificationtime > to_date('{2}', 'YYYY-MM-DD HH24:MI:SS') and {3} group by computingsite, jeditaskid, reqid ) j where j.allc > 0 and {4} group by jeditaskid, computingsite, reqid - """.format(tmpTableName, transactionKey, query['modificationtime__castdate__range'][0], jcondition, condition) + """.format( + tmpTableName, + transactionKey, + query['modificationtime__castdate__range'][0], + jcondition, + condition, + settings.DB_SCHEMA_PANDA, + settings.DB_SCHEMA_PANDA_ARCH + ) new_cur.execute(querystr) diff --git a/core/globalshares/models.py b/core/globalshares/models.py new file mode 100644 index 00000000..06eaf28f --- /dev/null +++ 
b/core/globalshares/models.py
@@ -0,0 +1,38 @@
+from django.db import models
+from django.conf import settings
+
+
+class GlobalSharesModel(models.Model):
+    name = models.CharField(max_length=32, db_column='name', null=False)
+    value = models.IntegerField(db_column='value', null=False)
+    parent = models.CharField(max_length=32, db_column='parent', null=True, blank=False)
+    prodsourcelabel = models.CharField(max_length=100, db_column='prodsourcelabel', null=True, blank=True)
+    workinggroup = models.CharField(max_length=100, db_column='workinggroup', null=True, blank=True)
+    campaign = models.CharField(max_length=100, db_column='campaign', null=True, blank=True)
+    processingtype = models.CharField(max_length=100, db_column='processingtype', null=True, blank=True)
+    vo = models.CharField(max_length=32, db_column='vo', null=True, blank=True)
+    queueid = models.IntegerField(db_column='queue_id', null=False, blank=True, primary_key=True)
+    throttled = models.CharField(max_length=1, db_column='throttled', null=True, blank=True)
+    transpath = models.CharField(max_length=128, db_column='transpath', null=True, blank=True)
+    rtype = models.CharField(max_length=16, db_column='rtype', null=True, blank=True)
+
+    class Meta:
+        app_label = 'panda'
+        db_table = f'"{settings.DB_SCHEMA_PANDA}"."global_shares"'
+
+
+class JobsShareStats(models.Model):
+    ts = models.DateTimeField(db_column='ts', null=True, blank=True)
+    gshare = models.CharField(max_length=32, db_column='gshare', null=True)
+    computingsite = models.CharField(max_length=128, db_column='computingsite', null=True)
+    jobstatus = models.CharField(max_length=15, db_column='jobstatus', null=False)
+    maxpriority = models.IntegerField(db_column='maxpriority', null=True)
+    njobs = models.IntegerField(db_column='njobs', null=True)
+    hs = models.DecimalField(db_column='hs', max_digits=5, null=True)
+    vo = models.CharField(max_length=32, db_column='vo', null=True)
+    workqueueid = models.IntegerField(db_column='workqueue_id', null=False)
+    resourcetype = models.CharField(max_length=16, db_column='resource_type', null=True)
+
+    class Meta:
+        app_label = 'panda'
+        db_table = f'"{settings.DB_SCHEMA_PANDA}"."jobs_share_stats"'
\ No newline at end of file
diff --git a/core/globalshares/utils.py b/core/globalshares/utils.py
index fe75aaf6..43d5a0bf 100644
--- a/core/globalshares/utils.py
+++ b/core/globalshares/utils.py
@@ -3,6 +3,9 @@
 """
 import re
+from django.db.models import Case, When, Value, Sum
+from core.globalshares.models import JobsShareStats
+
 def get_child_elements(tree,childsgsharelist):
     for gshare in tree:
         if gshare!='childlist':
@@ -31,6 +34,33 @@ def get_child_sumstats(childsgsharelist,resourcesdict,gshare):
     return parentgshare

+def get_hs_distribution(group_by='gshare', out_format='dict'):
+    """
+    Get HS06s aggregation from jobs_share_stats table
+    :param group_by: field or list/tuple of fields to group by
+    :param out_format: output format, 'dict' (default) or 'tuple'
+    :return: list of dicts or tuples with the aggregated HS06 sums
+    """
+    group_by_list = []
+    if type(group_by) in (list, tuple) and len(set(group_by) - set([f.name for f in JobsShareStats._meta.get_fields()])) == 0:
+        group_by_list = list(group_by)
+    elif isinstance(group_by, str) and group_by in [f.name for f in JobsShareStats._meta.get_fields()]:
+        group_by_list.append(group_by)
+    else:
+        return []
+    group_by_list.append('jobstatus_grouped')
+    hs_distribution = JobsShareStats.objects.annotate(
+        jobstatus_grouped=Case(
+            When(jobstatus='activated', then=Value('queued')),
+            When(jobstatus__in=('sent', 'running'), then=Value('executing')),
+            default=Value('ignore')
+        )
+    ).values(*group_by_list).annotate(hs_sum=Sum('hs'))
+    group_by_list.append('hs_sum')
+    if out_format == 'tuple':
+        hs_distribution = [tuple(row[v] for v in group_by_list) for row in hs_distribution]
+
+    return hs_distribution
+
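[editor's note] A usage sketch for the new get_hs_distribution helper (illustrative, not part of the patch). With out_format='tuple' each row unpacks in group_by order, then the derived status group, then the aggregate:

    # rows are 4-tuples: (gshare, computingsite, jobstatus_grouped, hs_sum)
    rows = get_hs_distribution(group_by=('gshare', 'computingsite'), out_format='tuple')
    for gshare, computingsite, status_group, hs_sum in rows:
        print(gshare, computingsite, status_group, hs_sum)

    # the default out_format='dict' yields rows like:
    # {'gshare': ..., 'computingsite': ..., 'jobstatus_grouped': 'queued', 'hs_sum': Decimal('...')}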
 def get_gs_plots_data(gs_list, resources_dict, gs_tree_dict):
     gs_plot_data = {
         'level1': {
diff --git a/core/globalshares/views.py b/core/globalshares/views.py
index 6cd978f1..4cfe906d 100644
--- a/core/globalshares/views.py
+++ b/core/globalshares/views.py
@@ -5,11 +5,11 @@
 import re
 from decimal import Decimal

-import urllib3
 from django.http import HttpResponse
 from django.shortcuts import render
 from django.utils.cache import patch_response_headers
 from django.db import connection
+from django.conf import settings

 from core.libs.cache import getCacheEntry, setCacheEntry
 from core.libs.CustomJSONSerializer import DecimalEncoder
@@ -20,7 +20,8 @@
 import json

 from core.globalshares import GlobalShares
-from core.globalshares.utils import get_gs_plots_data, get_child_elements, get_child_sumstats
+from core.globalshares.utils import get_gs_plots_data, get_child_elements, get_child_sumstats, get_hs_distribution
+from core.globalshares.models import GlobalSharesModel, JobsShareStats

 @login_customrequired
 def globalshares(request):
@@ -146,15 +147,8 @@ def get_resources_gshare():
     PLEDGED = 'pledged'
     IGNORE = 'ignore'
     resourcesDictSites = get_pq_resource_types()
-    sqlRequest = """
-    SELECT gshare, computingsite, jobstatus_grouped, SUM(HS)
-    FROM (SELECT gshare, computingsite, HS, CASE WHEN jobstatus IN('activated') THEN 'queued' WHEN jobstatus IN('sent', 'running') THEN 'executing'
-    ELSE 'ignore' END jobstatus_grouped FROM ATLAS_PANDA.JOBS_SHARE_STATS JSS) GROUP BY gshare,computingsite, jobstatus_grouped order by gshare
-    """
-    cur = connection.cursor()
-    cur.execute(sqlRequest)
+    hs_distribution_raw = get_hs_distribution(group_by=('gshare', 'computingsite'), out_format='tuple')
     # get the hs distribution data into a dictionary structure
-    hs_distribution_raw = cur.fetchall()
     hs_distribution_dict = {}
     hs_queued_total = 0
     hs_executing_total = 0
@@ -256,45 +250,25 @@ def add_resources(gshare,tableRows,resourceslist,level):


 def get_shares(parents=''):
-    comment = ' /* DBProxy.get_shares */'
-    methodName = comment.split(' ')[-2].split('.')[-1]
-
-    sql = """
-    SELECT NAME, VALUE, PARENT, PRODSOURCELABEL, WORKINGGROUP, CAMPAIGN, PROCESSINGTYPE
-    FROM ATLAS_PANDA.GLOBAL_SHARES
-    """
-    var_map = None
-
-    if parents == '':
-        # Get all shares
-        pass
-    elif parents is None:
-        # Get top level shares
-        sql += "WHERE parent IS NULL"
-
+    """
+    Get global shares from DB
+    :param parents:
+    :return:
+    """
+    gvalues = ('name', 'value', 'parent', 'prodsourcelabel', 'workinggroup', 'campaign', 'processingtype')
+    gquery = {}
+    if parents is None:
+        gquery['parent__isnull'] = True
     elif type(parents) == np.unicode:
-        # Get the children of a specific share
-        var_map = {':parent': parents}
-        sql += "WHERE parent = :parent"
-
+        gquery['parent'] = parents
     elif type(parents) in (list, tuple):
-        # Get the children of a list of shares
-        i = 0
-        var_map = {}
-        for parent in parents:
-            key = ':parent{0}'.format(i)
-            var_map[key] = parent
-            i += 1
+        gquery['parent__in'] = parents

-        parentBindings = ','.join(':parent{0}'.format(i) for i in xrange(len(parents)))
-        sql += "WHERE parent IN ({0})".format(parentBindings)
-
-    cur = connection.cursor()
-    cur.execute(sql, var_map)
-    resList = cur.fetchall()
-    cur.close()
+    global_shares_list = []
+    
global_shares_list.extend(GlobalSharesModel.objects.filter(**gquery).values(*gvalues)) + global_shares_tuples = [(tuple(gs[gv] for gv in gvalues)) for gs in global_shares_list] - return resList + return global_shares_tuples def __load_branch(share): @@ -336,12 +310,7 @@ def __get_hs_leave_distribution(): tree.normalize() leave_shares = tree.get_leaves() - sql_hs_distribution = "SELECT gshare, jobstatus_grouped, SUM(HS) FROM (SELECT gshare, HS, CASE WHEN jobstatus IN('activated') THEN 'queued' WHEN jobstatus IN('sent', 'running') THEN 'executing' ELSE 'ignore' END jobstatus_grouped FROM ATLAS_PANDA.JOBS_SHARE_STATS JSS) GROUP BY gshare, jobstatus_grouped" - - cur = connection.cursor() - cur.execute(sql_hs_distribution) - hs_distribution_raw = cur.fetchall() - cur.close() + hs_distribution_raw = get_hs_distribution(group_by='gshare', out_format='tuple') # get the hs distribution data into a dictionary structure hs_distribution_dict = {} @@ -440,41 +409,41 @@ def getChildStat(node, hs_distribution_dict, level): ###JSON for Datatables globalshares### def detailedInformationJSON(request): fullListGS = [] - sqlRequest = ''' -SELECT gshare, corecount, jobstatus, count(*), sum(HS06) FROM -(select gshare, (CASE -WHEN corecount is null THEN 1 else corecount END -) as corecount, - (CASE - WHEN jobstatus in ('defined','waiting','pending','assigned','throttled','activated','merging','starting','holding','transferring') THEN 'scheduled' - WHEN jobstatus in ('sent','running') THEN 'running' - WHEN jobstatus in ('finished','failed','cancelled','closed') THEN 'did run' -END) as jobstatus,HS06 -from -atlas_panda.jobsactive4 -UNION ALL -select gshare, (CASE -WHEN corecount is null THEN 1 else corecount END -) as corecount, -(CASE - WHEN jobstatus in ('defined','waiting','pending','assigned','throttled','activated','merging','starting','holding','transferring') THEN 'scheduled' - WHEN jobstatus in ('sent','running') THEN 'running' - WHEN jobstatus in ('finished','failed','cancelled','closed') THEN 'did run' -END) as jobstatus,HS06 -from -atlas_panda.JOBSDEFINED4 -UNION ALL -select gshare, (CASE - WHEN corecount is null THEN 1 else corecount END -) as corecount, (CASE - WHEN jobstatus in ('defined','waiting','pending','assigned','throttled','activated','merging','starting','holding','transferring') THEN 'scheduled' - WHEN jobstatus in ('sent','running') THEN 'running' - WHEN jobstatus in ('finished','failed','cancelled','closed') THEN 'did run' -END) as jobstatus,HS06 from -atlas_panda.JOBSWAITING4) -group by gshare, corecount, jobstatus -order by gshare, corecount, jobstatus -''' + sqlRequest = """ + SELECT gshare, corecount, jobstatus, count(*), sum(HS06) FROM + (select gshare, (CASE + WHEN corecount is null THEN 1 else corecount END + ) as corecount, + (CASE + WHEN jobstatus in ('defined','waiting','pending','assigned','throttled','activated','merging','starting','holding','transferring') THEN 'scheduled' + WHEN jobstatus in ('sent','running') THEN 'running' + WHEN jobstatus in ('finished','failed','cancelled','closed') THEN 'did run' + END) as jobstatus,HS06 + from + {0}.jobsactive4 + UNION ALL + select gshare, (CASE + WHEN corecount is null THEN 1 else corecount END + ) as corecount, + (CASE + WHEN jobstatus in ('defined','waiting','pending','assigned','throttled','activated','merging','starting','holding','transferring') THEN 'scheduled' + WHEN jobstatus in ('sent','running') THEN 'running' + WHEN jobstatus in ('finished','failed','cancelled','closed') THEN 'did run' + END) as jobstatus,HS06 + from + 
{0}.JOBSDEFINED4 + UNION ALL + select gshare, (CASE + WHEN corecount is null THEN 1 else corecount END + ) as corecount, (CASE + WHEN jobstatus in ('defined','waiting','pending','assigned','throttled','activated','merging','starting','holding','transferring') THEN 'scheduled' + WHEN jobstatus in ('sent','running') THEN 'running' + WHEN jobstatus in ('finished','failed','cancelled','closed') THEN 'did run' + END) as jobstatus,HS06 from + {0}.JOBSWAITING4) + group by gshare, corecount, jobstatus + order by gshare, corecount, jobstatus + """.format(settings.DB_SCHEMA_PANDA) #if isJobsss: #sqlRequest += ' WHERE '+ codename + '='+codeval # INPUT_EVENTS, TOTAL_EVENTS, STEP @@ -498,37 +467,37 @@ def detailedInformationJSON(request): def sharesDistributionJSON(request): fullListGS = [] sqlRequest = ''' -SELECT gshare,COMPUTINGSITE, corecount, jobstatus, COUNT(*), SUM(HS06) -FROM (select gshare,COMPUTINGSITE, (CASE - WHEN corecount is null THEN 1 else corecount END -) as corecount, - (CASE jobstatus - WHEN 'running' THEN 'running' - ELSE 'scheduled' -END) as jobstatus, HS06 -from -atlas_panda.jobsactive4 -UNION ALL -select gshare,COMPUTINGSITE, (CASE - WHEN corecount is null THEN 1 else corecount END -) as corecount, - (CASE jobstatus - WHEN 'running' THEN 'running' - ELSE 'scheduled' -END) as jobstatus, HS06 -from -atlas_panda.JOBSDEFINED4 -UNION ALL -select gshare,COMPUTINGSITE, (CASE - WHEN corecount is null THEN 1 else corecount END -) as corecount, (CASE jobstatus - WHEN 'running' THEN 'running' - ELSE 'scheduled' -END) as jobstatus, HS06 from -atlas_panda.JOBSWAITING4 -) group by gshare,COMPUTINGSITE, corecount, jobstatus -order by gshare,COMPUTINGSITE, corecount, jobstatus -''' + SELECT gshare,COMPUTINGSITE, corecount, jobstatus, COUNT(*), SUM(HS06) + FROM (select gshare,COMPUTINGSITE, (CASE + WHEN corecount is null THEN 1 else corecount END + ) as corecount, + (CASE jobstatus + WHEN 'running' THEN 'running' + ELSE 'scheduled' + END) as jobstatus, HS06 + from + {0}.jobsactive4 + UNION ALL + select gshare,COMPUTINGSITE, (CASE + WHEN corecount is null THEN 1 else corecount END + ) as corecount, + (CASE jobstatus + WHEN 'running' THEN 'running' + ELSE 'scheduled' + END) as jobstatus, HS06 + from + {0}.JOBSDEFINED4 + UNION ALL + select gshare,COMPUTINGSITE, (CASE + WHEN corecount is null THEN 1 else corecount END + ) as corecount, (CASE jobstatus + WHEN 'running' THEN 'running' + ELSE 'scheduled' + END) as jobstatus, HS06 from + {0}.JOBSWAITING4 + ) group by gshare,COMPUTINGSITE, corecount, jobstatus + order by gshare,COMPUTINGSITE, corecount, jobstatus + '''.format(settings.DB_SCHEMA_PANDA) #if isJobsss: #sqlRequest += ' WHERE '+ codename + '='+codeval # INPUT_EVENTS, TOTAL_EVENTS, STEP @@ -561,37 +530,37 @@ def sharesDistributionJSON(request): def siteWorkQueuesJSON(request): fullListGS = [] sqlRequest = ''' -SELECT COMPUTINGSITE,gshare, corecount, jobstatus,COUNT (*) -FROM (select COMPUTINGSITE,gshare, (CASE - WHEN corecount is null THEN 1 else corecount END -) as corecount, - (CASE jobstatus - WHEN 'running' THEN 'running' - ELSE 'scheduled' -END) as jobstatus -from -atlas_panda.jobsactive4 -UNION ALL -select COMPUTINGSITE,gshare, (CASE - WHEN corecount is null THEN 1 else corecount END -) as corecount, - (CASE jobstatus - WHEN 'running' THEN 'running' - ELSE 'scheduled' -END) as jobstatus -from -atlas_panda.JOBSDEFINED4 -UNION ALL -select COMPUTINGSITE,gshare, (CASE -WHEN corecount is null THEN 1 else corecount END -) as corecount, (CASE jobstatus - WHEN 'running' THEN 'running' - ELSE 
'scheduled' -END) as jobstatus from -atlas_panda.JOBSWAITING4 -) group by COMPUTINGSITE,gshare, corecount, jobstatus -order by COMPUTINGSITE,gshare, corecount, jobstatus -''' + SELECT COMPUTINGSITE,gshare, corecount, jobstatus,COUNT (*) + FROM (select COMPUTINGSITE,gshare, (CASE + WHEN corecount is null THEN 1 else corecount END + ) as corecount, + (CASE jobstatus + WHEN 'running' THEN 'running' + ELSE 'scheduled' + END) as jobstatus + from + {0}.jobsactive4 + UNION ALL + select COMPUTINGSITE,gshare, (CASE + WHEN corecount is null THEN 1 else corecount END + ) as corecount, + (CASE jobstatus + WHEN 'running' THEN 'running' + ELSE 'scheduled' + END) as jobstatus + from + {0}.JOBSDEFINED4 + UNION ALL + select COMPUTINGSITE,gshare, (CASE + WHEN corecount is null THEN 1 else corecount END + ) as corecount, (CASE jobstatus + WHEN 'running' THEN 'running' + ELSE 'scheduled' + END) as jobstatus from + {0}.JOBSWAITING4 + ) group by COMPUTINGSITE,gshare, corecount, jobstatus + order by COMPUTINGSITE,gshare, corecount, jobstatus + '''.format(settings.DB_SCHEMA_PANDA) #if isJobsss: #sqlRequest += ' WHERE '+ codename + '='+codeval # INPUT_EVENTS, TOTAL_EVENTS, STEP @@ -625,13 +594,10 @@ def resourcesType(request): IGNORE = 'ignore' resourcesList = [] resourcesDictSites = get_pq_resource_types() - sqlRequest = """SELECT computingsite, jobstatus_grouped, SUM(HS) - FROM (SELECT computingsite, HS, CASE WHEN jobstatus IN('activated') THEN 'queued' WHEN jobstatus IN('sent', 'running') THEN 'executing' - ELSE 'ignore' END jobstatus_grouped FROM ATLAS_PANDA.JOBS_SHARE_STATS JSS) GROUP BY computingsite, jobstatus_grouped""" - cur = connection.cursor() - cur.execute(sqlRequest) + + hs_distribution_raw = get_hs_distribution(group_by='computingsite', out_format='tuple') + # get the hs distribution data into a dictionary structure - hs_distribution_raw = cur.fetchall() hs_distribution_dict = {} hs_queued_total = 0 hs_executing_total = 0 @@ -646,8 +612,6 @@ def resourcesType(request): except: continue hs_distribution_dict.setdefault(resourcetype, {PLEDGED: 0, QUEUED: 0, EXECUTING: 0,IGNORE:0}) - #hs_distribution_dict[resourcetype][status_group] = hs - total_hs += hs # calculate totals @@ -660,6 +624,7 @@ def resourcesType(request): else: hs_ignore_total += hs hs_distribution_dict[resourcetype][status_group] += hs + ignore = 0 pled = 0 executing = 0 @@ -684,16 +649,19 @@ def resourcesType(request): hs_distribution_dict[hs_entry]['ignore_percent'] = (hs_distribution_dict[hs_entry]['ignore']/ignore)* 100 hs_distribution_dict[hs_entry]['executing_percent'] = (hs_distribution_dict[hs_entry]['executing'] /executing) * 100 hs_distribution_dict[hs_entry]['queued_percent'] = (hs_distribution_dict[hs_entry]['queued']/queued) * 100 - hs_distribution_list.append({'resource':hs_entry, 'pledged':hs_distribution_dict[hs_entry]['pledged'], - 'ignore':hs_distribution_dict[hs_entry]['ignore'], - 'ignore_percent':round(hs_distribution_dict[hs_entry]['ignore_percent'],2), - 'executing':hs_distribution_dict[hs_entry]['executing'], - 'executing_percent': round(hs_distribution_dict[hs_entry]['executing_percent'],2), - 'queued':hs_distribution_dict[hs_entry]['queued'], - 'queued_percent':round(hs_distribution_dict[hs_entry]['queued_percent'],2), - 'total_hs':hs_distribution_dict[hs_entry]['total_hs'], - 'total_hs_percent': round((hs_distribution_dict[hs_entry]['total_hs']/total_hs)*100,2) - }) + hs_distribution_list.append( + { + 'resource':hs_entry, 'pledged':hs_distribution_dict[hs_entry]['pledged'], + 
'ignore':hs_distribution_dict[hs_entry]['ignore'],
+                'ignore_percent':round(hs_distribution_dict[hs_entry]['ignore_percent'],2),
+                'executing':hs_distribution_dict[hs_entry]['executing'],
+                'executing_percent': round(hs_distribution_dict[hs_entry]['executing_percent'],2),
+                'queued':hs_distribution_dict[hs_entry]['queued'],
+                'queued_percent':round(hs_distribution_dict[hs_entry]['queued_percent'],2),
+                'total_hs':hs_distribution_dict[hs_entry]['total_hs'],
+                'total_hs_percent': round((hs_distribution_dict[hs_entry]['total_hs']/total_hs)*100,2)
+            }
+        )

     return HttpResponse(json.dumps(hs_distribution_list, cls=DecimalEncoder), content_type='application/json')
@@ -702,7 +670,7 @@ def fairsharePolicy(request):
     QUEUED = 'queued'
     PLEDGED = 'pledged'
     IGNORE = 'ignore'
-    #sqlrequest ="""select SITEID, fairsharepolicy from atlas_pandameta.schedconfig"""
+
     fairsharepolicyDict = get_pq_fairshare_policy()
     newfairsharepolicyDict = {}
     fairsharepolicies = fairsharepolicyDict.values()
@@ -727,13 +695,10 @@ def fairsharePolicy(request):
             newfairsharepolicyDict[site]['type=any'] = 60
         else:
             newfairsharepolicyDict[site]['type=any'] = 100
-    sqlRequest = """SELECT computingsite, jobstatus_grouped, SUM(HS)
-    FROM (SELECT computingsite, HS, CASE WHEN jobstatus IN('activated') THEN 'queued' WHEN jobstatus IN('sent', 'running') THEN 'executing'
-    ELSE 'ignore' END jobstatus_grouped FROM ATLAS_PANDA.JOBS_SHARE_STATS JSS) GROUP BY computingsite, jobstatus_grouped"""
-    cur = connection.cursor()
-    cur.execute(sqlRequest)
+
+    hs_distribution_raw = get_hs_distribution(group_by='computingsite', out_format='tuple')
+
     # get the hs distribution data into a dictionary structure
-    hs_distribution_raw = cur.fetchall()
     hs_distribution_dict = {}
     hs_queued_total = 0
     hs_executing_total = 0
@@ -828,16 +793,16 @@ def coreTypes(request):
             json_value(gg.DATA, '$.fairsharepolicy') as fairsharepolicy,
             json_value(gg.DATA, '$.catchall') as catchall
         FROM
-            atlas_panda.jobs_share_stats jj,
-            atlas_panda.schedconfig_json gg
+            {0}.jobs_share_stats jj,
+            {0}.schedconfig_json gg
         where jj.COMPUTINGSITE = gg.PANDA_QUEUE)
         GROUP BY corecount, jobstatus_grouped order by corecount
-    """
+    """.format(settings.DB_SCHEMA_PANDA)
     cur = connection.cursor()
     cur.execute(sqlRequest)
-    # get the hs distribution data into a dictionary structure
-    hs_distribution_raw = cur.fetchall()
+
+    # get the hs distribution data into a dictionary structure
+    hs_distribution_raw = cur.fetchall()
     hs_distribution_dict = {}
     hs_queued_total = 0
     hs_executing_total = 0
@@ -849,7 +814,6 @@ def coreTypes(request):
     for hs_entry in hs_distribution_raw:
         corecount, status_group, hs = hs_entry
         hs_distribution_dict.setdefault(corecount, {PLEDGED: 0, QUEUED: 0, EXECUTING: 0, IGNORE:0})
-        #hs_distribution_dict[corecount][status_group] = hs
         total_hs += hs

     # calculate totals
diff --git a/core/libs/dropalgorithm.py b/core/libs/dropalgorithm.py
index 3b15f0e4..4e923098 100644
--- a/core/libs/dropalgorithm.py
+++ b/core/libs/dropalgorithm.py
@@ -132,13 +132,13 @@ def insert_dropped_jobs_to_tmp_table(query, extra):
             h.oldpandaid, h.relationtype, h.newpandaid
             from (
                 select ja4.pandaid, ja4.jeditaskid, ja4.eventservice, ja4.specialhandling, ja4.jobstatus, ja4.jobsetid, ja4.jobsubstatus, ja4.processingtype
-                from ATLAS_PANDA.JOBSARCHIVED4 ja4 where ja4.jeditaskid = {3}
+                from {4}.JOBSARCHIVED4 ja4 where ja4.jeditaskid = {3}
                 union
                 select ja.pandaid, ja.jeditaskid, ja.eventservice, ja.specialhandling, ja.jobstatus, ja.jobsetid, ja.jobsubstatus, ja.processingtype
-                from ATLAS_PANDAARCH.JOBSARCHIVED ja where ja.jeditaskid = {4}
+                from
{5}.JOBSARCHIVED ja where ja.jeditaskid = {3} ) j LEFT JOIN - ATLAS_PANDA.jedi_job_retry_history h + {4}.jedi_job_retry_history h ON (h.jeditaskid = j.jeditaskid AND h.oldpandaid = j.pandaid) OR (h.oldpandaid=j.jobsetid and h.jeditaskid = j.jeditaskid) ) @@ -171,7 +171,14 @@ def insert_dropped_jobs_to_tmp_table(query, extra): ) OR (jobstatus='closed' and (jobsubstatus in ('es_unused', 'es_inaction'))) ) - """.format(tmpTableName, transactionKey, timezone.now().strftime("%Y-%m-%d"), jeditaskid, jeditaskid) + """.format( + tmpTableName, + transactionKey, + timezone.now().strftime("%Y-%m-%d"), + jeditaskid, + settings.DB_SCHEMA_PANDA, + settings.DB_SCHEMA_PANDA_ARCH + ) new_cur.execute(ins_query) # form an extra query condition to exclude retried pandaids from selection From 70a4649f00239eb53f6ee66d9c2ca502d47d8272 Mon Sep 17 00:00:00 2001 From: Tatiana Korchuganova Date: Wed, 19 Apr 2023 16:51:25 +0200 Subject: [PATCH 5/5] harvester | fix worker info page for postgres --- core/harvester/views.py | 67 ++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/core/harvester/views.py b/core/harvester/views.py index 8f2841aa..70cb743b 100644 --- a/core/harvester/views.py +++ b/core/harvester/views.py @@ -1488,37 +1488,42 @@ def getHarvesterJobs(request, instance='', workerid='', jobstatus='', fields='', values.append(v) sqlQuery = """ - SELECT {2} FROM - (SELECT {2} FROM {DB_SCHEMA_PANDA}.JOBSARCHIVED4, - (select - pandaid as pid - from {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers where - {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers.harvesterid {0} and {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers.workerid {1}) - PIDACTIVE WHERE PIDACTIVE.pid={DB_SCHEMA_PANDA}.JOBSARCHIVED4.PANDAID {3} - UNION - SELECT {2} FROM {DB_SCHEMA_PANDA}.JOBSACTIVE4, - (select - pandaid as pid - from {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers where - {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers.harvesterid {0} and {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers.workerid {1}) PIDACTIVE WHERE PIDACTIVE.pid={DB_SCHEMA_PANDA}.JOBSACTIVE4.PANDAID {3} - UNION - SELECT {2} FROM {DB_SCHEMA_PANDA}.JOBSDEFINED4, - (select - pandaid as pid - from {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers where - {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers.harvesterid {0} and {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers.workerid {1}) PIDACTIVE WHERE PIDACTIVE.pid={DB_SCHEMA_PANDA}.JOBSDEFINED4.PANDAID {3} - UNION - SELECT {2} FROM {DB_SCHEMA_PANDA}.JOBSWAITING4, - (select - pandaid as pid - from {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers where - {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers.harvesterid {0} and {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers.workerid {1}) PIDACTIVE WHERE PIDACTIVE.pid={DB_SCHEMA_PANDA}.JOBSWAITING4.PANDAID {3} - UNION - SELECT {2} FROM {DB_SCHEMA_PANDA_ARCH}.JOBSARCHIVED, - (select - pandaid as pid - from {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers where - {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers.harvesterid {0} and {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers.workerid {1}) PIDACTIVE WHERE PIDACTIVE.pid={DB_SCHEMA_PANDA_ARCH}.JOBSARCHIVED.PANDAID {3}) + select {2} from ( + select {2} from {DB_SCHEMA_PANDA}.jobsarchived4 jarch4 , ( + select pandaid as pid + from {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers + where harvesterid {0} and workerid {1} + ) hj + where hj.pid=jarch4.pandaid {3} + union + select {2} from {DB_SCHEMA_PANDA}.jobsactive4 jact4, ( + select pandaid as pid + from {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers + where harvesterid {0} and workerid {1} + 
) hj + where hj.pid=jact4.pandaid {3} + union + select {2} from {DB_SCHEMA_PANDA}.jobsdefined4 jd4, ( + select pandaid as pid + from {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers + where harvesterid {0} and workerid {1} + ) hj + where hj.pid=jd4.pandaid {3} + union + select {2} FROM {DB_SCHEMA_PANDA}.jobswaiting4 jw4, ( + select pandaid as pid + from {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers + where harvesterid {0} and workerid {1} + ) hj + where hj.pid=jw4.pandaid {3} + union + select {2} from {DB_SCHEMA_PANDA_ARCH}.jobsarchived ja, ( + select pandaid as pid + from {DB_SCHEMA_PANDA}.harvester_rel_jobs_workers + where harvesterid {0} and workerid {1} + ) hj + where hj.pid=ja.pandaid {3} + ) comb_data """ sqlQuery = sqlQuery.format(