Skip to content

Commit

Permalink
Merge pull request #315 from wguanicedew/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
wguanicedew authored Jun 7, 2024
2 parents b3e5763 + aff705c commit d5d924b
Show file tree
Hide file tree
Showing 20 changed files with 779 additions and 141 deletions.
9 changes: 9 additions & 0 deletions common/lib/idds/common/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def event_type(self):
return self._event_type.name

def able_to_merge(self, event):
"""
if self._event_type == event._event_type and self.get_event_id() == event.get_event_id():
return True
if self._event_type == event._event_type and self.get_event_id() == event.get_event_id() and self._counter == event._counter:
Expand All @@ -98,6 +99,14 @@ def able_to_merge(self, event):
# ddiff = DeepDiff(self._content, event._content, ignore_order=True)
# if not ddiff:
# return True
"""
if self._event_type == event._event_type:
if (self._content is None and event._content is None):
return True
elif (self._content is not None and event._content is not None):
ddiff = DeepDiff(self._content, event._content, ignore_order=True)
if not ddiff:
return True
return False

def changed(self):
Expand Down
6 changes: 3 additions & 3 deletions main/etc/sql/postgresql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -548,15 +548,15 @@ CREATE INDEX "CONTENTS_ID_NAME_IDX" ON doma_idds.contents (coll_id, scope, md5('
$$ LANGUAGE PLPGSQL


CREATE SEQUENCE doma_idds."METAINFO_ID_SEQ" START WITH 1
CREATE SEQUENCE doma_idds."METAINFO_ID_SEQ" START WITH 1;
CREATE TABLE meta_info
(
meta_id BIGINT NOT NULL,
name VARCHAR2(50),
name VARCHAR(50),
status INTEGER,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
description VARCHAR2(1000),
description VARCHAR(1000),
meta_info JSONB,
CONSTRAINT "METAINFO_PK" PRIMARY KEY (meta_id), -- USING INDEX LOCAL,
CONSTRAINT "METAINFO_NAME_UQ" UNIQUE (name)
Expand Down
127 changes: 110 additions & 17 deletions main/lib/idds/agents/carrier/finisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
handle_resume_processing,
# is_process_terminated,
sync_processing)
from .iutils import sync_iprocessing
from .iutils import sync_iprocessing, handle_abort_iprocessing, handle_resume_iprocessing
from .poller import Poller

setup_logging(__name__)
Expand Down Expand Up @@ -346,6 +346,43 @@ def handle_abort_processing(self, processing, log_prefix=""):
return ret
return None

def handle_abort_iprocessing(self, processing, log_prefix=""):
"""
process abort processing
"""
try:
plugin = None
if processing['processing_type']:
plugin_name = processing['processing_type'].name.lower() + '_poller'
plugin = self.get_plugin(plugin_name)
else:
raise exceptions.ProcessSubmitFailed('No corresponding poller plugins for %s' % processing['processing_type'])

processing_status, update_collections, update_contents, messages = handle_abort_iprocessing(processing, self.agent_attributes, plugin=plugin, logger=self.logger, log_prefix=log_prefix)

update_processing = {'processing_id': processing['processing_id'],
'parameters': {'status': processing_status,
'substatus': ProcessingStatus.ToCancel,
'locking': ProcessingLocking.Idle}}
ret = {'update_processing': update_processing,
'update_collections': update_collections,
'update_contents': update_contents,
'messages': messages
}
return ret
except Exception as ex:
self.logger.error(ex)
self.logger.error(traceback.format_exc())
error = {'abort_err': {'msg': truncate_string('%s' % (ex), length=200)}}
update_processing = {'processing_id': processing['processing_id'],
'parameters': {'status': ProcessingStatus.ToCancel,
'locking': ProcessingLocking.Idle,
'errors': processing['errors'] if processing['errors'] else {}}}
update_processing['parameters']['errors'].update(error)
ret = {'update_processing': update_processing}
return ret
return None

def process_abort_processing(self, event):
self.number_workers += 1
pro_ret = ReturnCode.Ok.value
Expand Down Expand Up @@ -373,17 +410,27 @@ def process_abort_processing(self, event):
self.logger.info(log_pre + "process_abort_processing result: %s" % str(ret))
self.update_processing(ret, pr)
elif pr:
ret = self.handle_abort_processing(pr, log_prefix=log_pre)
ret_copy = {}
for ret_key in ret:
if ret_key != 'messages':
ret_copy[ret_key] = ret[ret_key]
self.logger.info(log_pre + "process_abort_processing result: %s" % str(ret_copy))
if pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]:
ret = self.handle_abort_iprocessing(pr, log_prefix=log_pre)
self.logger.info(log_pre + "process_abort_processing result: %s" % str(ret))

self.update_processing(ret, pr)
self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content)
self.event_bus.send(event)
self.update_processing(ret, pr, use_bulk_update_mappings=False)

self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content)
self.event_bus.send(event)
else:
ret = self.handle_abort_processing(pr, log_prefix=log_pre)
ret_copy = {}
for ret_key in ret:
if ret_key != 'messages':
ret_copy[ret_key] = ret[ret_key]
self.logger.info(log_pre + "process_abort_processing result: %s" % str(ret_copy))

self.update_processing(ret, pr)
self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content)
self.event_bus.send(event)
except Exception as ex:
self.logger.error(ex)
self.logger.error(traceback.format_exc())
Expand Down Expand Up @@ -419,6 +466,42 @@ def handle_resume_processing(self, processing, log_prefix=""):
return ret
return None

def handle_resume_iprocessing(self, processing, log_prefix=""):
"""
process resume processing
"""
try:
plugin = None
if processing['processing_type']:
plugin_name = processing['processing_type'].name.lower() + '_poller'
plugin = self.get_plugin(plugin_name)
else:
raise exceptions.ProcessSubmitFailed('No corresponding poller plugins for %s' % processing['processing_type'])

processing_status, update_collections, update_contents = handle_resume_iprocessing(processing, self.agent_attributes, plugin=plugin, logger=self.logger, log_prefix=log_prefix)

update_processing = {'processing_id': processing['processing_id'],
'parameters': {'status': processing_status,
'substatus': ProcessingStatus.ToResume,
'locking': ProcessingLocking.Idle}}
ret = {'update_processing': update_processing,
'update_collections': update_collections,
'update_contents': update_contents,
}
return ret
except Exception as ex:
self.logger.error(ex)
self.logger.error(traceback.format_exc())
error = {'resume_err': {'msg': truncate_string('%s' % (ex), length=200)}}
update_processing = {'processing_id': processing['processing_id'],
'parameters': {'status': ProcessingStatus.ToResume,
'locking': ProcessingLocking.Idle,
'errors': processing['errors'] if processing['errors'] else {}}}
update_processing['parameters']['errors'].update(error)
ret = {'update_processing': update_processing}
return ret
return None

def process_resume_processing(self, event):
self.number_workers += 1
pro_ret = ReturnCode.Ok.value
Expand All @@ -445,14 +528,24 @@ def process_resume_processing(self, event):

self.update_processing(ret, pr)
elif pr:
ret = self.handle_resume_processing(pr, log_prefix=log_pre)
self.logger.info(log_pre + "process_resume_processing result: %s" % str(ret))
if pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]:
ret = self.handle_resume_iprocessing(pr, log_prefix=log_pre)
self.logger.info(log_pre + "process_resume_processing result: %s" % str(ret))

self.update_processing(ret, pr, use_bulk_update_mappings=False)
self.update_processing(ret, pr, use_bulk_update_mappings=False)

self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content)
self.event_bus.send(event)
self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content)
self.event_bus.send(event)
else:
ret = self.handle_resume_processing(pr, log_prefix=log_pre)
self.logger.info(log_pre + "process_resume_processing result: %s" % str(ret))

self.update_processing(ret, pr, use_bulk_update_mappings=False)

self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content)
self.event_bus.send(event)
except Exception as ex:
self.logger.error(ex)
self.logger.error(traceback.format_exc())
Expand Down
30 changes: 30 additions & 0 deletions main/lib/idds/agents/carrier/iutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,36 @@ def handle_update_iprocessing(processing, agent_attributes, plugin=None, max_upd
return status, [], [], [], [], [], [], []


def handle_abort_iprocessing(processing, agent_attributes, plugin=None, logger=None, log_prefix=''):
logger = get_logger(logger)

workload_id = processing['workload_id']

try:
status = plugin.abort(workload_id, logger=logger, log_prefix=log_prefix)
logger.info(log_prefix + "abort work (status: %s, workload_id: %s)" % (status, workload_id))
except Exception as ex:
err_msg = "abort work failed with exception: %s" % (ex)
logger.error(log_prefix + err_msg)
raise Exception(err_msg)
return status, [], [], []


def handle_resume_iprocessing(processing, agent_attributes, plugin=None, logger=None, log_prefix=''):
logger = get_logger(logger)

workload_id = processing['workload_id']

try:
status = plugin.resume(workload_id, logger=logger, log_prefix=log_prefix)
logger.info(log_prefix + "resume work (status: %s, workload_id: %s)" % (status, workload_id))
except Exception as ex:
err_msg = "resume work failed with exception: %s" % (ex)
logger.error(log_prefix + err_msg)
raise Exception(err_msg)
return status, [], []


def sync_iprocessing(processing, agent_attributes, terminate=False, abort=False, logger=None, log_prefix=""):
# logger = get_logger()

Expand Down
33 changes: 33 additions & 0 deletions main/lib/idds/agents/carrier/plugins/panda.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,36 @@ def poll(self, workload_id, logger=None, log_prefix=''):
logger.error(log_prefix + str(ex))
logger.error(traceback.format_exc())
raise ex

def abort(self, workload_id, logger=None, log_prefix=''):
from pandaclient import Client

try:
if logger:
logger.info(log_prefix + f"aborting task {workload_id}")
Client.killTask(workload_id, soft=True)
status, task_status = Client.getTaskStatus(workload_id)
if status == 0:
return self.get_processing_status(task_status)
else:
msg = "Failed to abort task %s: status: %s, task_status: %s" % (workload_id, status, task_status)
raise Exception(msg)
except Exception as ex:
if logger:
logger.error(log_prefix + str(ex))
logger.error(traceback.format_exc())
raise ex

def resume(self, workload_id, logger=None, log_prefix=''):
from pandaclient import Client

try:
if logger:
logger.info(log_prefix + f"resuming task {workload_id}")
status, out = Client.retryTask(workload_id, newParams={})
return ProcessingStatus.Running
except Exception as ex:
if logger:
logger.error(log_prefix + str(ex))
logger.error(traceback.format_exc())
raise ex
2 changes: 1 addition & 1 deletion main/lib/idds/agents/carrier/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ def handle_update_iprocessing(self, processing):
plugin_name = processing['processing_type'].name.lower() + '_poller'
plugin = self.get_plugin(plugin_name)
else:
raise exceptions.ProcessSubmitFailed('No corresponding submitter plugins for %s' % processing['processing_type'])
raise exceptions.ProcessSubmitFailed('No corresponding poller plugins for %s' % processing['processing_type'])

ret_handle_update_processing = handle_update_iprocessing(processing,
self.agent_attributes,
Expand Down
Loading

0 comments on commit d5d924b

Please sign in to comment.