Skip to content

Commit

Permalink
Merge pull request #125 from wguanicedew/dev
Browse files Browse the repository at this point in the history
Improve task statistics and STOMP connections
  • Loading branch information
wguanicedew authored Nov 18, 2022
2 parents e7d361f + a3290d7 commit 7d2956a
Show file tree
Hide file tree
Showing 24 changed files with 297 additions and 82 deletions.
5 changes: 5 additions & 0 deletions atlas/lib/idds/atlas/workflowv2/atlaspandawork.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ def parse_task_parameters(self, task_parameters):
if jobP['param_type'] == 'input':
input_c = jobP['dataset']
scope, name = extract_scope_atlas(input_c, scopes=[])
if len(name) > 255:
if "consolidate" in jobP:
scope, name = extract_scope_atlas(jobP['consolidate'], scopes=[])
else:
name = name[:250]
input_coll = {'scope': scope, 'name': name}
self.set_primary_input_collection(input_coll)
if jobP['param_type'] == 'output':
Expand Down
4 changes: 2 additions & 2 deletions doma/lib/idds/doma/workflow/domapandawork.py
Original file line number Diff line number Diff line change
Expand Up @@ -856,8 +856,8 @@ def get_job_maps(self, input_output_maps):
if 'panda_id' in content['content_metadata']:
finished_jobs.append(content['content_metadata']['panda_id'])
elif content['substatus'] in [ContentStatus.Failed, ContentStatus.FinalFailed,
ContentStatus.Lost, ContentStatus.Deleted,
ContentStatus.Missing]:
ContentStatus.Lost, ContentStatus.Deleted,
ContentStatus.Missing]:
if 'panda_id' in content['content_metadata']:
failed_jobs.append(content['content_metadata']['panda_id'])
for content in inputs:
Expand Down
1 change: 1 addition & 0 deletions main/etc/sql/oracle_11.sql
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ CREATE TABLE TRANSFORMS
started_at DATE,
finished_at DATE,
expired_at DATE,
name VARCHAR2(255),
transform_metadata CLOB,
running_metadata CLOB,
CONSTRAINT TRANSFORMS_PK PRIMARY KEY (transform_id)
Expand Down
1 change: 1 addition & 0 deletions main/etc/sql/oracle_19.sql
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ CREATE TABLE TRANSFORMS
started_at DATE,
finished_at DATE,
expired_at DATE,
name VARCHAR2(255),
transform_metadata CLOB constraint TRANSFORM_METADATA_ENSURE_JSON CHECK(transform_metadata IS JSON(LAX)),
running_metadata CLOB,
CONSTRAINT TRANSFORMS_PK PRIMARY KEY (transform_id)
Expand Down
4 changes: 4 additions & 0 deletions main/etc/sql/oracle_update.sql
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,7 @@ CREATE INDEX COMMANDS_TYPE_ST_IDX ON COMMANDS (cmd_type, status, destination, re
CREATE INDEX COMMANDS_TYPE_ST_TF_IDX ON COMMANDS (cmd_type, status, destination, transform_id);
CREATE INDEX COMMANDS_TYPE_ST_PR_IDX ON COMMANDS (cmd_type, status, destination, processing_id);

-- 2022.11.03
-- transforms.name: optional display name for a transform (filled from the work's name).
-- collections.failed_files / missing_files: per-collection counters used by the
-- task statistics (kept alongside the existing processed/processing counters).
alter table transforms add name VARCHAR2(255);
alter table collections add failed_files NUMBER(10);
alter table collections add missing_files NUMBER(10);
20 changes: 18 additions & 2 deletions main/lib/idds/agents/carrier/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1160,14 +1160,21 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou

for content in inputs + outputs + logs:
if content['coll_id'] not in coll_status:
coll_status[content['coll_id']] = {'total_files': 0, 'processed_files': 0, 'processing_files': 0, 'bytes': 0}
coll_status[content['coll_id']] = {'total_files': 0, 'processed_files': 0, 'processing_files': 0, 'bytes': 0,
'new_files': 0, 'failed_files': 0, 'missing_files': 0}
coll_status[content['coll_id']]['total_files'] += 1

if content['status'] in [ContentStatus.Available, ContentStatus.Mapped,
ContentStatus.Available.value, ContentStatus.Mapped.value,
ContentStatus.FakeAvailable, ContentStatus.FakeAvailable.value]:
coll_status[content['coll_id']]['processed_files'] += 1
coll_status[content['coll_id']]['bytes'] += content['bytes']
elif content['status'] in [ContentStatus.New]:
coll_status[content['coll_id']]['new_files'] += 1
elif content['status'] in [ContentStatus.Failed, ContentStatus.FinalFailed]:
coll_status[content['coll_id']]['failed_files'] += 1
elif content['status'] in [ContentStatus.Lost, ContentStatus.Deleted, ContentStatus.Missing]:
coll_status[content['coll_id']]['missing_files'] += 1
else:
coll_status[content['coll_id']]['processing_files'] += 1

Expand All @@ -1188,15 +1195,24 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou
coll.processed_files = coll_status[coll.coll_id]['processed_files']
coll.processing_files = coll_status[coll.coll_id]['processing_files']
coll.bytes = coll_status[coll.coll_id]['bytes']
coll.new_files = coll_status[coll.coll_id]['new_files']
coll.failed_files = coll_status[coll.coll_id]['failed_files']
coll.missing_files = coll_status[coll.coll_id]['missing_files']
else:
coll.total_files = 0
coll.processed_files = 0
coll.processing_files = 0
coll.new_files = 0
coll.failed_files = 0
coll.missing_files = 0

u_coll = {'coll_id': coll.coll_id,
'total_files': coll.total_files,
'processed_files': coll.processed_files,
'processing_files': coll.processing_files,
'new_files': coll.new_files,
'failed_files': coll.failed_files,
'missing_files': coll.missing_files,
'bytes': coll.bytes}
if terminate:
if force_close_collection or close_collection and all_updates_flushed or coll.status == CollectionStatus.Closed:
Expand Down Expand Up @@ -1342,5 +1358,5 @@ def handle_resume_processing(processing, agent_attributes, logger=None, log_pref
input_output_maps = get_input_output_maps(transform_id, work)
update_contents = reactive_contents(request_id, transform_id, workload_id, work, input_output_maps)

processing['status'] = ProcessingStatus.Processing
processing['status'] = ProcessingStatus.Running
return processing, update_collections, update_contents
1 change: 1 addition & 0 deletions main/lib/idds/agents/clerk/clerk.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ def generate_transform(self, req, work):
'priority': req['priority'],
'status': TransformStatus.New,
'retries': 0,
'name': work.get_work_name(),
'new_poll_period': self.new_poll_period,
'update_poll_period': self.update_poll_period,
'max_new_retries': req['max_new_retries'] if req['max_new_retries'] is not None else self.max_new_retries,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def get(self, event_type, wait=0):
def execute(self):
while not self.graceful_stop.is_set():
try:
self.graceful_stop.wait(1)
self.graceful_stop.wait(0.1)
except Exception as error:
self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))

Expand Down
164 changes: 123 additions & 41 deletions main/lib/idds/agents/common/plugins/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def __init__(self, name="MessagingSender", **kwargs):
self.graceful_stop = threading.Event()
self.request_queue = None
self.output_queue = None
self.response_queue = None

if not hasattr(self, 'brokers'):
raise Exception('brokers is required but not defined.')
Expand All @@ -72,7 +73,7 @@ def __init__(self, name="MessagingSender", **kwargs):
if not hasattr(self, 'destination'):
raise Exception('destination is required but not defined.')
if not hasattr(self, 'broker_timeout'):
self.broker_timeout = 10
self.broker_timeout = 60
else:
self.broker_timeout = int(self.broker_timeout)

Expand All @@ -87,7 +88,10 @@ def set_request_queue(self, request_queue):
def set_output_queue(self, output_queue):
    """Attach the queue this messaging agent publishes results to."""
    self.output_queue = output_queue

def connect_to_messaging_brokers(self):
def set_response_queue(self, response_queue):
    """Attach the queue used to hand successfully sent messages back to the caller."""
    self.response_queue = response_queue

def connect_to_messaging_brokers(self, sender=True):
broker_addresses = []
for b in self.brokers:
try:
Expand All @@ -105,18 +109,47 @@ def connect_to_messaging_brokers(self):

self.logger.info("Resolved broker addresses: %s" % broker_addresses)

timeout = None
if sender:
timeout = self.broker_timeout

conns = []
for broker, port in broker_addresses:
conn = stomp.Connection12(host_and_ports=[(broker, port)],
vhost=self.vhost,
keepalive=True,
timeout=self.broker_timeout)
self.conns.append(conn)
heartbeats=(60000, 60000), # one minute
timeout=timeout)
conns.append(conn)
return conns

def send_message(self, msg):
def disconnect(self, conns):
    """Best-effort disconnect of every connection in *conns*.

    Failures from individual connections are swallowed so one dead
    broker cannot prevent the remaining connections from closing.
    """
    for connection in conns:
        try:
            connection.disconnect()
        except Exception:
            # Broker may already be gone; nothing useful to do here.
            pass

def get_connection(self):
    """Return a random broker connection, (re)connecting it if necessary.

    If the first attempt fails, all cached connections are dropped, the
    broker list is re-resolved, and one more attempt is made; errors from
    that second attempt propagate to the caller.
    """
    try:
        conn = random.choice(self.conns)
        if not conn.is_connected():
            conn.connect(self.username, self.password, wait=True)
        return conn
    except Exception as error:
        self.logger.error("Failed to connect to message broker(will re-resolve brokers): %s" % str(error))

    # First attempt failed: rebuild the connection pool from scratch.
    self.disconnect(self.conns)
    self.conns = self.connect_to_messaging_brokers(sender=True)
    conn = random.choice(self.conns)
    if not conn.is_connected():
        conn.connect(self.username, self.password, wait=True)
    return conn

def send_message(self, msg):
conn = self.get_connection()

self.logger.info("Sending message to message broker: %s" % msg['msg_id'])
self.logger.debug("Sending message to message broker: %s" % json.dumps(msg['msg_content']))
Expand All @@ -128,21 +161,27 @@ def send_message(self, msg):
'vo': 'atlas',
'msg_type': str(msg['msg_type']).lower()})

def run(self):
self.connect_to_messaging_brokers()
def execute_send(self):
    """Pump messages from request_queue to the broker until stopped.

    Each sent message is echoed to response_queue. Exceptions are logged
    and the loop continues; the loop exits when graceful_stop is set.
    """
    self.conns = self.connect_to_messaging_brokers(sender=True)

    while not self.graceful_stop.is_set():
        try:
            if self.request_queue.empty():
                time.sleep(0.1)
                continue
            msg = self.request_queue.get(False)
            if msg:
                self.send_message(msg)
                self.response_queue.put(msg)
        except Exception as error:
            self.logger.error("Messaging sender throws an exception: %s, %s" % (error, traceback.format_exc()))

def run(self):
    """Thread entry point: run the send loop, logging any fatal error."""
    try:
        self.execute_send()
    except Exception as error:
        # Catch-all so an unexpected failure is recorded instead of
        # silently killing the sender thread.
        self.logger.error("Messaging sender throws an exception: %s, %s" % (error, traceback.format_exc()))

def __call__(self):
    """Allow the instance itself to be used as a thread/callable target."""
    self.run()

Expand All @@ -151,65 +190,108 @@ class MessagingReceiver(MessagingSender):
def __init__(self, name="MessagingReceiver", **kwargs):
super(MessagingReceiver, self).__init__(name=name, **kwargs)
self.listener = None
self.receiver_conns = []

def get_listener(self, broker):
    """Return the shared message listener, creating it on first use.

    The listener is constructed once (for the first broker requested)
    and reused for every subsequent call.
    """
    if self.listener is not None:
        return self.listener
    self.listener = MessagingListener(broker, self.output_queue)
    return self.listener

def subscribe(self):
self.conns = []

broker_addresses = []
for b in self.brokers:
try:
if ":" in b:
b, port = b.split(":")
else:
port = self.port

addrinfos = socket.getaddrinfo(b, 0, socket.AF_INET, 0, socket.IPPROTO_TCP)
for addrinfo in addrinfos:
b_addr = addrinfo[4][0]
broker_addresses.append((b_addr, port))
except socket.gaierror as error:
self.logger.error('Cannot resolve hostname %s: %s' % (b, str(error)))
self.receiver_conns = self.connect_to_messaging_brokers()

self.logger.info("Resolved broker addresses: %s" % broker_addresses)

for broker, port in broker_addresses:
conn = stomp.Connection12(host_and_ports=[(broker, port)],
vhost=self.vhost,
keepalive=True)
for conn in self.receiver_conns:
self.logger.info('connecting to %s' % conn.transport._Transport__host_and_ports[0][0])
conn.set_listener('message-receiver', self.get_listener(conn.transport._Transport__host_and_ports[0]))
conn.connect(self.username, self.password, wait=True)
conn.subscribe(destination=self.destination, id='atlas-idds-messaging', ack='auto')
self.conns.append(conn)

def execute_subscribe(self):
self.subscribe()

while not self.graceful_stop.is_set():
has_failed_connection = False
try:
for conn in self.conns:
for conn in self.receiver_conns:
if not conn.is_connected():
self.logger.info('connecting to %s' % conn.transport._Transport__host_and_ports[0][0])
conn.set_listener('message-receiver', self.get_listener(conn.transport._Transport__host_and_ports[0]))
# conn.start()
conn.connect(self.username, self.password, wait=True)
conn.subscribe(destination=self.destination, id='atlas-idds-messaging', ack='auto')
time.sleep(1)
time.sleep(0.1)
except Exception as error:
self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))
has_failed_connection = True

if has_failed_connection:
# re-subscribe
self.disconnect(self.receiver_conns)
self.subscribe()

self.logger.info('receiver graceful stop requested')

for conn in self.conns:
self.disconnect(self.receiver_conns)

def run(self):
    """Thread entry point: run the subscribe loop, logging any fatal error."""
    try:
        self.execute_subscribe()
    except Exception as error:
        # Catch-all so an unexpected failure is recorded instead of
        # silently killing the receiver thread.
        self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))

def __call__(self):
    """Allow the instance itself to be used as a thread/callable target."""
    self.run()


class MessagingMessager(MessagingReceiver):
def __init__(self, name="MessagingMessager", **kwargs):
    """Initialize via MessagingReceiver; combines sending and subscribing."""
    super(MessagingMessager, self).__init__(name=name, **kwargs)

def execute_send_subscribe(self):
self.conns = self.connect_to_messaging_brokers(sender=True)
self.subscribe()

while not self.graceful_stop.is_set():
# send
while True:
try:
if not self.request_queue.empty():
msg = self.request_queue.get(False)
if msg:
self.send_message(msg)
if self.response_queue:
self.response_queue.put(msg)
else:
break
except Exception as error:
self.logger.error("Messaging sender throws an exception: %s, %s" % (error, traceback.format_exc()))

# subscribe
has_failed_connection = False
try:
conn.disconnect()
except Exception:
pass
for conn in self.receiver_conns:
if not conn.is_connected():
conn.set_listener('message-receiver', self.get_listener(conn.transport._Transport__host_and_ports[0]))
# conn.start()
conn.connect(self.username, self.password, wait=True)
conn.subscribe(destination=self.destination, id='atlas-idds-messaging', ack='auto')
except Exception as error:
self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))
has_failed_connection = True

if has_failed_connection:
# re-subscribe
self.disconnect(self.receiver_conns)
self.subscribe()

time.sleep(0.1)

self.logger.info('receiver graceful stop requested')
self.disconnect(self.conns)
self.disconnect(self.receiver_conns)

def run(self):
try:
self.subscribe()
self.execute_send_subscribe()
except Exception as error:
self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))

Expand Down
2 changes: 1 addition & 1 deletion main/lib/idds/agents/conductor/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def start_notifier(self):

self.logger.info("Starting notifier: %s" % self.notifier)
self.notifier.set_request_queue(self.message_queue)
self.notifier.set_output_queue(self.output_message_queue)
self.notifier.set_response_queue(self.output_message_queue)
self.notifier.start()

def stop_notifier(self):
Expand Down
2 changes: 1 addition & 1 deletion main/lib/idds/core/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def generate_collection(transform, collection, relation_type=CollectionRelationT
'workload_id': transform['workload_id'],
'coll_type': coll_type,
'scope': collection.scope,
'name': collection.name,
'name': collection.name[:254],
'relation_type': relation_type,
'bytes': coll_metadata['bytes'] if 'bytes' in coll_metadata else 0,
'total_files': coll_metadata['total_files'] if 'total_files' in coll_metadata else 0,
Expand Down
Loading

0 comments on commit 7d2956a

Please sign in to comment.