Skip to content

Commit

Permalink
Merge pull request #261 from PanDAWMS/dev
Browse files Browse the repository at this point in the history
core | make sure we create a tmp table before inserting data for postgres backend
  • Loading branch information
tkorchug authored Jan 31, 2024
2 parents 712db46 + 9e6b6a2 commit f4c05da
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 80 deletions.
12 changes: 2 additions & 10 deletions core/datacarousel/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from core.reports.sendMail import send_mail_bp
from core.reports.models import ReportEmails
from core.views import setupView
from core.libs.exlib import dictfetchall, get_tmp_table_name, convert_epoch_to_datetime
from core.libs.exlib import dictfetchall, get_tmp_table_name, convert_epoch_to_datetime, insert_to_temp_table
from core.libs.elasticsearch import create_es_connection
from core.schedresource.utils import getCRICSEs
from core.filebrowser.ruciowrapper import ruciowrapper
Expand Down Expand Up @@ -143,15 +143,7 @@ def getStagingData(request):
if 'jeditaskid' in request.session['requestParams']:
jeditaskid = request.session['requestParams']['jeditaskid']
taskl = [int(jeditaskid)] if '|' not in jeditaskid else [int(taskid) for taskid in jeditaskid.split('|')]
new_cur = connection.cursor()
transactionKey = random.randrange(1000000)
executionData = []
for id in taskl:
executionData.append((id, transactionKey))

query = """insert into """ + tmpTableName + """(id,transactionkey) values (%s, %s)"""
new_cur.executemany(query, executionData)
connection.commit()
transactionKey = insert_to_temp_table(taskl)
selection += "and t2.taskid in (SELECT tmp.id FROM %s tmp where transactionkey=%i)" % (tmpTableName, transactionKey)
else:
selection += "and t2.TASKID in (select taskid from ATLAS_DEFT.T_ACTION_STAGING)"
Expand Down
33 changes: 10 additions & 23 deletions core/errorsscattering/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from django.db import connection
from django.utils.cache import patch_response_headers
from core.libs.cache import getCacheEntry, setCacheEntry, setCacheData
from core.libs.exlib import dictfetchall, get_tmp_table_name, create_temporary_table
from core.libs.exlib import dictfetchall, get_tmp_table_name, insert_to_temp_table
from core.libs.DateEncoder import DateEncoder
from core.oauth.utils import login_customrequired
from core.views import initRequest, setupView
Expand Down Expand Up @@ -61,24 +61,17 @@ def errorsScattering(request):

# print ('tasks found %i') % len(tasks)

random.seed()
taskListByReq = {}
transactionKey = random.randrange(1000000)
executionData = []
task_ids = []
for id in tasks:
executionData.append((id['jeditaskid'], transactionKey))
task_ids.append(id['jeditaskid'])
# fill the list of jeditaskids for each reqid to put into the cache for consistency with jobList
if id['reqid'] not in taskListByReq:
taskListByReq[id['reqid']] = ''
taskListByReq[id['reqid']] += str(id['jeditaskid']) + ','

new_cur = connection.cursor()
tmpTableName = get_tmp_table_name()
if settings.DEPLOYMENT == "POSTGRES":
create_temporary_table(new_cur, tmpTableName)
ins_query = """insert into """ + tmpTableName + """(id,transactionkey) values (%s, %s)"""
new_cur.executemany(ins_query, executionData)
connection.commit()
transactionKey = insert_to_temp_table(task_ids)

jcondition = '(1=1)'
if isExcludeScouts:
Expand Down Expand Up @@ -106,7 +99,7 @@ def errorsScattering(request):
where j.allc > 0
""".format(settings.DB_SCHEMA_PANDA, tmpTableName, transactionKey, query['modificationtime__castdate__range'][0], jcondition,
settings.DB_SCHEMA_PANDA_ARCH, settings.DB_SCHEMA_PANDA_META)

new_cur = connection.cursor()
new_cur.execute(querystr)

errorsRaw = dictfetchall(new_cur)
Expand Down Expand Up @@ -351,23 +344,17 @@ def errorsScatteringDetailed(request, cloud, reqid):

print ('tasks found %i' % (len(tasks)))

random.seed()
tmpTableName = get_tmp_table_name()

taskListByReq = {}
transactionKey = random.randrange(1000000)
executionData = []
task_ids = []
for id in tasks:
executionData.append((id['jeditaskid'], transactionKey))
task_ids.append(id['jeditaskid'])
# fill the list of jeditaskids for each reqid to put into the cache for consistency with jobList
if id['reqid'] not in taskListByReq:
taskListByReq[id['reqid']] = ''
taskListByReq[id['reqid']] += str(id['jeditaskid']) + ','

new_cur = connection.cursor()
insquery = """insert into """ + tmpTableName + """(id,transactionkey) values (%s, %s)"""
new_cur.executemany(insquery, executionData)
connection.commit()
tmpTableName = get_tmp_table_name()
transactionKey = insert_to_temp_table(task_ids)

jcondition = '(1=1)'
if isExcludeScouts:
Expand Down Expand Up @@ -405,7 +392,7 @@ def errorsScatteringDetailed(request, cloud, reqid):
settings.DB_SCHEMA_PANDA,
settings.DB_SCHEMA_PANDA_ARCH
)

new_cur = connection.cursor()
new_cur.execute(querystr)

errorsRaw = dictfetchall(new_cur)
Expand Down
34 changes: 33 additions & 1 deletion core/libs/eventservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,36 @@ def add_event_summary_to_tasklist(tasks, transaction_key=None):
if task['jeditaskid'] in event_info_dict.keys():
task['eventsData'] = event_info_dict[task['jeditaskid']]

return tasks
return tasks


def get_event_status_summary(pandaids, eventservicestatelist):
    """
    Getting event statuses summary for list of pandaids of ES jobs.

    The pandaids are written to the temporary ids table through insert_to_temp_table,
    joined against jedi_events and counted per status value.

    :param pandaids: list of pandaids of event service jobs
    :param eventservicestatelist: lookup indexed by the raw status value read from jedi_events,
        giving the key used in the returned dict (presumably a list of state names, confirm against caller)
    :return: dict of status: nevents, empty when no pandaids are given
    """
    summary = {}
    if not pandaids:
        # nothing to look up, so skip the insert and the query altogether
        return summary

    tmpTableName = get_tmp_table_name()
    transactionKey = insert_to_temp_table(pandaids)

    # the table and schema names can not be bound as query parameters, hence the string formatting;
    # all three values come from helpers/settings, none of them from the request
    query = """
        select status, count(status) as countstat
        from (
            select /*+ dynamic_sampling(tmp_ids1 0) cardinality(tmp_ids1 10) index_rs_asc(ev jedi_events_pandaid_status_idx) no_index_ffs(ev jedi_events_pk) no_index_ss(ev jedi_events_pk) */ pandaid, status
            from {2}.jedi_events ev, {0}
            where transactionkey = {1} and pandaid = id
        ) t1
        group by status""".format(tmpTableName, transactionKey, settings.DB_SCHEMA_PANDA)

    # the context manager closes the cursor even when the query raises
    with connection.cursor() as new_cur:
        new_cur.execute(query)
        evtable = dictfetchall(new_cur)

    for ev in evtable:
        evstat = eventservicestatelist[ev['STATUS']]
        summary[evstat] = ev['COUNTSTAT']

    return summary
38 changes: 0 additions & 38 deletions core/libs/exlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,44 +76,6 @@ def insert_to_temp_table(list_of_items, transactionKey = -1):
return transactionKey


def get_event_status_summary(pandaids, eventservicestatelist):
    """
    Getting event statuses summary for list of pandaids of ES jobs.

    The ids are stored with insert_to_temp_table, the same helper the rest of this module
    offers for filling the temporary ids table, instead of a hand-written insert.

    :param pandaids: list of pandaids of event service jobs
    :param eventservicestatelist: lookup indexed by the raw status value read from jedi_events,
        giving the key used in the returned dict (presumably a list of state names, confirm against caller)
    :return: dict of status: nevents, empty when no pandaids are given
    """
    summary = {}
    if not pandaids:
        # no ids means no rows to count, so do not touch the DB at all
        return summary

    tmpTableName = get_tmp_table_name()
    # the helper returns the transaction key under which the ids were stored
    transactionKey = insert_to_temp_table(pandaids)

    # table and schema names can not be passed as bind parameters, so they are formatted in;
    # they originate from helpers/settings only
    query = """
        select status, count(status) as countstat
        from (
            select /*+ dynamic_sampling(tmp_ids1 0) cardinality(tmp_ids1 10) index_rs_asc(ev jedi_events_pandaid_status_idx) no_index_ffs(ev jedi_events_pk) no_index_ss(ev jedi_events_pk) */ pandaid, status
            from {2}.jedi_events ev, {0}
            where transactionkey = {1} and pandaid = id
        ) t1
        group by status""".format(tmpTableName, transactionKey, settings.DB_SCHEMA_PANDA)

    # release the cursor as soon as the rows are fetched, also on errors
    with connection.cursor() as new_cur:
        new_cur.execute(query)
        evtable = dictfetchall(new_cur)

    for ev in evtable:
        evstat = eventservicestatelist[ev['STATUS']]
        summary[evstat] = ev['COUNTSTAT']

    return summary


def dictfetchall(cursor, **kwargs):
"Returns all rows from a cursor as a dict"
style = 'default'
Expand Down
3 changes: 2 additions & 1 deletion core/pandajob/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
Jobsarchived_y2018, Jobsarchived_y2019, Jobsarchived_y2020, Jobsarchived_y2021, Jobsarchived, Jobsarchived4
from core.libs.datetimestrings import parse_datetime
from core.libs.job import is_event_service
from core.libs.exlib import get_event_status_summary, split_into_intervals
from core.libs.eventservice import get_event_status_summary
from core.libs.exlib import split_into_intervals

from django.conf import settings
import core.constants as const
Expand Down
8 changes: 1 addition & 7 deletions core/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -7586,14 +7586,8 @@ def g4exceptions(request):
if 'amitag' in request.session['requestParams']:

tmpTableName = get_tmp_table_name()

transactionKey = random.randrange(1000000)
transactionKey = insert_to_temp_table([job['pandaid'] for job in jobs]) # Backend dependable
new_cur = connection.cursor()
if settings.DEPLOYMENT == "POSTGRES":
create_temporary_table(new_cur, tmpTableName)
for job in jobs:
new_cur.execute("INSERT INTO %s(ID,TRANSACTIONKEY) VALUES (%i,%i)" % (
tmpTableName, job['pandaid'], transactionKey)) # Backend dependable
new_cur.execute("""
SELECT JOBPARAMETERS, PANDAID
FROM {}.JOBPARAMSTABLE
Expand Down

0 comments on commit f4c05da

Please sign in to comment.