diff --git a/core/libs/elasticsearch.py b/core/libs/elasticsearch.py index b12f16ae..2074bcff 100644 --- a/core/libs/elasticsearch.py +++ b/core/libs/elasticsearch.py @@ -13,8 +13,7 @@ _logger = logging.getLogger('bigpandamon') - -def get_es_credentials(instance='es-atlas'): +def get_es_credentials(instance): """ Getting credentials from settings :param instance: str, es-atlas or es-monit @@ -23,47 +22,69 @@ def get_es_credentials(instance='es-atlas'): es_host = None es_user = None es_password = None - if instance == 'es-atlas' and hasattr(settings, 'ES'): - es_host = settings.ES.get('esHost', None) - es_port = settings.ES.get('esPort', None) - es_host = es_host + ':' + es_port + '/es' if es_host else None - es_user = settings.ES.get('esUser', None) - es_password = settings.ES.get('esPassword', None) - elif instance == 'es-monit' and hasattr(settings, 'ES_MONIT'): - es_host = settings.ES_MONIT.get('esHost', None) - es_port = settings.ES_MONIT.get('esPort', None) - es_host = es_host + ':' + es_port + '/es' if es_host else None - es_user = settings.ES_MONIT.get('esUser', None) - es_password = settings.ES_MONIT.get('esPassword', None) + if settings.DEPLOYMENT == 'ORACLE_ATLAS': + if instance == 'es-atlas' and hasattr(settings, 'ES'): + es_host = settings.ES.get('esHost', None) + es_port = settings.ES.get('esPort', None) + es_host = es_host + ':' + es_port + '/es' if es_host else None + es_user = settings.ES.get('esUser', None) + es_password = settings.ES.get('esPassword', None) + elif instance == 'es-monit' and hasattr(settings, 'ES_MONIT'): + es_host = settings.ES_MONIT.get('esHost', None) + es_port = settings.ES_MONIT.get('esPort', None) + es_host = es_host + ':' + es_port + '/es' if es_host else None + es_user = settings.ES_MONIT.get('esUser', None) + es_password = settings.ES_MONIT.get('esPassword', None) + else: + if hasattr(settings, 'ELASTIC'): + es_host = settings.ELASTIC.get('esHost', None) + es_port = settings.ELASTIC.get('esPort', None) + es_protocol =
settings.ELASTIC.get('esProtocol', None) + es_path = settings.ELASTIC.get('esPath', None) + es_host = es_host + ':' + es_port + '/' + es_path if es_host else None + es_user = settings.ELASTIC.get('esUser', None) + es_password = settings.ELASTIC.get('esPassword', None) if any(i is None for i in (es_host, es_user, es_password)): raise Exception('ES cluster credentials was not found in settings') else: return es_host, es_user, es_password - -def create_es_connection(verify_certs=True, timeout=2000, max_retries=10, retry_on_timeout=True, instance='es-atlas'): +def create_es_connection(instance='es-atlas', protocol='https', timeout=2000, max_retries=10, + retry_on_timeout=True): """ Create a connection to ElasticSearch cluster """ es_host, es_user, es_password = get_es_credentials(instance) + try: - connection = Elasticsearch( - ['https://{0}'.format(es_host)], - http_auth=(es_user, es_password), - verify_certs=verify_certs, - timeout=timeout, - max_retries=max_retries, - retry_on_timeout=retry_on_timeout, - ca_certs='/etc/pki/tls/certs/ca-bundle.trust.crt' - ) + if protocol == 'https': + ca_certs = settings.ES_CA_CERT + + connection = Elasticsearch( + ['{0}://{1}'.format(protocol, es_host)], + http_auth=(es_user, es_password), + verify_certs=True, + timeout=timeout, + max_retries=max_retries, + retry_on_timeout=retry_on_timeout, + ca_certs=ca_certs + ) + else: + connection = Elasticsearch( + ['{0}://{1}'.format(protocol, es_host)], + http_auth=(es_user, es_password), + timeout=timeout, + max_retries=max_retries, + retry_on_timeout=retry_on_timeout) return connection + except Exception as ex: _logger.error(ex) return None -def get_payloadlog(id, es_conn, start=0, length=50, mode='pandaid', sort='asc', search_string=''): +def get_payloadlog(id, es_conn, index, start=0, length=50, mode='pandaid', sort='asc', search_string=''): """ Get pilot logs from ATLAS ElasticSearch storage """ @@ -73,7 +94,8 @@ def get_payloadlog(id, es_conn, start=0,
length=50, mode='pandaid', sort='asc', total = 0 flag_running_job = True end = start + length - s = Search(using=es_conn, index='atlas_pilotlogs*') + + s = Search(using=es_conn, index=index) s = s.source(["@timestamp", "@timestamp_nanoseconds", "level", "message", "PandaJobID", "TaskID", "Harvester_WorkerID", "Harvester_ID"]) @@ -104,7 +126,6 @@ def get_payloadlog(id, es_conn, start=0, length=50, mode='pandaid', sort='asc', return logs_list, flag_running_job, total - def upload_data(es_conn, index_name_base, data, timestamp_param='creationdate', id_param='jeditaskid'): """ Push data to ElasticSearch cluster @@ -186,7 +207,6 @@ def upload_data(es_conn, index_name_base, data, timestamp_param='creationdate', return result - def get_split_rule_info(es_conn, jeditaskid): """ Get split rule entries from ATLAS Elastic @@ -195,7 +215,9 @@ def get_split_rule_info(es_conn, jeditaskid): :return: split rule messagees """ split_rules = [] - s = Search(using=es_conn, index='atlas_jedilogs*') + jedi_logs_index = settings.ES_INDEX_JEDI_LOGS + + s = Search(using=es_conn, index=jedi_logs_index) s = s.source(['@timestamp', 'message']) s = s.filter('term', jediTaskID='{0}'.format(jeditaskid)) q = Q("match", message='change_split_rule') diff --git a/core/libs/task.py b/core/libs/task.py index 7afbe634..146d02b3 100644 --- a/core/libs/task.py +++ b/core/libs/task.py @@ -880,9 +880,11 @@ def get_logs_by_taskid(jeditaskid): tasks_logs = [] - connection = create_es_connection() + es_conn = create_es_connection() - s = Search(using=connection, index='atlas_jedilogs-*') + jedi_logs_index = settings.ES_INDEX_JEDI_LOGS + + s = Search(using=es_conn, index=jedi_logs_index) s = s.filter('term', **{'jediTaskID': jeditaskid}) @@ -900,7 +902,9 @@ def get_logs_by_taskid(jeditaskid): tasks_logs.append({'jediTaskID': jeditaskid, 'logname': type, 'loglevel': levelname, 'lcount': str(levelnames['doc_count'])}) - s = Search(using=connection, index='atlas_pandalogs-*') + panda_logs_index = 
settings.ES_INDEX_PANDA_LOGS + + s = Search(using=es_conn, index=panda_logs_index) s = s.filter('term', **{'jediTaskID': jeditaskid}) diff --git a/core/settings/config.py b/core/settings/config.py index 59392fb4..7bdff32f 100644 --- a/core/settings/config.py +++ b/core/settings/config.py @@ -44,6 +44,12 @@ # PanDA server URL PANDA_SERVER_URL = os.environ.get('PANDA_SERVER_URL', 'https://pandaserver.cern.ch/server/panda') +# ElasticSearch +ES_INDEX_PANDA_LOGS = os.environ.get('ES_INDEX_PANDA_LOGS', 'atlas_pandalogs*') +ES_INDEX_JEDI_LOGS = os.environ.get('ES_INDEX_JEDI_LOGS', 'atlas_jedilogs*') +ES_INDEX_PILOT_LOGS = os.environ.get('ES_INDEX_PILOT_LOGS', 'atlas_pilotlogs*') +ES_CA_CERT = os.environ.get('ES_CA_CERT', '/etc/pki/tls/certs/ca-bundle.trust.crt') + # DB_ROUTERS for atlas's prodtask DATABASE_ROUTERS = [ 'core.dbrouter.ProdMonDBRouter', diff --git a/core/views.py b/core/views.py index dba88921..ddf32033 100644 --- a/core/views.py +++ b/core/views.py @@ -6766,11 +6766,13 @@ def esatlasPandaLoggerJson(request): return response if settings.DEPLOYMENT != 'ORACLE_ATLAS': - return HttpResponse('It does not exist for non ATLAS BipPanDA monintoring system', content_type='text/html') + return HttpResponse('It does not exist for non ATLAS BigPanDA monitoring system', content_type='text/html') - connection = create_es_connection() + es_conn = create_es_connection() - s = Search(using=connection, index='atlas_jedilogs-*') + jedi_logs_index = settings.ES_INDEX_JEDI_LOGS + + s = Search(using=es_conn, index=jedi_logs_index) s.aggs.bucket('jediTaskID', 'terms', field='jediTaskID', size=100) \ .bucket('type', 'terms', field='fields.type.keyword') \ @@ -6881,7 +6883,10 @@ def esatlasPandaLogger(request): } jediCat = ['cat1', 'cat2', 'cat3', 'cat4', 'cat5', 'cat6', 'cat7'] - indices = ['atlas_pandalogs-', 'atlas_jedilogs-'] + panda_index = settings.ES_INDEX_PANDA_LOGS.rstrip('*-') + '-' + jedi_index = settings.ES_INDEX_JEDI_LOGS.rstrip('*-') + '-' + + indices = [panda_index, jedi_index] panda
= {} jedi = {} @@ -6895,7 +6900,7 @@ def esatlasPandaLogger(request): res = s.execute() - if index == "atlas_pandalogs-": + if index == panda_index: for cat in pandaCat: panda[cat] = {} for agg in res['aggregations']['logName']['buckets']: @@ -6912,7 +6917,7 @@ def esatlasPandaLogger(request): panda[cat][name][type][levelname] = {} panda[cat][name][type][levelname]['logLevel'] = levelname panda[cat][name][type][levelname]['lcount'] = str(levelnames['doc_count']) - elif index == "atlas_jedilogs-": + elif index == jedi_index: for cat in jediCat: jedi[cat] = {} for agg in res['aggregations']['logName']['buckets']: @@ -8251,7 +8256,11 @@ def initSelfMonitor(request): else: remote = request.META['REMOTE_ADDR'] - urlProto = request.META['wsgi.url_scheme'] + if 'wsgi.url_scheme' in request.META: + urlProto = request.META['wsgi.url_scheme'] + else: + urlProto = 'http' + if 'HTTP_X_FORWARDED_PROTO' in request.META: urlProto = request.META['HTTP_X_FORWARDED_PROTO'] urlProto = str(urlProto) + "://" @@ -8762,7 +8771,9 @@ def getPayloadLog(request): else: search_string = request.POST['search'] - payloadlog, job_running_flag, total = get_payloadlog(id, connection, start=start_var, length=length_var, mode=mode, + pilot_logs_index = settings.ES_INDEX_PILOT_LOGS + + payloadlog, job_running_flag, total = get_payloadlog(id, connection, pilot_logs_index, start=start_var, length=length_var, mode=mode, sort=sort, search_string=search_string) log_content['payloadlog'] = payloadlog