From 75a0ec5481b794c125a2f8fc8b548987958d9c4d Mon Sep 17 00:00:00 2001
From: Marina Golosova
Date: Thu, 21 Mar 2019 16:50:00 +0100
Subject: [PATCH 1/8] DF/91: allow stage to use DKB ES storage as second
 (backup) source.

If the `--es-config FILE` parameter is specified, use DKB ES storage as
a backup metadata source in case the information was removed from the
primary source (Rucio).

Information in Rucio is removed together with the dataset, so if Rucio
says that a dataset is deleted (or not found), the stage can query the
internal DKB storage (ES) to get the missing information.

This functionality is mostly required for "archived" data reprocessing
(when we change the ETL process or recover missing records).
---
 .../091_datasetsRucio/datasets_processing.py | 126 +++++++++++++++++-
 1 file changed, 122 insertions(+), 4 deletions(-)

diff --git a/Utils/Dataflow/091_datasetsRucio/datasets_processing.py b/Utils/Dataflow/091_datasetsRucio/datasets_processing.py
index 0daa12948..b1df46726 100755
--- a/Utils/Dataflow/091_datasetsRucio/datasets_processing.py
+++ b/Utils/Dataflow/091_datasetsRucio/datasets_processing.py
@@ -13,6 +13,7 @@
 
 import sys
 import os
+import argparse
 
 base_dir = os.path.abspath(os.path.dirname(__file__))
@@ -39,7 +40,15 @@
     sys.stderr.write("(ERROR) Failed to import pyDKB library: %s\n" % err)
     sys.exit(1)
 
+try:
+    import elasticsearch
+    from elasticsearch.exceptions import NotFoundError
+except ImportError, err:
+    sys.stderr.write("(WARN) Failed to import elasticsearch module: %s\n"
+                     % err)
+
 rucio_client = None
+es_client = None
 
 OUTPUT = 'o'
 INPUT = 'i'
@@ -69,9 +78,22 @@
                        choices=[INPUT, OUTPUT],
                        dest='ds_type'
                        )
+    stage.add_argument('--es-config', action='store',
+                       type=argparse.FileType('r'),
+                       help=u'Use ES as a backup source for dataset info'
+                       ' in order to save information even if it was'
+                       ' removed from the original source',
+                       nargs='?',
+                       dest='es'
+                       )
     exit_code = 0
     try:
         stage.parse_args(argv)
+        if stage.ARGS.es:
+            cfg = read_es_config(stage.ARGS.es)
+            init_es_client(cfg)
+            stage.ARGS.es.close()
+            stage.ARGS.es = cfg
         if stage.ARGS.ds_type == OUTPUT:
             stage.process = process_output_ds
         elif stage.ARGS.ds_type == INPUT:
@@ -89,6 +111,61 @@
     sys.exit(exit_code)
 
 
+def read_es_config(cfg_file):
+    """ Read ES configuration file.
+
+    :param cfg_file: open file descriptor with ES access configuration
+    :type cfg_file: file descriptor
+    """
+    keys = {'ES_HOST': 'host',
+            'ES_PORT': 'port',
+            'ES_USER': 'user',
+            'ES_PASSWORD': '__passwd',
+            'ES_INDEX': 'index'
+            }
+    cfg = {}
+    for line in cfg_file.readlines():
+        if line.strip().startswith('#'):
+            continue
+        line = line.split('#')[0].strip()
+        if '=' not in line:
+            continue
+        key, val = line.split('=')[:2]
+        try:
+            cfg[keys[key]] = val
+        except KeyError:
+            sys.stderr.write("(WARN) Unknown configuration parameter: "
+                             "'%s'.\n" % key)
+
+    sys.stderr.write("(INFO) Stage 091 configuration (%s):\n"
+                     % cfg_file.name)
+    key_len = len(max(cfg.keys(), key=len))
+    pattern = "(INFO) %%-%ds : '%%s'\n" % key_len
+    sys.stderr.write("(INFO) ---\n")
+    for key in cfg:
+        if key.startswith('__'):
+            continue
+        sys.stderr.write(pattern % (key, cfg[key]))
+    sys.stderr.write("(INFO) ---\n")
+    return cfg
+
+
+def init_es_client(cfg):
+    """ Initialize global variable `es_client`.
+
+    :param cfg: configuration parameters
+    :type cfg: dict
+    """
+    global es_client
+    try:
+        elasticsearch
+    except NameError:
+        sys.stderr.write("(ERROR) Elasticsearch module is not installed.\n")
+        sys.exit(2)
+    address = '%s:%s' % (cfg['host'], cfg['port'])
+    es_client = elasticsearch.Elasticsearch(address)
+
+
 def init_rucio_client():
     """ Initialize global variable `rucio_client`. """
     global rucio_client
@@ -125,8 +202,9 @@ def process_output_ds(stage, message):
         datasets = [datasets]
 
     for dataset in datasets:
-        ds = get_output_ds_info(dataset)
-        ds['taskid'] = json_str.get('taskid')
+        taskid = json_str.get('taskid')
+        ds = get_output_ds_info(dataset, stage.ARGS.es, taskid)
+        ds['taskid'] = taskid
         if not add_es_index_info(ds):
             sys.stderr.write("(WARN) Skip message (not enough info"
                              " for ES indexing).\n")
@@ -156,12 +234,24 @@
     except RucioException:
         data[mfields['bytes']] = -1
         data[mfields['deleted']] = True
-    stage.output(pyDKB.dataflow.messages.JSONMessage(data))
 
+    if data.get(mfields['deleted']) and stage.ARGS.es:
+        es_fields = mfields.values()
+        es_fields.remove(mfields['deleted'])
+        es_mdata = get_es_metadata(stage.ARGS.es, data.get('taskid'),
+                                   mfields.values())
+        for f in es_fields:
+            if es_mdata.get(f) and data.get(f) != es_mdata.get(f):
+                sys.stderr.write("(DEBUG) Update Rucio info with data from ES:"
+                                 " %s = '%s' (was: '%s')\n" % (f, es_mdata[f],
+                                                               data.get(f)))
+                data[f] = es_mdata[f]
+
+    stage.output(pyDKB.dataflow.messages.JSONMessage(data))
     return True
 
 
-def get_output_ds_info(dataset):
+def get_output_ds_info(dataset, es=None, parent=None):
     """ Construct dictionary with dataset info.
 
     Dict format:
@@ -185,6 +275,18 @@
         # the length of file is set to -1
         ds_dict[mfields['bytes']] = -1
         ds_dict[mfields['deleted']] = True
+
+    if ds_dict.get(mfields['deleted']) and es:
+        es_fields = mfields.values()
+        es_fields.remove(mfields['deleted'])
+        es_mdata = get_es_metadata(es, dataset, mfields.values(), parent)
+        for f in es_fields:
+            if es_mdata.get(f) and ds_dict.get(f) != es_mdata.get(f):
+                sys.stderr.write("(DEBUG) Update Rucio info with data from ES:"
+                                 " %s = '%s' (was: '%s')\n" % (f, es_mdata[f],
+                                                               ds_dict.get(f)))
+                ds_dict[f] = es_mdata[f]
+
     return ds_dict
 
 
@@ -235,6 +337,22 @@ def get_metadata(dsn, attributes=None):
     return result
 
 
+def get_es_metadata(cfg, oid, fields=[], parent=None):
+    kwargs = {'index': cfg['index'], 'doc_type': '_all'}
+    kwargs['id'] = oid
+    if fields is not None:
+        kwargs['_source'] = fields
+    if parent:
+        kwargs['parent'] = parent
+    try:
+        r = es_client.get(**kwargs)
+    except NotFoundError, err:
+        sys.stderr.write("(WARN) Failed to get information from ES: id='%s'\n"
+                         % kwargs['id'])
+        return None
+    return r.get('_source', {})
+
+
 def adjust_metadata(mdata):
     """ Update metadata taken from Rucio with values required to proceed. """
     if not mdata:

From 6ce0facceea4a94a940a69642f391d8211156c44 Mon Sep 17 00:00:00 2001
From: Marina Golosova
Date: Fri, 26 Apr 2019 20:15:33 +0200
Subject: [PATCH 2/8] pyDKB/storages: add possibility to log storage
 configuration.

Configuration parameters starting with '__' won't be logged.
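
A minimal usage sketch (illustrative only: the values are made up, the
`storage` instance stands for any configured Storage subclass, and the
exact stderr prefix comes from `log()`):

    cfg = {'host': '127.0.0.1', 'port': '9200',
           '__passwd': 'secret', '__file': 'storage.cfg'}
    storage.log_cfg(cfg)
    # Roughly:
    #   (INFO) <StorageClass>: '<name>' storage configuration (storage.cfg):
    #   (INFO) <StorageClass>: ---
    #   (INFO) <StorageClass>: host : '127.0.0.1'
    #   (INFO) <StorageClass>: port : '9200'
    #   (INFO) <StorageClass>: ---
    # '__passwd' is skipped; '__file' is only used for the header line.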
---
 Utils/Dataflow/pyDKB/storages/Storage.py | 19 +++++++++++++++++++
 Utils/Dataflow/pyDKB/storages/es.py      |  1 +
 2 files changed, 20 insertions(+)

diff --git a/Utils/Dataflow/pyDKB/storages/Storage.py b/Utils/Dataflow/pyDKB/storages/Storage.py
index cdb05621a..e3a030a75 100644
--- a/Utils/Dataflow/pyDKB/storages/Storage.py
+++ b/Utils/Dataflow/pyDKB/storages/Storage.py
@@ -44,6 +44,25 @@ def log(self, level, message):
                                            self.__class__.__name__,
                                            message))
 
+    def log_cfg(self, cfg):
+        """ Log storage configuration.
+
+        :param cfg: configuration to be logged
+        :type cfg: dict
+        """
+        fname = ''
+        if cfg.get('__file'):
+            fname = ' (%s)' % cfg['__file']
+        self.log("INFO", "'%s' storage configuration%s:" % (self.name, fname))
+        key_len = len(max(cfg.keys(), key=len))
+        pattern = "%%-%ds : '%%s'" % key_len
+        self.log("INFO", "---")
+        for key in cfg:
+            if key.startswith('__'):
+                continue
+            self.log("INFO", pattern % (key, cfg[key]))
+        self.log("INFO", "---")
+
     def configure(self, cfg):
         """ Apply storage configuration (initialize client).
 
diff --git a/Utils/Dataflow/pyDKB/storages/es.py b/Utils/Dataflow/pyDKB/storages/es.py
index 69637ed06..57e833791 100644
--- a/Utils/Dataflow/pyDKB/storages/es.py
+++ b/Utils/Dataflow/pyDKB/storages/es.py
@@ -59,6 +59,7 @@ def configure(self, cfg):
         :param cfg: configuration parameters
         :type cfg: dict
         """
+        self.log_cfg(cfg)
         hosts = [{'host': cfg.get('host', DEFAULT_CFG['host']),
                   'port': cfg.get('port', DEFAULT_CFG['port'])}]
         kwargs = {}

From 3537f087726e7bd4f973c2ec59c2ffa9858b142c Mon Sep 17 00:00:00 2001
From: Marina Golosova
Date: Fri, 26 Apr 2019 20:16:50 +0200
Subject: [PATCH 3/8] pyDKB/storages: hide password in ES storage logs.

---
 Utils/Dataflow/pyDKB/storages/es.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/Utils/Dataflow/pyDKB/storages/es.py b/Utils/Dataflow/pyDKB/storages/es.py
index 57e833791..dbd1a6c70 100644
--- a/Utils/Dataflow/pyDKB/storages/es.py
+++ b/Utils/Dataflow/pyDKB/storages/es.py
@@ -50,11 +50,11 @@ def configure(self, cfg):
         """ Configure ES client.
 
         Configuration parameters:
-          host (str: '127.0.0.1')
-          port (str: '9200')
-          index (str)
-          user (str)
-          passwd (str)
+          host (str: '127.0.0.1')
+          port (str: '9200')
+          index (str)
+          user (str)
+          __passwd (str)
 
         :param cfg: configuration parameters
         :type cfg: dict
@@ -64,7 +64,7 @@ def configure(self, cfg):
                   'port': cfg.get('port', DEFAULT_CFG['port'])}]
         kwargs = {}
         if cfg.get('user'):
-            kwargs['http_auth'] = '%(user)s:%(passwd)s' % cfg
+            kwargs['http_auth'] = '%(user)s:%(__passwd)s' % cfg
         if cfg.get('index'):
             self.index = cfg['index']
         self.c = elasticsearch.Elasticsearch(hosts, **kwargs)

From 0a3a321b5064eb36f2ba7e82bf965d63e70c2afe Mon Sep 17 00:00:00 2001
From: Marina Golosova
Date: Fri, 26 Apr 2019 20:37:07 +0200
Subject: [PATCH 4/8] pyDKB/storages: improve configuration logging (add
 default parameters).

Without this, parameters that take their default values would not show
up in the log at all when they are missing from the user-provided
config.
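
An illustrative sketch (hypothetical values; for the ES storage the
defaults are taken from `DEFAULT_CFG`):

    cfg = {'host': 'es.example.org'}
    storage.log_cfg(cfg, defaults={'host': '127.0.0.1', 'port': '9200'})
    # Now both keys are logged:
    #   host : 'es.example.org'
    #   port : '9200'
    # Before this change only 'host' would have been printed.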
---
 Utils/Dataflow/pyDKB/storages/Storage.py | 11 +++++++----
 Utils/Dataflow/pyDKB/storages/es.py      |  2 +-
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/Utils/Dataflow/pyDKB/storages/Storage.py b/Utils/Dataflow/pyDKB/storages/Storage.py
index e3a030a75..1c290e4e0 100644
--- a/Utils/Dataflow/pyDKB/storages/Storage.py
+++ b/Utils/Dataflow/pyDKB/storages/Storage.py
@@ -44,23 +44,26 @@ def log(self, level, message):
                                            self.__class__.__name__,
                                            message))
 
-    def log_cfg(self, cfg):
+    def log_cfg(self, cfg, defaults={}):
         """ Log storage configuration.
 
         :param cfg: configuration to be logged
         :type cfg: dict
+        :param defaults: default parameter values, if any (will be logged
+                         only if the parameter is missing in ``cfg``)
+        :type defaults: dict
         """
         fname = ''
         if cfg.get('__file'):
             fname = ' (%s)' % cfg['__file']
         self.log("INFO", "'%s' storage configuration%s:" % (self.name, fname))
-        key_len = len(max(cfg.keys(), key=len))
+        key_len = len(max(cfg.keys() + defaults.keys(), key=len))
         pattern = "%%-%ds : '%%s'" % key_len
         self.log("INFO", "---")
-        for key in cfg:
+        for key in set(cfg.keys() + defaults.keys()):
             if key.startswith('__'):
                 continue
-            self.log("INFO", pattern % (key, cfg[key]))
+            self.log("INFO", pattern % (key, cfg.get(key, defaults.get(key))))
         self.log("INFO", "---")
 
     def configure(self, cfg):
diff --git a/Utils/Dataflow/pyDKB/storages/es.py b/Utils/Dataflow/pyDKB/storages/es.py
index dbd1a6c70..907e7167f 100644
--- a/Utils/Dataflow/pyDKB/storages/es.py
+++ b/Utils/Dataflow/pyDKB/storages/es.py
@@ -59,7 +59,7 @@ def configure(self, cfg):
         :param cfg: configuration parameters
         :type cfg: dict
         """
-        self.log_cfg(cfg)
+        self.log_cfg(cfg, DEFAULT_CFG)
         hosts = [{'host': cfg.get('host', DEFAULT_CFG['host']),
                   'port': cfg.get('port', DEFAULT_CFG['port'])}]
         kwargs = {}

From 8f4370f6b21a98587bfa449d8fe0d2a529e65d70 Mon Sep 17 00:00:00 2001
From: Marina Golosova
Date: Fri, 26 Apr 2019 21:34:26 +0200
Subject: [PATCH 5/8] DF/091: make stage work with pyDKB.storages.
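
The stage no longer keeps its own `elasticsearch` client: the
configuration read by `read_es_config()` is handed over to the pyDKB
storages registry, and the client is looked up at the point of use.
A rough sketch of the new access pattern (names as used in this patch):

    storages.create("ES", storages.storageType.ES, cfg)  # once, in main()
    ...
    es = storages.get("ES")     # wherever ES data are needed
    r = es.get(oid, fields, parent=parent)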
--- .../091_datasetsRucio/datasets_processing.py | 76 +++++-------------- 1 file changed, 21 insertions(+), 55 deletions(-) diff --git a/Utils/Dataflow/091_datasetsRucio/datasets_processing.py b/Utils/Dataflow/091_datasetsRucio/datasets_processing.py index b1df46726..e4f9602cb 100755 --- a/Utils/Dataflow/091_datasetsRucio/datasets_processing.py +++ b/Utils/Dataflow/091_datasetsRucio/datasets_processing.py @@ -36,19 +36,12 @@ dkb_dir = os.path.join(base_dir, os.pardir) sys.path.append(dkb_dir) import pyDKB + from pyDKB import storages except Exception, err: sys.stderr.write("(ERROR) Failed to import pyDKB library: %s\n" % err) sys.exit(1) -try: - import elasticsearch - from elasticsearch.exceptions import NotFoundError -except ImportError, err: - sys.stderr.write("(WARN) Failed to import elasticsearch module: %s\n" - % err) - rucio_client = None -es_client = None OUTPUT = 'o' INPUT = 'i' @@ -91,9 +84,8 @@ def main(argv): stage.parse_args(argv) if stage.ARGS.es: cfg = read_es_config(stage.ARGS.es) - init_es_client(cfg) stage.ARGS.es.close() - stage.ARGS.es = cfg + storages.create("ES", storages.storageType.ES, cfg) if stage.ARGS.ds_type == OUTPUT: stage.process = process_output_ds elif stage.ARGS.ds_type == INPUT: @@ -136,36 +128,9 @@ def read_es_config(cfg_file): except KeyError: sys.stderr.write("(WARN) Unknown configuration parameter: " "'%s'.\n" % key) - - sys.stderr.write("(INFO) Stage 091 configuration (%s):\n" - % cfg_file.name) - key_len = len(max(cfg.keys(), key=len)) - pattern = "(INFO) %%-%ds : '%%s'\n" % key_len - sys.stderr.write("(INFO) ---\n") - for key in cfg: - if key.startswith('__'): - continue - sys.stderr.write(pattern % (key, cfg[key])) - sys.stderr.write("(INFO) ---\n") return cfg -def init_es_client(cfg): - """ Initialize global variable `es_client`. - - :param cfg: configuration parameters - :type cfg: dict - """ - global es_client - try: - elasticsearch - except NameError: - sys.stderr.write("(ERROR) Elasticsearch module is not installed.\n") - sys.exit(2) - address = '%s:%s' % (cfg['host'], cfg['port']) - es_client = elasticsearch.Elasticsearch(address) - - def init_rucio_client(): """ Initialize global variable `rucio_client`. 
""" global rucio_client @@ -203,7 +168,7 @@ def process_output_ds(stage, message): for dataset in datasets: taskid = json_str.get('taskid') - ds = get_output_ds_info(dataset, stage.ARGS.es, taskid) + ds = get_output_ds_info(dataset, taskid) ds['taskid'] = taskid if not add_es_index_info(ds): sys.stderr.write("(WARN) Skip message (not enough info" @@ -222,6 +187,10 @@ def process_input_ds(stage, message): * bytes * deleted """ + try: + es = storages.get("ES") + except storages.StorageNotConfigured: + es = None data = message.content() mfields = META_FIELDS[INPUT] ds_name = data.get(SRC_FIELD[INPUT]) @@ -235,11 +204,10 @@ def process_input_ds(stage, message): data[mfields['bytes']] = -1 data[mfields['deleted']] = True - if data.get(mfields['deleted']) and stage.ARGS.es: + if data.get(mfields['deleted']) and es: es_fields = mfields.values() es_fields.remove(mfields['deleted']) - es_mdata = get_es_metadata(stage.ARGS.es, data.get('taskid'), - mfields.values()) + es_mdata = get_es_metadata(data.get('taskid'), mfields.values()) for f in es_fields: if es_mdata.get(f) and data.get(f) != es_mdata.get(f): sys.stderr.write("(DEBUG) Update Rucio info with data from ES:" @@ -251,7 +219,7 @@ def process_input_ds(stage, message): return True -def get_output_ds_info(dataset, es=None, parent=None): +def get_output_ds_info(dataset, parent=None): """ Construct dictionary with dataset info. Dict format: @@ -261,6 +229,10 @@ def get_output_ds_info(dataset, es=None, parent=None): :param dataset: dataset name :return: dict """ + try: + es = storages.get("ES") + except storages.StorageNotConfigured: + es = None ds_dict = {} ds_dict['datasetname'] = dataset try: @@ -279,7 +251,7 @@ def get_output_ds_info(dataset, es=None, parent=None): if ds_dict.get(mfields['deleted']) and es: es_fields = mfields.values() es_fields.remove(mfields['deleted']) - es_mdata = get_es_metadata(es, dataset, mfields.values(), parent) + es_mdata = get_es_metadata(dataset, mfields.values(), parent) for f in es_fields: if es_mdata.get(f) and ds_dict.get(f) != es_mdata.get(f): sys.stderr.write("(DEBUG) Update Rucio info with data from ES:" @@ -337,20 +309,14 @@ def get_metadata(dsn, attributes=None): return result -def get_es_metadata(cfg, oid, fields=[], parent=None): - kwargs = {'index': cfg['index'], 'doc_type': '_all'} - kwargs['id'] = oid - if fields is not None: - kwargs['_source'] = fields - if parent: - kwargs['parent'] = parent +def get_es_metadata(oid, fields=[], parent=None): + es = storages.get("ES") try: - r = es_client.get(**kwargs) - except NotFoundError, err: - sys.stderr.write("(WARN) Failed to get information from ES: id='%s'\n" - % kwargs['id']) + r = es.get(oid, fields, parent=parent) + except storages.exceptions.NotFound, err: + sys.stderr.write("(WARN) %s\n" % err) return None - return r.get('_source', {}) + return r def adjust_metadata(mdata): From 2734099679f7594cfe6baa96c433d5e0296f99b0 Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Fri, 26 Apr 2019 22:25:36 +0200 Subject: [PATCH 6/8] DF/091: refactoring (make code more DRY). Move metadata "update" with info from ES to a separate function. 
--- .../091_datasetsRucio/datasets_processing.py | 58 ++++++++++--------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/Utils/Dataflow/091_datasetsRucio/datasets_processing.py b/Utils/Dataflow/091_datasetsRucio/datasets_processing.py index e4f9602cb..49a017bdd 100755 --- a/Utils/Dataflow/091_datasetsRucio/datasets_processing.py +++ b/Utils/Dataflow/091_datasetsRucio/datasets_processing.py @@ -168,12 +168,13 @@ def process_output_ds(stage, message): for dataset in datasets: taskid = json_str.get('taskid') - ds = get_output_ds_info(dataset, taskid) + ds = get_output_ds_info(dataset) ds['taskid'] = taskid if not add_es_index_info(ds): sys.stderr.write("(WARN) Skip message (not enough info" " for ES indexing).\n") return True + fix_ds_info(ds) del(ds['taskid']) stage.output(pyDKB.dataflow.messages.JSONMessage(ds)) @@ -187,10 +188,6 @@ def process_input_ds(stage, message): * bytes * deleted """ - try: - es = storages.get("ES") - except storages.StorageNotConfigured: - es = None data = message.content() mfields = META_FIELDS[INPUT] ds_name = data.get(SRC_FIELD[INPUT]) @@ -204,22 +201,13 @@ def process_input_ds(stage, message): data[mfields['bytes']] = -1 data[mfields['deleted']] = True - if data.get(mfields['deleted']) and es: - es_fields = mfields.values() - es_fields.remove(mfields['deleted']) - es_mdata = get_es_metadata(data.get('taskid'), mfields.values()) - for f in es_fields: - if es_mdata.get(f) and data.get(f) != es_mdata.get(f): - sys.stderr.write("(DEBUG) Update Rucio info with data from ES:" - " %s = '%s' (was: '%s')\n" % (f, es_mdata[f], - data.get(f))) - data[f] = es_mdata[f] + fix_ds_info(data) stage.output(pyDKB.dataflow.messages.JSONMessage(data)) return True -def get_output_ds_info(dataset, parent=None): +def get_output_ds_info(dataset): """ Construct dictionary with dataset info. Dict format: @@ -229,10 +217,6 @@ def get_output_ds_info(dataset, parent=None): :param dataset: dataset name :return: dict """ - try: - es = storages.get("ES") - except storages.StorageNotConfigured: - es = None ds_dict = {} ds_dict['datasetname'] = dataset try: @@ -248,18 +232,38 @@ def get_output_ds_info(dataset, parent=None): ds_dict[mfields['bytes']] = -1 ds_dict[mfields['deleted']] = True - if ds_dict.get(mfields['deleted']) and es: + return ds_dict + + +def fix_ds_info(data): + """ Fix dataset metadata with data from ES, if needed and possible. + + :param data: data + :type data: dict + """ + try: + es = storages.get("ES") + except storages.StorageNotConfigured: + # ES configuration was not passed to the stage + return None + if data.get('_type') == 'output_dataset': + mfields = META_FIELDS[OUTPUT] + oid = data.get('_id') + else: + mfields = META_FIELDS[INPUT] + oid = data.get('taskid') + + if data.get(mfields['deleted']) and es: es_fields = mfields.values() es_fields.remove(mfields['deleted']) - es_mdata = get_es_metadata(dataset, mfields.values(), parent) + es_mdata = get_es_metadata(oid, mfields.values(), data.get('_parent')) for f in es_fields: - if es_mdata.get(f) and ds_dict.get(f) != es_mdata.get(f): + if es_mdata.get(f) and data.get(f) != es_mdata.get(f): sys.stderr.write("(DEBUG) Update Rucio info with data from ES:" " %s = '%s' (was: '%s')\n" % (f, es_mdata[f], - ds_dict.get(f))) - ds_dict[f] = es_mdata[f] - - return ds_dict + data.get(f))) + data[f] = es_mdata[f] + return True def extract_scope(dsn): From d39022df3b535b00b249972dbe1103a78bfc20ec Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Fri, 26 Apr 2019 22:42:25 +0200 Subject: [PATCH 7/8] Fix codestyle. 
--- Utils/Dataflow/091_datasetsRucio/datasets_processing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Utils/Dataflow/091_datasetsRucio/datasets_processing.py b/Utils/Dataflow/091_datasetsRucio/datasets_processing.py index 49a017bdd..6990851b3 100755 --- a/Utils/Dataflow/091_datasetsRucio/datasets_processing.py +++ b/Utils/Dataflow/091_datasetsRucio/datasets_processing.py @@ -74,8 +74,8 @@ def main(argv): stage.add_argument('--es-config', action='store', type=argparse.FileType('r'), help=u'Use ES as a backup source for dataset info' - ' in order to save information even if it was' - ' removed from the original source', + ' in order to save information even if it was' + ' removed from the original source', nargs='?', dest='es' ) From 18781e4db6d15155b8d66eda175c62fedeaab429 Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Sun, 28 Apr 2019 00:08:50 +0200 Subject: [PATCH 8/8] DF/91: unify return types of `get_es_metadata()`. --- Utils/Dataflow/091_datasetsRucio/datasets_processing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Utils/Dataflow/091_datasetsRucio/datasets_processing.py b/Utils/Dataflow/091_datasetsRucio/datasets_processing.py index 6990851b3..adf69b7ea 100755 --- a/Utils/Dataflow/091_datasetsRucio/datasets_processing.py +++ b/Utils/Dataflow/091_datasetsRucio/datasets_processing.py @@ -315,11 +315,11 @@ def get_metadata(dsn, attributes=None): def get_es_metadata(oid, fields=[], parent=None): es = storages.get("ES") + r = {} try: r = es.get(oid, fields, parent=parent) except storages.exceptions.NotFound, err: sys.stderr.write("(WARN) %s\n" % err) - return None return r
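
With the unified return type, a caller sketch (illustrative, not part
of this patch) no longer needs a special `None` check:

    mdata = get_es_metadata(ds_name, ['bytes'])
    size = mdata.get('bytes')  # safe even if the document was not found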