Skip to content

Commit

Permalink
Merge pull request #140 from PanDAWMS/dev
Browse files Browse the repository at this point in the history
ATLASPANDA-783 jobs sorting
  • Loading branch information
tkorchug authored Mar 9, 2023
2 parents 452e765 + 1b21753 commit b022805
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 74 deletions.
Empty file removed core/globalpage/views.py
Empty file.
148 changes: 74 additions & 74 deletions core/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1391,26 +1391,32 @@ def jobList(request, mode=None, param=None):
elif 'statenotupdated' in request.session['requestParams']:
jobs = stateNotUpdated(request, values=values, wildCardExtension=wildCardExtension)
elif 'harvesterinstance' in request.session['requestParams'] and 'workerid' in request.session['requestParams']:
jobs = getHarvesterJobs(request,
instance=request.session['requestParams']['harvesterinstance'],
workerid=request.session['requestParams']['workerid'],
jobstatus=harvesterjobstatus,
fields=values)
jobs = getHarvesterJobs(
request,
instance=request.session['requestParams']['harvesterinstance'],
workerid=request.session['requestParams']['workerid'],
jobstatus=harvesterjobstatus,
fields=values)
elif 'harvesterid' in request.session['requestParams'] and 'workerid' in request.session['requestParams']:
jobs = getHarvesterJobs(request,
instance=request.session['requestParams']['harvesterid'],
workerid=request.session['requestParams']['workerid'],
jobstatus=harvesterjobstatus,
fields=values)
elif ('harvesterinstance' not in request.session['requestParams'] and 'harvesterid' not in request.session[
'requestParams']) and 'workerid' in request.session['requestParams']:
jobs = getHarvesterJobs(request,
workerid=request.session['requestParams']['workerid'],
jobstatus=harvesterjobstatus,
fields=values)
jobs = getHarvesterJobs(
request,
instance=request.session['requestParams']['harvesterid'],
workerid=request.session['requestParams']['workerid'],
jobstatus=harvesterjobstatus,
fields=values)
elif 'harvesterinstance' not in request.session['requestParams'] and (
'harvesterid' not in request.session['requestParams']) and (
'workerid' in request.session['requestParams']):
jobs = getHarvesterJobs(
request,
workerid=request.session['requestParams']['workerid'],
jobstatus=harvesterjobstatus,
fields=values)
elif 'harvesterce' in request.session['requestParams']:
jobs = getCeHarvesterJobs(request, computingelment=request.session['requestParams']['harvesterce'])
else:
# apply order by to get recent jobs
order_by = '-modificationtime'
# exclude time from query for DB tables with active jobs
etquery = copy.deepcopy(query)
if ('modificationtime__castdate__range' in etquery and (
Expand All @@ -1420,10 +1426,10 @@ def jobList(request, mode=None, param=None):
del etquery['modificationtime__castdate__range']
warning['notimelimit'] = "no time window limiting was applied for active jobs in this selection"

jobs.extend(Jobsdefined4.objects.filter(**etquery).extra(where=[wildCardExtension])[:JOB_LIMIT].values(*values))
jobs.extend(Jobsactive4.objects.filter(**etquery).extra(where=[wildCardExtension])[:JOB_LIMIT].values(*values))
jobs.extend(Jobswaiting4.objects.filter(**etquery).extra(where=[wildCardExtension])[:JOB_LIMIT].values(*values))
jobs.extend(Jobsarchived4.objects.filter(**query).extra(where=[wildCardExtension])[:JOB_LIMIT].values(*values))
jobs.extend(Jobsdefined4.objects.filter(**etquery).extra(where=[wildCardExtension]).order_by(order_by)[:JOB_LIMIT].values(*values))
jobs.extend(Jobsactive4.objects.filter(**etquery).extra(where=[wildCardExtension]).order_by(order_by)[:JOB_LIMIT].values(*values))
jobs.extend(Jobswaiting4.objects.filter(**etquery).extra(where=[wildCardExtension]).order_by(order_by)[:JOB_LIMIT].values(*values))
jobs.extend(Jobsarchived4.objects.filter(**query).extra(where=[wildCardExtension]).order_by(order_by)[:JOB_LIMIT].values(*values))
listJobs = [Jobsarchived4, Jobsactive4, Jobswaiting4, Jobsdefined4]
if not noarchjobs:
queryFrozenStates = []
Expand Down Expand Up @@ -1454,7 +1460,9 @@ def jobList(request, mode=None, param=None):
'fulllist' in request.session['requestParams'] and (
request.session['requestParams']['fulllist'] == 'true'))):
del query['modificationtime__castdate__range']
archJobs = Jobsarchived.objects.filter(**query).extra(where=[wildCardExtension])[:JOB_LIMIT].values(*values)
# order by statechangetime to get recent jobs as it is an index
order_by = '-statechangetime'
archJobs = Jobsarchived.objects.filter(**query).extra(where=[wildCardExtension]).order_by(order_by)[:JOB_LIMIT].values(*values)
listJobs.append(Jobsarchived)
totalJobs = len(archJobs)
jobs.extend(archJobs)
Expand Down Expand Up @@ -1543,55 +1551,44 @@ def jobList(request, mode=None, param=None):
url_nolimit = request.get_full_path()
njobsmax = display_limit

sortby = 'time-descending'
sortby_reverse = True
sortby_key = 'modificationtime'
if fileid:
sortby = "fileattemptnr-descending"
if 'computingsite' in request.session['requestParams']:
sortby = 'time-descending'
if 'jeditaskid' in request.session['requestParams']:
sortby = "attemptnr-descending"
if 'sortby' in request.session['requestParams']:
sortby = request.session['requestParams']['sortby']

if sortby == 'create-ascending':
jobs = sorted(jobs,
key=lambda x: x['creationtime'] if not x['creationtime'] is None else datetime(1900, 1, 1))
if sortby == 'create-descending':
jobs = sorted(jobs,
key=lambda x: x['creationtime'] if not x['creationtime'] is None else datetime(1900, 1, 1),
reverse=True)
if sortby == 'time-ascending':
jobs = sorted(jobs,
key=lambda x: x['modificationtime'] if not x['modificationtime'] is None else datetime(1900,
1, 1))
if sortby == 'time-descending':
jobs = sorted(jobs,
key=lambda x: x['modificationtime'] if not x['modificationtime'] is None else datetime(1900,
1, 1),
reverse=True)
if sortby == 'statetime':
jobs = sorted(jobs,
key=lambda x: x['statechangetime'] if not x['statechangetime'] is None else datetime(1900, 1,
1),
reverse=True)
elif sortby == 'priority':
jobs = sorted(jobs, key=lambda x: x['currentpriority'] if not x['currentpriority'] is None else 0,
reverse=True)
elif sortby == 'attemptnr':
jobs = sorted(jobs, key=lambda x: x['attemptnr'], reverse=True)
elif sortby == 'duration-ascending':
jobs = sorted(jobs, key=lambda x: x['durationsec'])
elif sortby == 'duration-descending':
jobs = sorted(jobs, key=lambda x: x['durationsec'], reverse=True)
elif sortby == 'duration':
jobs = sorted(jobs, key=lambda x: x['durationsec'])
elif sortby == 'PandaID':
jobs = sorted(jobs, key=lambda x: x['pandaid'], reverse=True)
elif fileid:
sortby = "fileattemptnr-descending"
jobs = sorted(jobs, key=lambda x: x['fileattemptnr'], reverse=True)
elif 'computingsite' in request.session['requestParams']:
sortby = 'time-descending'
jobs = sorted(jobs,
key=lambda x: x['modificationtime'] if x['modificationtime'] is not None else datetime(1900, 1,
1),
reverse=True)
else:
sortby = "attemptnr-descending,pandaid-descending"
jobs = sorted(jobs, key=lambda x: [-x['attemptnr'], -x['pandaid']])
if sortby:
if sortby.endswith('-descending'):
sortby_reverse = True
elif sortby.endswith('-ascending'):
sortby_reverse = False

if sortby.startswith('create'):
sortby_key = 'creationtime'
elif sortby.startswith('time'):
sortby_key = 'modificationtime'
elif sortby.startswith('statetime'):
sortby_key = 'statechangetime'
elif sortby.startswith('priority'):
sortby_key = 'currentpriority'
elif sortby.startswith('duration'):
sortby_key = 'durationsec'
elif sortby.startswith('attemptnr'):
sortby_key = 'attemptnr'
elif sortby.startswith('PandaID'):
sortby_key = 'pandaid'

if 'time' in sortby_key:
# use default date for sorting if it is none
jobs = sorted(jobs, key=lambda x: x[sortby_key] if not None else datetime(1900, 1, 1), reverse=sortby_reverse)
else:
jobs = sorted(jobs, key=lambda x: x[sortby_key], reverse=sortby_reverse)
_logger.debug('Sorted joblist: {}'.format(time.time() - request.session['req_init_time']))

taskname = ''
Expand All @@ -1614,11 +1611,14 @@ def jobList(request, mode=None, param=None):
else:
showwarn = 1

sumd, esjobdict = job_summary_dict(
request,
jobs,
standard_fields + [
'corecount', 'noutputdatafiles', 'actualcorecount', 'schedulerid', 'pilotversion', 'computingelement',
'container_name', 'nevents'
])
# Sort in order to see the most important tasks
sumd, esjobdict = job_summary_dict(request, jobs,
standard_fields + ['corecount', 'noutputdatafiles', 'actualcorecount',
'schedulerid', 'pilotversion', 'computingelement',
'container_name', 'nevents'])
if sumd:
for item in sumd:
if item['field'] == 'jeditaskid':
Expand All @@ -1638,8 +1638,8 @@ def jobList(request, mode=None, param=None):
job['maxpss'] = "%0.2f" % (job['maxpss'] / 1024.)

testjobs = False
if 'prodsourcelabel' in request.session['requestParams'] and request.session['requestParams'][
'prodsourcelabel'].lower().find('test') >= 0:
if 'prodsourcelabel' in request.session['requestParams'] and (
request.session['requestParams']['prodsourcelabel'].lower().find('test') >= 0):
testjobs = True

errsByCount, _, _, _, errdSumd, _ = errorSummaryDict(request, jobs, testjobs, output=['errsByCount', 'errdSumd'])
Expand All @@ -1658,7 +1658,7 @@ def jobList(request, mode=None, param=None):
_logger.debug(
'Got file info for list of jobs to be shown: {}'.format(time.time() - request.session['req_init_time']))

# Getting PQ status for for list of jobs to be shown
# Getting PQ status for list of jobs to be shown
pq_dict = get_panda_queues()
for job in jobsToShow:
if job['computingsite'] in pq_dict:
Expand Down

0 comments on commit b022805

Please sign in to comment.