From 134a6fc70e0e7df78a275b7a1326823546595d64 Mon Sep 17 00:00:00 2001 From: Aleksandr Alekseev Date: Mon, 22 May 2023 14:56:37 +0200 Subject: [PATCH 1/3] First implementation --- core/libs/elasticsearch.py | 81 ++++++++++++++++++++++++-------------- core/libs/task.py | 10 +++-- core/settings/config.py | 5 +++ core/views.py | 19 ++++++--- 4 files changed, 76 insertions(+), 39 deletions(-) diff --git a/core/libs/elasticsearch.py b/core/libs/elasticsearch.py index b12f16ae..177cec0d 100644 --- a/core/libs/elasticsearch.py +++ b/core/libs/elasticsearch.py @@ -13,8 +13,7 @@ _logger = logging.getLogger('bigpandamon') - -def get_es_credentials(instance='es-atlas'): +def get_es_credentials(instance): """ Getting credentials from settings :param instance: str, es-atlas or es-monit @@ -23,47 +22,69 @@ def get_es_credentials(instance='es-atlas'): es_host = None es_user = None es_password = None - if instance == 'es-atlas' and hasattr(settings, 'ES'): - es_host = settings.ES.get('esHost', None) - es_port = settings.ES.get('esPort', None) - es_host = es_host + ':' + es_port + '/es' if es_host else None - es_user = settings.ES.get('esUser', None) - es_password = settings.ES.get('esPassword', None) - elif instance == 'es-monit' and hasattr(settings, 'ES_MONIT'): - es_host = settings.ES_MONIT.get('esHost', None) - es_port = settings.ES_MONIT.get('esPort', None) - es_host = es_host + ':' + es_port + '/es' if es_host else None - es_user = settings.ES_MONIT.get('esUser', None) - es_password = settings.ES_MONIT.get('esPassword', None) + if settings.DEPLOYMENT == 'ORACLE_ATLAS': + if instance == 'es-atlas' and hasattr(settings, 'ES'): + es_host = settings.ES.get('esHost', None) + es_port = settings.ES.get('esPort', None) + es_host = es_host + ':' + es_port + '/es' if es_host else None + es_user = settings.ES.get('esUser', None) + es_password = settings.ES.get('esPassword', None) + elif instance == 'es-monit' and hasattr(settings, 'ES_MONIT'): + es_host = settings.ES_MONIT.get('esHost', None) + es_port = settings.ES_MONIT.get('esPort', None) + es_host = es_host + ':' + es_port + '/es' if es_host else None + es_user = settings.ES_MONIT.get('esUser', None) + es_password = settings.ES_MONIT.get('esPassword', None) + else: + if hasattr(settings, 'ELASTIC'): + es_host = settings.ELASTIC.get('esHost', None) + es_port = settings.ELASTIC.get('esPort', None) + es_protocol = settings.ELASTIC.get('esProtocol', None) + es_path = settings.ELASTIC.get('esPath', None) + es_host = es_protocol + '://' + es_host + ':' + es_port + '/' + es_path if es_host else None + es_user = settings.ELASTIC.get('esUser', None) + es_password = settings.ELASTIC.get('esPassword', None) if any(i is None for i in (es_host, es_user, es_password)): raise Exception('ES cluster credentials was not found in settings') else: return es_host, es_user, es_password - -def create_es_connection(verify_certs=True, timeout=2000, max_retries=10, retry_on_timeout=True, instance='es-atlas'): +def create_es_connection(instance='es-atlas', protocol='https', timeout=2000, max_retries=10, + retry_on_timeout=True): """ Create a connection to ElasticSearch cluster """ es_host, es_user, es_password = get_es_credentials(instance) + try: - connection = Elasticsearch( - ['https://{0}'.format(es_host)], - http_auth=(es_user, es_password), - verify_certs=verify_certs, - timeout=timeout, - max_retries=max_retries, - retry_on_timeout=retry_on_timeout, - ca_certs='/etc/pki/tls/certs/ca-bundle.trust.crt' - ) + if protocol == 'https': + ca_certs = settings.CA_CERTS_ES + + connection = Elasticsearch( + ['{0}://{1}'.format(protocol, es_host)], + http_auth=(es_user, es_password), + verify_certs=True, + timeout=timeout, + max_retries=max_retries, + retry_on_timeout=retry_on_timeout, + ca_certs = ca_certs + ) + else: + connection = Elasticsearch( + ['{0}://{1}'.format(protocol, es_host)], + http_auth=(es_user, es_password), + timeout=timeout, + max_retries=max_retries, + retry_on_timeout=retry_on_timeout) return connection + except Exception as ex: _logger.error(ex) return None -def get_payloadlog(id, es_conn, start=0, length=50, mode='pandaid', sort='asc', search_string=''): +def get_payloadlog(id, es_conn, index, start=0, length=50, mode='pandaid', sort='asc', search_string=''): """ Get pilot logs from ATLAS ElasticSearch storage """ @@ -73,7 +94,8 @@ def get_payloadlog(id, es_conn, start=0, length=50, mode='pandaid', sort='asc', total = 0 flag_running_job = True end = start + length - s = Search(using=es_conn, index='atlas_pilotlogs*') + + s = Search(using=es_conn, index='{0}*'.format(index)) s = s.source(["@timestamp", "@timestamp_nanoseconds", "level", "message", "PandaJobID", "TaskID", "Harvester_WorkerID", "Harvester_ID"]) @@ -104,7 +126,6 @@ def get_payloadlog(id, es_conn, start=0, length=50, mode='pandaid', sort='asc', return logs_list, flag_running_job, total - def upload_data(es_conn, index_name_base, data, timestamp_param='creationdate', id_param='jeditaskid'): """ Push data to ElasticSearch cluster @@ -186,7 +207,6 @@ def upload_data(es_conn, index_name_base, data, timestamp_param='creationdate', return result - def get_split_rule_info(es_conn, jeditaskid): """ Get split rule entries from ATLAS Elastic @@ -195,7 +215,8 @@ def get_split_rule_info(es_conn, jeditaskid): :return: split rule messagees """ split_rules = [] - s = Search(using=es_conn, index='atlas_jedilogs*') + jedi_logs_index = settings.JEDI_LOGS_ESINDEX +'-*' + s = Search(using=es_conn, index=jedi_logs_index) s = s.source(['@timestamp', 'message']) s = s.filter('term', jediTaskID='{0}'.format(jeditaskid)) q = Q("match", message='change_split_rule') diff --git a/core/libs/task.py b/core/libs/task.py index 7afbe634..d3faf7f2 100644 --- a/core/libs/task.py +++ b/core/libs/task.py @@ -880,9 +880,11 @@ def get_logs_by_taskid(jeditaskid): tasks_logs = [] - connection = create_es_connection() + es_conn = create_es_connection() - s = Search(using=connection, index='atlas_jedilogs-*') + jedi_logs_index = settings.JEDI_LOGS_ESINDEX +'-*' + + s = Search(using=es_conn, index=jedi_logs_index) s = s.filter('term', **{'jediTaskID': jeditaskid}) @@ -900,7 +902,9 @@ def get_logs_by_taskid(jeditaskid): tasks_logs.append({'jediTaskID': jeditaskid, 'logname': type, 'loglevel': levelname, 'lcount': str(levelnames['doc_count'])}) - s = Search(using=connection, index='atlas_pandalogs-*') + panda_logs_index = settings.PANDA_LOGS_ESINDEX + '-*' + + s = Search(using=connection, index=panda_logs_index) s = s.filter('term', **{'jediTaskID': jeditaskid}) diff --git a/core/settings/config.py b/core/settings/config.py index 59392fb4..0fb28241 100644 --- a/core/settings/config.py +++ b/core/settings/config.py @@ -44,6 +44,11 @@ # PanDA server URL PANDA_SERVER_URL = os.environ.get('PANDA_SERVER_URL', 'https://pandaserver.cern.ch/server/panda') +# ElasticSearch +PANDA_LOGS_ESINDEX = os.environ.get('PANDA_LOGS_ESINDEX', 'atlas_pandalogs') +JEDI_LOGS_ESINDEX = os.environ.get('JEDI_LOGS_ESINDEX', 'atlas_jedilogs') +PILOT_LOGS_ESINDEX = os.environ.get('PILOT_LOGS_ESINDEX', 'atlas_pilotlogs') +CA_CERTS_ES = os.environ.get('CA_CERTS_ES', '/etc/pki/tls/certs/ca-bundle.trust.crt') # DB_ROUTERS for atlas's prodtask DATABASE_ROUTERS = [ 'core.dbrouter.ProdMonDBRouter', diff --git a/core/views.py b/core/views.py index dba88921..c85a6aaf 100644 --- a/core/views.py +++ b/core/views.py @@ -6766,11 +6766,13 @@ def esatlasPandaLoggerJson(request): return response if settings.DEPLOYMENT != 'ORACLE_ATLAS': - return HttpResponse('It does not exist for non ATLAS BipPanDA monintoring system', content_type='text/html') + return HttpResponse('It does not exist for non ATLAS BipPanDA monitoring system', content_type='text/html') connection = create_es_connection() - s = Search(using=connection, index='atlas_jedilogs-*') + index = settings.JEDI_LOGS_ESINDEX + + s = Search(using=connection, index='{0}*'.format(index)) s.aggs.bucket('jediTaskID', 'terms', field='jediTaskID', size=100) \ .bucket('type', 'terms', field='fields.type.keyword') \ @@ -6881,7 +6883,10 @@ def esatlasPandaLogger(request): } jediCat = ['cat1', 'cat2', 'cat3', 'cat4', 'cat5', 'cat6', 'cat7'] - indices = ['atlas_pandalogs-', 'atlas_jedilogs-'] + panda_index = settings.PANDA_LOGS_ESINDEX + '-' + jedi_index = settings.JEDI_LOGS_ESINDEX + '-' + + indices = [panda_index, jedi_index] panda = {} jedi = {} @@ -6895,7 +6900,7 @@ def esatlasPandaLogger(request): res = s.execute() - if index == "atlas_pandalogs-": + if index == panda_index: for cat in pandaCat: panda[cat] = {} for agg in res['aggregations']['logName']['buckets']: @@ -6912,7 +6917,7 @@ def esatlasPandaLogger(request): panda[cat][name][type][levelname] = {} panda[cat][name][type][levelname]['logLevel'] = levelname panda[cat][name][type][levelname]['lcount'] = str(levelnames['doc_count']) - elif index == "atlas_jedilogs-": + elif index == jedi_index: for cat in jediCat: jedi[cat] = {} for agg in res['aggregations']['logName']['buckets']: @@ -8762,7 +8767,9 @@ def getPayloadLog(request): else: search_string = request.POST['search'] - payloadlog, job_running_flag, total = get_payloadlog(id, connection, start=start_var, length=length_var, mode=mode, + pilot_logs_index = settings.PILOT_LOGS_ESINDEX + + payloadlog, job_running_flag, total = get_payloadlog(id, connection, pilot_logs_index, start=start_var, length=length_var, mode=mode, sort=sort, search_string=search_string) log_content['payloadlog'] = payloadlog From 49097f6723078859002914cca2540e5e86b74263 Mon Sep 17 00:00:00 2001 From: Aleksandr Alekseev Date: Mon, 22 May 2023 15:03:08 +0200 Subject: [PATCH 2/3] Code review --- core/libs/elasticsearch.py | 5 +++-- core/libs/task.py | 4 ++-- core/settings/config.py | 7 ++++--- core/views.py | 6 +++--- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/core/libs/elasticsearch.py b/core/libs/elasticsearch.py index 177cec0d..8e2cfbda 100644 --- a/core/libs/elasticsearch.py +++ b/core/libs/elasticsearch.py @@ -95,7 +95,7 @@ def get_payloadlog(id, es_conn, index, start=0, length=50, mode='pandaid', sort= flag_running_job = True end = start + length - s = Search(using=es_conn, index='{0}*'.format(index)) + s = Search(using=es_conn, index=index) s = s.source(["@timestamp", "@timestamp_nanoseconds", "level", "message", "PandaJobID", "TaskID", "Harvester_WorkerID", "Harvester_ID"]) @@ -215,7 +215,8 @@ def get_split_rule_info(es_conn, jeditaskid): :return: split rule messagees """ split_rules = [] - jedi_logs_index = settings.JEDI_LOGS_ESINDEX +'-*' + jedi_logs_index = settings.JEDI_LOGS_ESINDEX + s = Search(using=es_conn, index=jedi_logs_index) s = s.source(['@timestamp', 'message']) s = s.filter('term', jediTaskID='{0}'.format(jeditaskid)) diff --git a/core/libs/task.py b/core/libs/task.py index d3faf7f2..3ee9fb2e 100644 --- a/core/libs/task.py +++ b/core/libs/task.py @@ -882,7 +882,7 @@ def get_logs_by_taskid(jeditaskid): es_conn = create_es_connection() - jedi_logs_index = settings.JEDI_LOGS_ESINDEX +'-*' + jedi_logs_index = settings.JEDI_LOGS_ESINDEX s = Search(using=es_conn, index=jedi_logs_index) @@ -902,7 +902,7 @@ def get_logs_by_taskid(jeditaskid): tasks_logs.append({'jediTaskID': jeditaskid, 'logname': type, 'loglevel': levelname, 'lcount': str(levelnames['doc_count'])}) - panda_logs_index = settings.PANDA_LOGS_ESINDEX + '-*' + panda_logs_index = settings.PANDA_LOGS_ESINDEX s = Search(using=connection, index=panda_logs_index) diff --git a/core/settings/config.py b/core/settings/config.py index 0fb28241..ece15ae9 100644 --- a/core/settings/config.py +++ b/core/settings/config.py @@ -45,10 +45,11 @@ PANDA_SERVER_URL = os.environ.get('PANDA_SERVER_URL', 'https://pandaserver.cern.ch/server/panda') # ElasticSearch -PANDA_LOGS_ESINDEX = os.environ.get('PANDA_LOGS_ESINDEX', 'atlas_pandalogs') -JEDI_LOGS_ESINDEX = os.environ.get('JEDI_LOGS_ESINDEX', 'atlas_jedilogs') -PILOT_LOGS_ESINDEX = os.environ.get('PILOT_LOGS_ESINDEX', 'atlas_pilotlogs') +PANDA_LOGS_ESINDEX = os.environ.get('PANDA_LOGS_ESINDEX', 'atlas_pandalogs*') +JEDI_LOGS_ESINDEX = os.environ.get('JEDI_LOGS_ESINDEX', 'atlas_jedilogs*') +PILOT_LOGS_ESINDEX = os.environ.get('PILOT_LOGS_ESINDEX', 'atlas_pilotlogs*') CA_CERTS_ES = os.environ.get('CA_CERTS_ES', '/etc/pki/tls/certs/ca-bundle.trust.crt') + # DB_ROUTERS for atlas's prodtask DATABASE_ROUTERS = [ 'core.dbrouter.ProdMonDBRouter', diff --git a/core/views.py b/core/views.py index c85a6aaf..15a76340 100644 --- a/core/views.py +++ b/core/views.py @@ -6768,11 +6768,11 @@ def esatlasPandaLoggerJson(request): if settings.DEPLOYMENT != 'ORACLE_ATLAS': return HttpResponse('It does not exist for non ATLAS BipPanDA monitoring system', content_type='text/html') - connection = create_es_connection() + es_conn = create_es_connection() - index = settings.JEDI_LOGS_ESINDEX + jedi_logs_index = settings.JEDI_LOGS_ESINDEX - s = Search(using=connection, index='{0}*'.format(index)) + s = Search(using=es_conn, index=jedi_logs_index) s.aggs.bucket('jediTaskID', 'terms', field='jediTaskID', size=100) \ .bucket('type', 'terms', field='fields.type.keyword') \ From 32a790982cf9a5daeaced588ac95ac2c07715fc4 Mon Sep 17 00:00:00 2001 From: Aleksandr Alekseev Date: Mon, 22 May 2023 15:40:45 +0200 Subject: [PATCH 3/3] Code review --- core/libs/elasticsearch.py | 6 +++--- core/libs/task.py | 4 ++-- core/settings/config.py | 8 ++++---- core/views.py | 14 +++++++++----- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/core/libs/elasticsearch.py b/core/libs/elasticsearch.py index 8e2cfbda..2074bcff 100644 --- a/core/libs/elasticsearch.py +++ b/core/libs/elasticsearch.py @@ -36,7 +36,7 @@ def get_es_credentials(instance): es_user = settings.ES_MONIT.get('esUser', None) es_password = settings.ES_MONIT.get('esPassword', None) else: - if hasattr(settings, 'ELASTIC'): + if hasattr(settings, 'ES_CLUSTER'): es_host = settings.ELASTIC.get('esHost', None) es_port = settings.ELASTIC.get('esPort', None) es_protocol = settings.ELASTIC.get('esProtocol', None) @@ -59,7 +59,7 @@ def create_es_connection(instance='es-atlas', protocol='https', timeout=2000, ma try: if protocol == 'https': - ca_certs = settings.CA_CERTS_ES + ca_certs = settings.ES_CA_CERT connection = Elasticsearch( ['{0}://{1}'.format(protocol, es_host)], @@ -215,7 +215,7 @@ def get_split_rule_info(es_conn, jeditaskid): :return: split rule messagees """ split_rules = [] - jedi_logs_index = settings.JEDI_LOGS_ESINDEX + jedi_logs_index = settings.ES_INDEX_JEDI_LOGS s = Search(using=es_conn, index=jedi_logs_index) s = s.source(['@timestamp', 'message']) diff --git a/core/libs/task.py b/core/libs/task.py index 3ee9fb2e..146d02b3 100644 --- a/core/libs/task.py +++ b/core/libs/task.py @@ -882,7 +882,7 @@ def get_logs_by_taskid(jeditaskid): es_conn = create_es_connection() - jedi_logs_index = settings.JEDI_LOGS_ESINDEX + jedi_logs_index = settings.ES_INDEX_JEDI_LOGS s = Search(using=es_conn, index=jedi_logs_index) @@ -902,7 +902,7 @@ def get_logs_by_taskid(jeditaskid): tasks_logs.append({'jediTaskID': jeditaskid, 'logname': type, 'loglevel': levelname, 'lcount': str(levelnames['doc_count'])}) - panda_logs_index = settings.PANDA_LOGS_ESINDEX + panda_logs_index = settings.ES_INDEX_PANDA_LOGS s = Search(using=connection, index=panda_logs_index) diff --git a/core/settings/config.py b/core/settings/config.py index ece15ae9..7bdff32f 100644 --- a/core/settings/config.py +++ b/core/settings/config.py @@ -45,10 +45,10 @@ PANDA_SERVER_URL = os.environ.get('PANDA_SERVER_URL', 'https://pandaserver.cern.ch/server/panda') # ElasticSearch -PANDA_LOGS_ESINDEX = os.environ.get('PANDA_LOGS_ESINDEX', 'atlas_pandalogs*') -JEDI_LOGS_ESINDEX = os.environ.get('JEDI_LOGS_ESINDEX', 'atlas_jedilogs*') -PILOT_LOGS_ESINDEX = os.environ.get('PILOT_LOGS_ESINDEX', 'atlas_pilotlogs*') -CA_CERTS_ES = os.environ.get('CA_CERTS_ES', '/etc/pki/tls/certs/ca-bundle.trust.crt') +ES_INDEX_PANDA_LOGS = os.environ.get('ES_INDEX_PANDA_LOGS', 'atlas_pandalogs*') +ES_INDEX_JEDI_LOGS = os.environ.get('ES_INDEX_JEDI_LOGS', 'atlas_jedilogs*') +ES_INDEX_PILOT_LOGS = os.environ.get('ES_INDEX_PILOT_LOGS', 'atlas_pilotlogs*') +ES_CA_CERT = os.environ.get('ES_CA_CERT', '/etc/pki/tls/certs/ca-bundle.trust.crt') # DB_ROUTERS for atlas's prodtask DATABASE_ROUTERS = [ diff --git a/core/views.py b/core/views.py index 15a76340..ddf32033 100644 --- a/core/views.py +++ b/core/views.py @@ -6770,7 +6770,7 @@ def esatlasPandaLoggerJson(request): es_conn = create_es_connection() - jedi_logs_index = settings.JEDI_LOGS_ESINDEX + jedi_logs_index = settings.ES_INDEX_JEDI_LOGS s = Search(using=es_conn, index=jedi_logs_index) @@ -6883,8 +6883,8 @@ def esatlasPandaLogger(request): } jediCat = ['cat1', 'cat2', 'cat3', 'cat4', 'cat5', 'cat6', 'cat7'] - panda_index = settings.PANDA_LOGS_ESINDEX + '-' - jedi_index = settings.JEDI_LOGS_ESINDEX + '-' + panda_index = settings.ES_INDEX_PANDA_LOGS[:-1]+'-' + jedi_index = settings.ES_INDEX_JEDI_LOGS[:-1]+'-' indices = [panda_index, jedi_index] @@ -8256,7 +8256,11 @@ def initSelfMonitor(request): else: remote = request.META['REMOTE_ADDR'] - urlProto = request.META['wsgi.url_scheme'] + if 'wsgi.url_scheme' in request.META: + urlProto = request.META['wsgi.url_scheme'] + else: + urlProto = 'http' + if 'HTTP_X_FORWARDED_PROTO' in request.META: urlProto = request.META['HTTP_X_FORWARDED_PROTO'] urlProto = str(urlProto) + "://" @@ -8767,7 +8771,7 @@ def getPayloadLog(request): else: search_string = request.POST['search'] - pilot_logs_index = settings.PILOT_LOGS_ESINDEX + pilot_logs_index = settings.ES_INDEX_PILOT_LOGS payloadlog, job_running_flag, total = get_payloadlog(id, connection, pilot_logs_index, start=start_var, length=length_var, mode=mode, sort=sort, search_string=search_string)