Skip to content

Commit

Permalink
DF/091: make stage working woth pyDKB.storages.
Browse files Browse the repository at this point in the history
  • Loading branch information
mgolosova committed Apr 26, 2019
1 parent ce8290d commit 9066bab
Showing 1 changed file with 21 additions and 55 deletions.
76 changes: 21 additions & 55 deletions Utils/Dataflow/091_datasetsRucio/datasets_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,12 @@
dkb_dir = os.path.join(base_dir, os.pardir)
sys.path.append(dkb_dir)
import pyDKB
from pyDKB import storages
except Exception, err:
sys.stderr.write("(ERROR) Failed to import pyDKB library: %s\n" % err)
sys.exit(1)

try:
import elasticsearch
from elasticsearch.exceptions import NotFoundError
except ImportError, err:
sys.stderr.write("(WARN) Failed to import elasticsearch module: %s\n"
% err)

rucio_client = None
es_client = None

OUTPUT = 'o'
INPUT = 'i'
Expand Down Expand Up @@ -91,9 +84,8 @@ def main(argv):
stage.parse_args(argv)
if stage.ARGS.es:
cfg = read_es_config(stage.ARGS.es)
init_es_client(cfg)
stage.ARGS.es.close()
stage.ARGS.es = cfg
storages.create("ES", storages.storageType.ES, cfg)
if stage.ARGS.ds_type == OUTPUT:
stage.process = process_output_ds
elif stage.ARGS.ds_type == INPUT:
Expand Down Expand Up @@ -136,36 +128,9 @@ def read_es_config(cfg_file):
except KeyError:
sys.stderr.write("(WARN) Unknown configuration parameter: "
"'%s'.\n" % key)

sys.stderr.write("(INFO) Stage 091 configuration (%s):\n"
% cfg_file.name)
key_len = len(max(cfg.keys(), key=len))
pattern = "(INFO) %%-%ds : '%%s'\n" % key_len
sys.stderr.write("(INFO) ---\n")
for key in cfg:
if key.startswith('__'):
continue
sys.stderr.write(pattern % (key, cfg[key]))
sys.stderr.write("(INFO) ---\n")
return cfg


def init_es_client(cfg):
""" Initialize global variable `es_client`.
:param cfg: configuration parameters
:type cfg: dict
"""
global es_client
try:
elasticsearch
except NameError:
sys.stderr.write("(ERROR) Elasticsearch module is not installed.\n")
sys.exit(2)
address = '%s:%s' % (cfg['host'], cfg['port'])
es_client = elasticsearch.Elasticsearch(address)


def init_rucio_client():
""" Initialize global variable `rucio_client`. """
global rucio_client
Expand Down Expand Up @@ -203,7 +168,7 @@ def process_output_ds(stage, message):

for dataset in datasets:
taskid = json_str.get('taskid')
ds = get_output_ds_info(dataset, stage.ARGS.es, taskid)
ds = get_output_ds_info(dataset, taskid)
ds['taskid'] = taskid
if not add_es_index_info(ds):
sys.stderr.write("(WARN) Skip message (not enough info"
Expand All @@ -222,6 +187,10 @@ def process_input_ds(stage, message):
* bytes
* deleted
"""
try:
es = storages.get("ES")
except storages.StorageNotConfigured:
es = None
data = message.content()
mfields = META_FIELDS[INPUT]
ds_name = data.get(SRC_FIELD[INPUT])
Expand All @@ -235,11 +204,10 @@ def process_input_ds(stage, message):
data[mfields['bytes']] = -1
data[mfields['deleted']] = True

if data.get(mfields['deleted']) and stage.ARGS.es:
if data.get(mfields['deleted']) and es:
es_fields = mfields.values()
es_fields.remove(mfields['deleted'])
es_mdata = get_es_metadata(stage.ARGS.es, data.get('taskid'),
mfields.values())
es_mdata = get_es_metadata(data.get('taskid'), mfields.values())
for f in es_fields:
if es_mdata.get(f) and data.get(f) != es_mdata.get(f):
sys.stderr.write("(DEBUG) Update Rucio info with data from ES:"
Expand All @@ -251,7 +219,7 @@ def process_input_ds(stage, message):
return True


def get_output_ds_info(dataset, es=None, parent=None):
def get_output_ds_info(dataset, parent=None):
""" Construct dictionary with dataset info.
Dict format:
Expand All @@ -261,6 +229,10 @@ def get_output_ds_info(dataset, es=None, parent=None):
:param dataset: dataset name
:return: dict
"""
try:
es = storages.get("ES")
except storages.StorageNotConfigured:
es = None
ds_dict = {}
ds_dict['datasetname'] = dataset
try:
Expand All @@ -279,7 +251,7 @@ def get_output_ds_info(dataset, es=None, parent=None):
if ds_dict.get(mfields['deleted']) and es:
es_fields = mfields.values()
es_fields.remove(mfields['deleted'])
es_mdata = get_es_metadata(es, dataset, mfields.values(), parent)
es_mdata = get_es_metadata(dataset, mfields.values(), parent)
for f in es_fields:
if es_mdata.get(f) and ds_dict.get(f) != es_mdata.get(f):
sys.stderr.write("(DEBUG) Update Rucio info with data from ES:"
Expand Down Expand Up @@ -337,20 +309,14 @@ def get_metadata(dsn, attributes=None):
return result


def get_es_metadata(cfg, oid, fields=[], parent=None):
kwargs = {'index': cfg['index'], 'doc_type': '_all'}
kwargs['id'] = oid
if fields is not None:
kwargs['_source'] = fields
if parent:
kwargs['parent'] = parent
def get_es_metadata(oid, fields=[], parent=None):
es = storages.get("ES")
try:
r = es_client.get(**kwargs)
except NotFoundError, err:
sys.stderr.write("(WARN) Failed to get information from ES: id='%s'\n"
% kwargs['id'])
r = es.get(oid, fields, parent=parent)
except storages.exceptions.NotFound, err:
sys.stderr.write("(WARN) %s\n" % err)
return None
return r.get('_source', {})
return r


def adjust_metadata(mdata):
Expand Down

0 comments on commit 9066bab

Please sign in to comment.