Skip to content

Commit

Permalink
Merge pull request #206 from wguanicedew/dev
Browse files Browse the repository at this point in the history
fix download logs
  • Loading branch information
wguanicedew authored Sep 19, 2023
2 parents be5cfa3 + 20ba77b commit 61fdfeb
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 86 deletions.
83 changes: 43 additions & 40 deletions client/lib/idds/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def build_url(self, url, path=None, params=None, doseq=False):

return full_url

def get_request_response(self, url, type='GET', data=None, headers=None, auth_setup_step=False):
def get_request_response(self, url, type='GET', data=None, headers=None, auth_setup_step=False, return_result_directly=False):
"""
Send request to the IDDS server and get the response.
Expand Down Expand Up @@ -243,49 +243,52 @@ def get_request_response(self, url, type='GET', data=None, headers=None, auth_se
raise exceptions.ConnectionException('ConnectionError: ' + str(error))

if result is not None:
# print(result.text)
# print(result.headers)
# print(result.status_code)
if result.status_code == HTTP_STATUS_CODE.OK:
if return_result_directly:
return result
else:
# print(result.text)
if result.text:
return json_loads(result.text)
else:
return None
elif result.headers and 'ExceptionClass' in result.headers:
try:
if result.headers and 'ExceptionClass' in result.headers:
cls = getattr(exceptions, result.headers['ExceptionClass'])
msg = result.headers['ExceptionMessage']
raise cls(msg)
# print(result.headers)
# print(result.status_code)
if result.status_code == HTTP_STATUS_CODE.OK:
# print(result.text)
if result.text:
return json_loads(result.text)
else:
if result.text:
data = json_loads(result.text)
raise exceptions.IDDSException(**data)
return None
elif result.headers and 'ExceptionClass' in result.headers:
try:
if result.headers and 'ExceptionClass' in result.headers:
cls = getattr(exceptions, result.headers['ExceptionClass'])
msg = result.headers['ExceptionMessage']
raise cls(msg)
else:
raise exceptions.IDDSException("Unknow exception: %s" % (result.text))
except AttributeError:
if result.text:
data = json_loads(result.text)
raise exceptions.IDDSException(**data)
else:
raise exceptions.IDDSException("Unknow exception: %s" % (result.text))
except AttributeError:
raise exceptions.IDDSException(result.text)
elif result.status_code in [HTTP_STATUS_CODE.BadRequest,
HTTP_STATUS_CODE.Unauthorized,
HTTP_STATUS_CODE.Forbidden,
HTTP_STATUS_CODE.NotFound,
HTTP_STATUS_CODE.NoMethod,
HTTP_STATUS_CODE.InternalError]:
raise exceptions.IDDSException(result.text)
elif result.status_code in [HTTP_STATUS_CODE.BadRequest,
HTTP_STATUS_CODE.Unauthorized,
HTTP_STATUS_CODE.Forbidden,
HTTP_STATUS_CODE.NotFound,
HTTP_STATUS_CODE.NoMethod,
HTTP_STATUS_CODE.InternalError]:
raise exceptions.IDDSException(result.text)
else:
try:
if result.headers and 'ExceptionClass' in result.headers:
cls = getattr(exceptions, result.headers['ExceptionClass'])
msg = result.headers['ExceptionMessage']
raise cls(msg)
else:
if result.text:
data = json_loads(result.text)
raise exceptions.IDDSException(**data)
else:
try:
if result.headers and 'ExceptionClass' in result.headers:
cls = getattr(exceptions, result.headers['ExceptionClass'])
msg = result.headers['ExceptionMessage']
raise cls(msg)
else:
raise exceptions.IDDSException("Unknow exception: %s" % (result.text))
except AttributeError:
raise exceptions.IDDSException(result.text)
if result.text:
data = json_loads(result.text)
raise exceptions.IDDSException(**data)
else:
raise exceptions.IDDSException("Unknow exception: %s" % (result.text))
except AttributeError:
raise exceptions.IDDSException(result.text)
if result is None:
raise exceptions.IDDSException('Response is None')
6 changes: 3 additions & 3 deletions client/lib/idds/client/logsclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <[email protected]>, 2020
# - Wen Guan, <[email protected]>, 2020 - 2023


"""
Cacher Rest client to access IDDS system.
"""

import os
import requests

from idds.common import exceptions
from idds.client.base import BaseRestClient
Expand Down Expand Up @@ -70,7 +69,8 @@ def download_logs(self, workload_id=None, request_id=None, dest_dir='./', filena
with open(filename, 'w') as fp:
fp.write(content)
"""
response = requests.get(url, verify=False)
# response = requests.get(url, verify=False)
response = self.get_request_response(url, return_result_directly=True)
# print(response)
# print(response.text)
# print(response.content)
Expand Down
38 changes: 21 additions & 17 deletions main/lib/idds/agents/common/baseagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ def is_self(self, health_item):
def get_health_payload(self):
return None

def is_ready(self):
return True

def health_heartbeat(self, heartbeat_delay=None):
if heartbeat_delay:
self.heartbeat_delay = heartbeat_delay
Expand All @@ -240,23 +243,24 @@ def health_heartbeat(self, heartbeat_delay=None):
thread_id = self.get_thread_id()
thread_name = self.get_thread_name()
payload = self.get_health_payload()
self.logger.debug("health heartbeat: agent %s, pid %s, thread %s, delay %s, payload %s" % (self.get_name(), pid, thread_name, self.heartbeat_delay, payload))
core_health.add_health_item(agent=self.get_name(), hostname=hostname, pid=pid,
thread_id=thread_id, thread_name=thread_name, payload=payload)
core_health.clean_health(older_than=self.heartbeat_delay * 2)

health_items = core_health.retrieve_health_items()
pids, pid_not_exists = [], []
for health_item in health_items:
if health_item['hostname'] == hostname:
pid = health_item['pid']
if pid not in pids:
pids.append(pid)
for pid in pids:
if not pid_exists(pid):
pid_not_exists.append(pid)
if pid_not_exists:
core_health.clean_health(hostname=hostname, pids=pid_not_exists, older_than=None)
if self.is_ready():
self.logger.debug("health heartbeat: agent %s, pid %s, thread %s, delay %s, payload %s" % (self.get_name(), pid, thread_name, self.heartbeat_delay, payload))
core_health.add_health_item(agent=self.get_name(), hostname=hostname, pid=pid,
thread_id=thread_id, thread_name=thread_name, payload=payload)
core_health.clean_health(older_than=self.heartbeat_delay * 2)

health_items = core_health.retrieve_health_items()
pids, pid_not_exists = [], []
for health_item in health_items:
if health_item['hostname'] == hostname:
pid = health_item['pid']
if pid not in pids:
pids.append(pid)
for pid in pids:
if not pid_exists(pid):
pid_not_exists.append(pid)
if pid_not_exists:
core_health.clean_health(hostname=hostname, pids=pid_not_exists, older_than=None)

def add_default_tasks(self):
task = self.create_task(task_func=self.health_heartbeat, task_output_queue=None,
Expand Down
42 changes: 22 additions & 20 deletions main/lib/idds/agents/common/eventbus/msgeventbusbackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,26 +329,28 @@ def set_manager(self, manager):
if not manager:
manager = self.get_manager()

if (not self.manager or self.manager['connect'] != manager['connect']
or self.manager['username'] != manager['username'] # noqa W503, E129
or self.manager['password'] != manager['password']): # noqa W503, E129
with self._lock:
try:
self.manager = manager
self.manager_socket = self.context.socket(zmq.REQ)
self.manager_socket.plain_username = manager['username'].encode('utf-8')
self.manager_socket.plain_password = manager['password'].encode('utf-8')
self.manager_socket.connect(manager['connect'])
except (zmq.error.ZMQError, zmq.Again) as error:
self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
self.num_failures += 1
except Exception as error:
self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
self.num_failures += 1

def get_manager(self):
if self.manager:
return self.manager
if (not self.manager and not manager['connect'] and not manager['username'] and not manager['password']):
if (self.manager['connect'] != manager['connect']
or self.manager['username'] != manager['username'] # noqa W503, E129
or self.manager['password'] != manager['password']): # noqa W503, E129
with self._lock:
try:
self.manager = manager
self.manager_socket = self.context.socket(zmq.REQ)
self.manager_socket.plain_username = manager['username'].encode('utf-8')
self.manager_socket.plain_password = manager['password'].encode('utf-8')
self.manager_socket.connect(manager['connect'])
except (zmq.error.ZMQError, zmq.Again) as error:
self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
self.num_failures += 1
except Exception as error:
self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
self.num_failures += 1

def get_manager(self, myself=False):
if not myself:
if (self.manager and self.manager['connect'] and self.manager['username'] and self.manager['password']):
return self.manager
manager = {'connect': self.coordinator_con_string,
'username': self._username,
'password': self._password}
Expand Down
6 changes: 6 additions & 0 deletions main/lib/idds/agents/coordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ def __init__(self, num_threads=1, coordination_interval_delay=300,
def __del__(self):
self.stop_coordinator()

def is_ready(self):
# manager = self.event_bus.get_manager()
# if (self.manager and manager['connect'] and manager['username'] and manager['password']):
# return True
return True

def get_health_payload(self):
manager = self.event_bus.get_manager()
payload = {'manager': manager}
Expand Down
12 changes: 6 additions & 6 deletions monitor/data/conf.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@

var appConfig = {
'iddsAPI_request': "https://lxplus808.cern.ch:443/idds/monitor_request/null/null",
'iddsAPI_transform': "https://lxplus808.cern.ch:443/idds/monitor_transform/null/null",
'iddsAPI_processing': "https://lxplus808.cern.ch:443/idds/monitor_processing/null/null",
'iddsAPI_request_detail': "https://lxplus808.cern.ch:443/idds/monitor/null/null/true/false/false",
'iddsAPI_transform_detail': "https://lxplus808.cern.ch:443/idds/monitor/null/null/false/true/false",
'iddsAPI_processing_detail': "https://lxplus808.cern.ch:443/idds/monitor/null/null/false/false/true"
'iddsAPI_request': "https://lxplus812.cern.ch:443/idds/monitor_request/null/null",
'iddsAPI_transform': "https://lxplus812.cern.ch:443/idds/monitor_transform/null/null",
'iddsAPI_processing': "https://lxplus812.cern.ch:443/idds/monitor_processing/null/null",
'iddsAPI_request_detail': "https://lxplus812.cern.ch:443/idds/monitor/null/null/true/false/false",
'iddsAPI_transform_detail': "https://lxplus812.cern.ch:443/idds/monitor/null/null/false/true/false",
'iddsAPI_processing_detail': "https://lxplus812.cern.ch:443/idds/monitor/null/null/false/false/true"
}

0 comments on commit 61fdfeb

Please sign in to comment.