From 9066bab7e91597adc7bc73f237599a6dcca88a29 Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Fri, 26 Apr 2019 21:34:26 +0200 Subject: [PATCH] DF/091: make stage work with pyDKB.storages. --- .../091_datasetsRucio/datasets_processing.py | 76 +++++-------------- 1 file changed, 21 insertions(+), 55 deletions(-) diff --git a/Utils/Dataflow/091_datasetsRucio/datasets_processing.py b/Utils/Dataflow/091_datasetsRucio/datasets_processing.py index b1df46726..e4f9602cb 100755 --- a/Utils/Dataflow/091_datasetsRucio/datasets_processing.py +++ b/Utils/Dataflow/091_datasetsRucio/datasets_processing.py @@ -36,19 +36,12 @@ dkb_dir = os.path.join(base_dir, os.pardir) sys.path.append(dkb_dir) import pyDKB + from pyDKB import storages except Exception, err: sys.stderr.write("(ERROR) Failed to import pyDKB library: %s\n" % err) sys.exit(1) -try: - import elasticsearch - from elasticsearch.exceptions import NotFoundError -except ImportError, err: - sys.stderr.write("(WARN) Failed to import elasticsearch module: %s\n" - % err) - rucio_client = None -es_client = None OUTPUT = 'o' INPUT = 'i' @@ -91,9 +84,8 @@ def main(argv): stage.parse_args(argv) if stage.ARGS.es: cfg = read_es_config(stage.ARGS.es) - init_es_client(cfg) stage.ARGS.es.close() - stage.ARGS.es = cfg + storages.create("ES", storages.storageType.ES, cfg) if stage.ARGS.ds_type == OUTPUT: stage.process = process_output_ds elif stage.ARGS.ds_type == INPUT: @@ -136,36 +128,9 @@ def read_es_config(cfg_file): except KeyError: sys.stderr.write("(WARN) Unknown configuration parameter: " "'%s'.\n" % key) - - sys.stderr.write("(INFO) Stage 091 configuration (%s):\n" - % cfg_file.name) - key_len = len(max(cfg.keys(), key=len)) - pattern = "(INFO) %%-%ds : '%%s'\n" % key_len - sys.stderr.write("(INFO) ---\n") - for key in cfg: - if key.startswith('__'): - continue - sys.stderr.write(pattern % (key, cfg[key])) - sys.stderr.write("(INFO) ---\n") return cfg -def init_es_client(cfg): - """ Initialize global variable `es_client`. 
- - :param cfg: configuration parameters - :type cfg: dict - """ - global es_client - try: - elasticsearch - except NameError: - sys.stderr.write("(ERROR) Elasticsearch module is not installed.\n") - sys.exit(2) - address = '%s:%s' % (cfg['host'], cfg['port']) - es_client = elasticsearch.Elasticsearch(address) - - def init_rucio_client(): """ Initialize global variable `rucio_client`. """ global rucio_client @@ -203,7 +168,7 @@ def process_output_ds(stage, message): for dataset in datasets: taskid = json_str.get('taskid') - ds = get_output_ds_info(dataset, stage.ARGS.es, taskid) + ds = get_output_ds_info(dataset, taskid) ds['taskid'] = taskid if not add_es_index_info(ds): sys.stderr.write("(WARN) Skip message (not enough info" @@ -222,6 +187,10 @@ def process_input_ds(stage, message): * bytes * deleted """ + try: + es = storages.get("ES") + except storages.StorageNotConfigured: + es = None data = message.content() mfields = META_FIELDS[INPUT] ds_name = data.get(SRC_FIELD[INPUT]) @@ -235,11 +204,10 @@ def process_input_ds(stage, message): data[mfields['bytes']] = -1 data[mfields['deleted']] = True - if data.get(mfields['deleted']) and stage.ARGS.es: + if data.get(mfields['deleted']) and es: es_fields = mfields.values() es_fields.remove(mfields['deleted']) - es_mdata = get_es_metadata(stage.ARGS.es, data.get('taskid'), - mfields.values()) + es_mdata = get_es_metadata(data.get('taskid'), mfields.values()) for f in es_fields: if es_mdata.get(f) and data.get(f) != es_mdata.get(f): sys.stderr.write("(DEBUG) Update Rucio info with data from ES:" @@ -251,7 +219,7 @@ def process_input_ds(stage, message): return True -def get_output_ds_info(dataset, es=None, parent=None): +def get_output_ds_info(dataset, parent=None): """ Construct dictionary with dataset info. 
Dict format: @@ -261,6 +229,10 @@ def get_output_ds_info(dataset, es=None, parent=None): :param dataset: dataset name :return: dict """ + try: + es = storages.get("ES") + except storages.StorageNotConfigured: + es = None ds_dict = {} ds_dict['datasetname'] = dataset try: @@ -279,7 +251,7 @@ def get_output_ds_info(dataset, es=None, parent=None): if ds_dict.get(mfields['deleted']) and es: es_fields = mfields.values() es_fields.remove(mfields['deleted']) - es_mdata = get_es_metadata(es, dataset, mfields.values(), parent) + es_mdata = get_es_metadata(dataset, mfields.values(), parent) for f in es_fields: if es_mdata.get(f) and ds_dict.get(f) != es_mdata.get(f): sys.stderr.write("(DEBUG) Update Rucio info with data from ES:" @@ -337,20 +309,14 @@ def get_metadata(dsn, attributes=None): return result -def get_es_metadata(cfg, oid, fields=[], parent=None): - kwargs = {'index': cfg['index'], 'doc_type': '_all'} - kwargs['id'] = oid - if fields is not None: - kwargs['_source'] = fields - if parent: - kwargs['parent'] = parent +def get_es_metadata(oid, fields=[], parent=None): + es = storages.get("ES") try: - r = es_client.get(**kwargs) - except NotFoundError, err: - sys.stderr.write("(WARN) Failed to get information from ES: id='%s'\n" - % kwargs['id']) + r = es.get(oid, fields, parent=parent) + except storages.exceptions.NotFound, err: + sys.stderr.write("(WARN) %s\n" % err) return None - return r.get('_source', {}) + return r def adjust_metadata(mdata):