Merge pull request #138 from HSF/dev

replace update_contents_to_others

wguanicedew authored Feb 23, 2023
2 parents e2366d5 + 16c3743 commit 2a6041f

Showing 7 changed files with 139 additions and 49 deletions.
26 changes: 23 additions & 3 deletions main/etc/sql/oracle_update.sql
@@ -275,19 +275,39 @@ CREATE OR REPLACE procedure update_contents_from_others(request_id_in NUMBER, tr
BEGIN
update (select c.content_id, c.substatus as c_substatus, t.substatus as t_substatus from contents c inner join
(select content_id, substatus from contents where request_id = request_id_in and content_relation_type = 1) t
on c.content_dep_id = t.content_id where c.request_id = request_id_in and c.transform_id = transform_id_in and c.substatus != t.substatus) set c_substatus = t_substatus;
on c.content_dep_id = t.content_id where c.request_id = request_id_in and c.transform_id = transform_id_in and c.content_relation_type = 3 and c.substatus != t.substatus) set c_substatus = t_substatus;
END;


CREATE OR REPLACE procedure update_contents_to_others(request_id_in NUMBER, transform_id_in NUMBER) AS
BEGIN
update (select c.content_id, c.substatus as c_substatus, t.substatus as t_substatus from contents c inner join
(select content_id, substatus from contents where request_id = request_id_in and transform_id = transform_id_in and content_relation_type = 1) t
on c.content_dep_id = t.content_id where c.request_id = request_id_in and c.substatus != t.substatus) set c_substatus = t_substatus;
on c.content_dep_id = t.content_id where c.request_id = request_id_in and c.content_relation_type = 3 and c.substatus != t.substatus) set c_substatus = t_substatus;
END;



-- 2023.02.14
drop index CONTENTS_REQ_TF_COLL_IDX;
CREATE INDEX CONTENTS_REQ_TF_COLL_IDX ON CONTENTS (request_id, transform_id, workload_id, coll_id, status, substatus) LOCAL;
CREATE INDEX CONTENTS_REQ_TF_COLL_IDX ON CONTENTS (request_id, transform_id, workload_id, coll_id, content_relation_type, status, substatus) LOCAL;


-- 2023.02.22

CREATE OR REPLACE procedure update_contents_from_others(request_id_in NUMBER, transform_id_in NUMBER) AS
BEGIN
update (select c.content_id, c.substatus as c_substatus, t.substatus as t_substatus from
(select content_id, substatus, content_dep_id from contents where request_id = request_id_in and transform_id = transform_id_in and content_relation_type = 3) c inner join
(select content_id, substatus from contents where request_id = request_id_in and content_relation_type = 1) t
on c.content_dep_id = t.content_id where c.substatus != t.substatus) set c_substatus = t_substatus;
END;


CREATE OR REPLACE procedure update_contents_to_others(request_id_in NUMBER, transform_id_in NUMBER) AS
BEGIN
update (select c.content_id, c.substatus as c_substatus, t.substatus as t_substatus from
(select content_id, substatus, content_dep_id from contents where request_id = request_id_in and content_relation_type = 3) c inner join
(select content_id, substatus from contents where request_id = request_id_in and transform_id = transform_id_in and content_relation_type = 1) t
on c.content_dep_id = t.content_id where c.substatus != t.substatus) set c_substatus = t_substatus;
END;
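
The 2023.02.22 rewrite above pre-filters both sides of the join before the update: inline view c keeps only dependent contents (content_relation_type = 3) and t only trigger outputs (content_relation_type = 1), which keeps the updatable join view small and lines up with the content_relation_type column added to CONTENTS_REQ_TF_COLL_IDX. A minimal sketch of driving these procedures from Python, assuming a python-oracledb connection; only the procedure names and their (request_id, transform_id) signatures come from this file, the rest is illustrative:

import oracledb  # assumed client library; any DB-API driver with callproc works

def propagate_substatus(conn, request_id, transform_id):
    # update_contents_from_others: pull substatus from outputs (type 1)
    # anywhere in the request into this transform's dependent inputs (type 3).
    # update_contents_to_others: push this transform's output substatus out
    # to dependent inputs across the request.
    with conn.cursor() as cursor:
        cursor.callproc("update_contents_from_others", [request_id, transform_id])
        cursor.callproc("update_contents_to_others", [request_id, transform_id])
    conn.commit()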
15 changes: 5 additions & 10 deletions main/lib/idds/agents/carrier/trigger.py
@@ -186,12 +186,13 @@ def process_trigger_processing_real(self, event):
self.update_processing(ret, pr)

if new_update_contents or ret_update_contents:
self.logger.info(log_pre + "update_contents_to_others_by_dep_id")
core_catalog.update_contents_to_others_by_dep_id(request_id=pr['request_id'], transform_id=pr['transform_id'])
self.logger.info(log_pre + "update_contents_to_others_by_dep_id done")
# self.logger.info(log_pre + "update_contents_to_others_by_dep_id")
# core_catalog.update_contents_to_others_by_dep_id(request_id=pr['request_id'], transform_id=pr['transform_id'])
# self.logger.info(log_pre + "update_contents_to_others_by_dep_id done")

# core_catalog.delete_contents_update(request_id=pr['request_id'], transform_id=pr['transform_id'])
update_transforms = core_catalog.get_updated_transforms_by_content_status(request_id=pr['request_id'])
update_transforms = core_catalog.get_updated_transforms_by_content_status(request_id=pr['request_id'],
transform_id=pr['transform_id'])
self.logger.info(log_pre + "update_transforms: %s" % str(update_transforms))
for update_transform in update_transforms:
if 'transform_id' in update_transform:
@@ -202,12 +203,6 @@ def process_trigger_processing_real(self, event):
content={'event': 'Trigger'})
self.logger.info(log_pre + "Trigger UpdateTransformEvent(transform_id: %s)" % update_transform_id)
self.event_bus.send(event)
else:
ret1 = self.handle_trigger_processing(pr)
new_update_contents = ret1.get('new_update_contents', None)
ret1['new_update_contents'] = None
self.update_processing(ret1, pr)
# pass

if (('processing_status' in ret and ret['processing_status'] == ProcessingStatus.Terminating)
or (event._content and 'Terminated' in event._content and event._content['Terminated'])): # noqa W503
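Taken together, the hunks above comment out the synchronous per-event call to update_contents_to_others_by_dep_id (presumably superseded by the reworked bulk SQL procedures), narrow the released-transform query from the whole request down to the current transform, and drop the else branch that re-ran handle_trigger_processing. A rough sketch of the resulting fan-out; core_catalog and UpdateTransformEvent are the real iDDS names from the diff, while the wrapper function and the constructor arguments shown are illustrative:

def fan_out_updated_transforms(self, pr, log_pre):
    # Only ask for transforms affected by this processing's transform,
    # instead of rescanning every transform in the request.
    update_transforms = core_catalog.get_updated_transforms_by_content_status(
        request_id=pr['request_id'],
        transform_id=pr['transform_id'])
    self.logger.info(log_pre + "update_transforms: %s" % str(update_transforms))
    for update_transform in update_transforms:
        if 'transform_id' in update_transform:
            event = UpdateTransformEvent(publisher_id=self.id,
                                         transform_id=update_transform['transform_id'],
                                         content={'event': 'Trigger'})
            self.event_bus.send(event)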
104 changes: 82 additions & 22 deletions main/lib/idds/agents/carrier/utils.py
Expand Up @@ -11,6 +11,7 @@
import json
import logging
import time
import threading

from idds.common.constants import (ProcessingStatus,
CollectionStatus,
@@ -984,7 +985,7 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=
work = proc.work
work.set_agent_attributes(agent_attributes, processing)

if not work.use_dependency_to_release_jobs():
if (not work.use_dependency_to_release_jobs()) or workload_id is None:
return processing['substatus'], [], [], {}, {}, {}, []
else:
if trigger_new_updates:
@@ -1063,19 +1064,38 @@ def get_collection_id_transform_id_map(coll_id, request_id, request_ids=[]):
return coll_tf_id_map[coll_id]


def get_workload_id_transform_id_map(workload_id):
workload_id_lock = threading.Lock()


def get_workload_id_transform_id_map(workload_id, logger=None, log_prefix=''):
cache = get_redis_cache()
workload_id_transform_id_map_key = "all_worloadid2transformid_map"
workload_id_transform_id_map = cache.get(workload_id_transform_id_map_key, default={})

workload_id_transform_id_map_notexist_key = "all_worloadid2transformid_map_notexist"
workload_id_transform_id_map_notexist = cache.get(workload_id_transform_id_map_notexist_key, default=[])
workload_id_transform_id_map_notexist = cache.get(workload_id_transform_id_map_notexist_key, default={})

if type(workload_id_transform_id_map_notexist) in (list, tuple):
workload_id_transform_id_map_notexist = {}

if workload_id in workload_id_transform_id_map_notexist:
workload_id_str = str(workload_id)
if workload_id_str in workload_id_transform_id_map:
return workload_id_transform_id_map[workload_id_str]

if workload_id_str in workload_id_transform_id_map_notexist and workload_id_transform_id_map_notexist[workload_id_str] + 600 < time.time():
return None

# lock area
workload_id_lock.acquire()

workload_id_transform_id_map = cache.get(workload_id_transform_id_map_key, default={})
workload_id_transform_id_map_notexist = cache.get(workload_id_transform_id_map_notexist_key, default={})

if type(workload_id_transform_id_map_notexist) in (list, tuple):
workload_id_transform_id_map_notexist = {}

request_ids = []
if not workload_id_transform_id_map or workload_id not in workload_id_transform_id_map or len(workload_id_transform_id_map[workload_id]) < 5:
if not workload_id_transform_id_map or workload_id_str not in workload_id_transform_id_map or len(workload_id_transform_id_map[workload_id_str]) < 5:
processing_status = [ProcessingStatus.New,
ProcessingStatus.Submitting, ProcessingStatus.Submitted,
ProcessingStatus.Running, ProcessingStatus.FinishedOnExec,
@@ -1091,26 +1111,42 @@ def get_workload_id_transform_id_map(workload_id):
processing = proc['processing_metadata']['processing']
work = processing.work
if work.use_dependency_to_release_jobs():
workload_id_transform_id_map[proc['workload_id']] = (proc['request_id'],
proc['transform_id'],
proc['processing_id'],
proc['status'].value,
proc['substatus'].value)
workload_id_transform_id_map[str(proc['workload_id'])] = (proc['request_id'],
proc['transform_id'],
proc['processing_id'],
proc['status'].value,
proc['substatus'].value)
if proc['request_id'] not in request_ids:
request_ids.append(proc['request_id'])

cache.set(workload_id_transform_id_map_key, workload_id_transform_id_map)

for key in workload_id_transform_id_map:
if key in workload_id_transform_id_map_notexist:
del workload_id_transform_id_map_notexist[key]

# renew the collection to transform map
if request_ids:
get_collection_id_transform_id_map(coll_id=None, request_id=request_ids[0], request_ids=request_ids)

keys = list(workload_id_transform_id_map_notexist.keys())
for key in keys:
if workload_id_transform_id_map_notexist[key] + 7200 < time.time():
del workload_id_transform_id_map_notexist[key]

cache.set(workload_id_transform_id_map_notexist_key, workload_id_transform_id_map_notexist)
# for tasks running in some other instances
if workload_id not in workload_id_transform_id_map:
workload_id_transform_id_map_notexist.append(workload_id)
if workload_id_str not in workload_id_transform_id_map:
if workload_id_str not in workload_id_transform_id_map_notexist:
workload_id_transform_id_map_notexist[workload_id_str] = time.time()
cache.set(workload_id_transform_id_map_notexist_key, workload_id_transform_id_map_notexist)

workload_id_lock.release()
return None
else:
workload_id_lock.release()

return workload_id_transform_id_map[workload_id]
return workload_id_transform_id_map[workload_id_str]
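
The rewritten lookup above is a double-checked cache read: a lock-free fast path against the Redis-backed map, a negative cache that gives up on workload ids that have stayed unknown for more than 600 seconds (entries are purged after 7200 seconds), and a locked slow path that re-reads the cache before querying the database. String keys are used throughout, since integer dict keys do not survive JSON serialization. A stripped-down sketch of the same pattern; the cache object, key names, and load_from_db are hypothetical stand-ins:

import threading
import time

_lock = threading.Lock()

def cached_lookup(cache, workload_id, load_from_db):
    key = str(workload_id)                 # keys are strings end to end
    found = cache.get("map", default={})
    if key in found:
        return found[key]                  # lock-free fast path

    missing = cache.get("missing", default={})
    if key in missing and missing[key] + 600 < time.time():
        return None                        # unknown for >10 min: assume another instance owns it

    with _lock:                            # slow path: one loader at a time
        found = cache.get("map", default={})
        if key in found:                   # another thread may have loaded it meanwhile
            return found[key]
        value = load_from_db(workload_id)
        if value is not None:
            found[key] = value
            cache.set("map", found)
            return value
        missing = cache.get("missing", default={})
        missing.setdefault(key, time.time())   # remember when it first went missing
        cache.set("missing", missing)
        return None

Using with for the lock also removes the explicit acquire()/release() pairing that the code above has to repeat on both exit paths.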


def get_input_name_content_id_map(request_id, workload_id, transform_id):
@@ -1162,6 +1198,9 @@ def get_content_id_from_job_id(request_id, workload_id, transform_id, job_id, in
return content_id, to_update_jobid


pending_lock = threading.Lock()


def whether_to_process_pending_workload_id(workload_id, logger=None, log_prefix=''):
cache = get_redis_cache()
processed_pending_workload_id_map_key = "processed_pending_workload_id_map"
@@ -1173,15 +1212,21 @@ def whether_to_process_pending_workload_id(workload_id, logger=None, log_prefix=
if workload_id in processed_pending_workload_id_map:
return False

# lock area
pending_lock.acquire()
processed_pending_workload_id_map = cache.get(processed_pending_workload_id_map_key, default={})

processed_pending_workload_id_map[workload_id] = time.time()
if processed_pending_workload_id_map_time is None or processed_pending_workload_id_map_time + 86400 < time.time():
cache.set(processed_pending_workload_id_map_time_key, int(time.time()), expire_seconds=86400)

for workload_id in processed_pending_workload_id_map.keys():
keys = list(processed_pending_workload_id_map.keys())
for workload_id in keys:
if processed_pending_workload_id_map[workload_id] + 86400 < time.time():
del processed_pending_workload_id_map[workload_id]

cache.set(processed_pending_workload_id_map_key, processed_pending_workload_id_map, expire_seconds=86400)
pending_lock.release()
return True
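
Two small fixes land in this hunk: the read-modify-write of the pending map is now bracketed by pending_lock (with a fresh cache read after acquiring it), and the expiry sweep iterates over a snapshot of the keys. The snapshot matters because deleting from a dict while iterating over it raises RuntimeError in Python 3; a self-contained illustration:

import time

entries = {'a': time.time() - 90000, 'b': time.time()}

# for k in entries.keys():          # RuntimeError: dictionary changed size during iteration
#     if entries[k] + 86400 < time.time():
#         del entries[k]

for k in list(entries.keys()):      # iterate over a snapshot instead
    if entries[k] + 86400 < time.time():
        del entries[k]              # safe: the expired 'a' entry is dropped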


@@ -1199,19 +1244,19 @@ def handle_messages_processing(messages, logger=None, log_prefix=''):
if 'taskid' not in msg or not msg['taskid']:
continue

workload_id = msg['taskid']
ret_req_tf_pr_id = get_workload_id_transform_id_map(workload_id)
if not ret_req_tf_pr_id:
# request is submitted by some other instances
continue

logger.debug(log_prefix + "Received message: %s" % str(ori_msg))
logger.debug(log_prefix + "(request_id, transform_id, processing_id, status, substatus): %s" % str(ret_req_tf_pr_id))

if msg['msg_type'] in ['task_status']:
workload_id = msg['taskid']
status = msg['status']
if status in ['pending']: # 'prepared'
ret_req_tf_pr_id = get_workload_id_transform_id_map(workload_id, logger=logger, log_prefix=log_prefix)
if not ret_req_tf_pr_id:
# request is submitted by some other instances
logger.debug(log_prefix + "No matched workload_id, discard message: %s" % str(ori_msg))
continue

logger.debug(log_prefix + "(request_id, transform_id, processing_id, status, substatus): %s" % str(ret_req_tf_pr_id))
req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id
if whether_to_process_pending_workload_id(workload_id, logger=logger, log_prefix=log_prefix):
# new_processings.append((req_id, tf_id, processing_id, workload_id, status))
@@ -1221,6 +1266,13 @@ def handle_messages_processing(messages, logger=None, log_prefix=''):
else:
logger.debug(log_prefix + "Processing %s is already processed, not add it to update processing" % (str(processing_id)))
elif status in ['finished', 'done']:
ret_req_tf_pr_id = get_workload_id_transform_id_map(workload_id, logger=logger, log_prefix=log_prefix)
if not ret_req_tf_pr_id:
# request is submitted by some other instances
logger.debug(log_prefix + "No matched workload_id, discard message: %s" % str(ori_msg))
continue

logger.debug(log_prefix + "(request_id, transform_id, processing_id, status, substatus): %s" % str(ret_req_tf_pr_id))
req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id
# update_processings.append((processing_id, status))
if processing_id not in update_processings:
@@ -1233,6 +1285,14 @@ def handle_messages_processing(messages, logger=None, log_prefix=''):
status = msg['status']
inputs = msg['inputs']
if inputs and status in ['finished']:
ret_req_tf_pr_id = get_workload_id_transform_id_map(workload_id, logger=logger, log_prefix=log_prefix)
if not ret_req_tf_pr_id:
# request is submitted by some other instances
logger.debug(log_prefix + "No matched workload_id, discard message: %s" % str(ori_msg))
continue

logger.debug(log_prefix + "(request_id, transform_id, processing_id, status, substatus): %s" % str(ret_req_tf_pr_id))

req_id, tf_id, processing_id, r_status, r_substatus = ret_req_tf_pr_id
content_id, to_update_jobid = get_content_id_from_job_id(req_id, workload_id, tf_id, job_id, inputs)
if content_id:
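
With the lookup moved inside each status branch, messages whose status the handler ignores never pay for a cache lookup, at the cost of repeating the same resolve-and-discard preamble three times. If the duplication grows, it could be factored into a helper along these lines (the helper is hypothetical, not part of this diff):

def resolve_ids(workload_id, ori_msg, logger, log_prefix):
    # Returns (request_id, transform_id, processing_id, status, substatus),
    # or None when the workload belongs to another iDDS instance.
    ret_req_tf_pr_id = get_workload_id_transform_id_map(workload_id, logger=logger,
                                                        log_prefix=log_prefix)
    if not ret_req_tf_pr_id:
        logger.debug(log_prefix + "No matched workload_id, discard message: %s" % str(ori_msg))
        return None
    logger.debug(log_prefix + "(request_id, transform_id, processing_id, status, substatus): %s" % str(ret_req_tf_pr_id))
    return ret_req_tf_pr_id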
@@ -1355,7 +1415,7 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou
'failed_ext_files': coll.failed_ext_files,
'missing_ext_files': coll.missing_ext_files}

if coll in input_collections:
if coll in input_collections and (workload_id is not None):
if coll.total_files == coll.processed_files + coll.failed_files + coll.missing_files:
coll_db = core_catalog.get_collection(coll_id=coll.coll_id)
coll.status = coll_db['status']
10 changes: 5 additions & 5 deletions main/lib/idds/orm/base/models.py
@@ -561,7 +561,7 @@ class Content(BASE, ModelBase):
Index('CONTENTS_STATUS_UPDATED_IDX', 'status', 'locking', 'updated_at', 'created_at'),
Index('CONTENTS_ID_NAME_IDX', 'coll_id', 'scope', 'name', 'status'),
Index('CONTENTS_DEP_IDX', 'request_id', 'transform_id', 'content_dep_id'),
Index('CONTENTS_REQ_TF_COLL_IDX', 'request_id', 'transform_id', 'workload_id', 'coll_id', 'status', 'substatus'))
Index('CONTENTS_REQ_TF_COLL_IDX', 'request_id', 'transform_id', 'workload_id', 'coll_id', 'content_relation_type', 'status', 'substatus'))


class Content_update(BASE, ModelBase):
@@ -833,10 +833,10 @@ def create_proc_to_update_contents():
BEGIN
UPDATE %s.contents set substatus = d.substatus from
(select content_id, content_dep_id, substatus from %s.contents where request_id = request_id_in and transform_id = transform_id_in and content_relation_type = 1 and status != 0) d
where %s.contents.request_id = request_id_in and %s.contents.substatus != d.substatus and d.content_id = %s.contents.content_dep_id;
where %s.contents.request_id = request_id_in and %s.contents.content_relation_type = 3 and %s.contents.substatus != d.substatus and d.content_id = %s.contents.content_dep_id;
END;
$$ LANGUAGE PLPGSQL
""" % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME,
""" % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME,
DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME))

func2 = DDL("""
@@ -847,11 +847,11 @@ def create_proc_to_update_contents():
UPDATE %s.contents set substatus = d.substatus from
(select content_id, content_dep_id, substatus from %s.contents where request_id = request_id_in and content_relation_type = 1 and status != 0) d
where %s.contents.request_id = request_id_in and %s.contents.transform_id = transform_id_in and %s.contents.substatus != d.substatus and d.content_id = %s.contents.content_dep_id;
where %s.contents.request_id = request_id_in and %s.contents.transform_id = transform_id_in and %s.contents.content_relation_type = 3 and %s.contents.substatus != d.substatus and d.content_id = %s.contents.content_dep_id;
END;
$$ LANGUAGE PLPGSQL
""" % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME,
DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME))
DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME))

event.listen(Content.__table__, "after_create", func1.execute_if(dialect="postgresql"))
event.listen(Content.__table__, "after_create", func2.execute_if(dialect="postgresql"))
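
Both PL/pgSQL fixes add a content_relation_type = 3 predicate, and with it one more %s.contents reference, so each positional tuple needs one more DEFAULT_SCHEMA_NAME appended; that is exactly the miscount this hunk repairs. A sketch of the same DDL built with a named placeholder instead, which cannot fall out of step with the reference count (illustrative only: the function header is reconstructed, the schema value is hypothetical, and the repo itself uses positional %s):

from sqlalchemy import DDL

DEFAULT_SCHEMA_NAME = "doma_idds"   # hypothetical value for the sketch

func2 = DDL("""
CREATE OR REPLACE FUNCTION %(schema)s.update_contents_to_others(request_id_in INT, transform_id_in INT) RETURNS VOID AS $$
BEGIN
    UPDATE %(schema)s.contents set substatus = d.substatus from
    (select content_id, content_dep_id, substatus from %(schema)s.contents
     where request_id = request_id_in and content_relation_type = 1 and status != 0) d
    where %(schema)s.contents.request_id = request_id_in
      and %(schema)s.contents.transform_id = transform_id_in
      and %(schema)s.contents.content_relation_type = 3
      and %(schema)s.contents.substatus != d.substatus
      and d.content_id = %(schema)s.contents.content_dep_id;
END;
$$ LANGUAGE PLPGSQL
""" % {'schema': DEFAULT_SCHEMA_NAME})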
[Diffs for the remaining 3 changed files not shown.]
