Skip to content

Commit

Permalink
DF/091: refactoring (make code more DRY).
Browse files Browse the repository at this point in the history
Move metadata "update" with info from ES to a separate function.
  • Loading branch information
mgolosova committed Apr 26, 2019
1 parent 9066bab commit 8da382d
Showing 1 changed file with 31 additions and 27 deletions.
58 changes: 31 additions & 27 deletions Utils/Dataflow/091_datasetsRucio/datasets_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,13 @@ def process_output_ds(stage, message):

for dataset in datasets:
taskid = json_str.get('taskid')
ds = get_output_ds_info(dataset, taskid)
ds = get_output_ds_info(dataset)
ds['taskid'] = taskid
if not add_es_index_info(ds):
sys.stderr.write("(WARN) Skip message (not enough info"
" for ES indexing).\n")
return True
fix_ds_info(ds)
del(ds['taskid'])
stage.output(pyDKB.dataflow.messages.JSONMessage(ds))

Expand All @@ -187,10 +188,6 @@ def process_input_ds(stage, message):
* bytes
* deleted
"""
try:
es = storages.get("ES")
except storages.StorageNotConfigured:
es = None
data = message.content()
mfields = META_FIELDS[INPUT]
ds_name = data.get(SRC_FIELD[INPUT])
Expand All @@ -204,22 +201,13 @@ def process_input_ds(stage, message):
data[mfields['bytes']] = -1
data[mfields['deleted']] = True

if data.get(mfields['deleted']) and es:
es_fields = mfields.values()
es_fields.remove(mfields['deleted'])
es_mdata = get_es_metadata(data.get('taskid'), mfields.values())
for f in es_fields:
if es_mdata.get(f) and data.get(f) != es_mdata.get(f):
sys.stderr.write("(DEBUG) Update Rucio info with data from ES:"
" %s = '%s' (was: '%s')\n" % (f, es_mdata[f],
data.get(f)))
data[f] = es_mdata[f]
fix_ds_info(data)

stage.output(pyDKB.dataflow.messages.JSONMessage(data))
return True


def get_output_ds_info(dataset, parent=None):
def get_output_ds_info(dataset):
""" Construct dictionary with dataset info.
Dict format:
Expand All @@ -229,10 +217,6 @@ def get_output_ds_info(dataset, parent=None):
:param dataset: dataset name
:return: dict
"""
try:
es = storages.get("ES")
except storages.StorageNotConfigured:
es = None
ds_dict = {}
ds_dict['datasetname'] = dataset
try:
Expand All @@ -248,18 +232,38 @@ def get_output_ds_info(dataset, parent=None):
ds_dict[mfields['bytes']] = -1
ds_dict[mfields['deleted']] = True

if ds_dict.get(mfields['deleted']) and es:
return ds_dict


def fix_ds_info(data):
""" Fix dataset metadata with data from ES, if needed and possible.
:param data: data
:type data: dict
"""
try:
es = storages.get("ES")
except storages.StorageNotConfigured:
# ES configuration was not passed to the stage
return None
if data.get('_type') == 'output_dataset':
mfields = META_FIELDS[OUTPUT]
oid = data.get('_id')
else:
mfields = META_FIELDS[INPUT]
oid = data.get('taskid')

if data.get(mfields['deleted']) and es:
es_fields = mfields.values()
es_fields.remove(mfields['deleted'])
es_mdata = get_es_metadata(dataset, mfields.values(), parent)
es_mdata = get_es_metadata(oid, mfields.values(), data.get('_parent'))
for f in es_fields:
if es_mdata.get(f) and ds_dict.get(f) != es_mdata.get(f):
if es_mdata.get(f) and data.get(f) != es_mdata.get(f):
sys.stderr.write("(DEBUG) Update Rucio info with data from ES:"
" %s = '%s' (was: '%s')\n" % (f, es_mdata[f],
ds_dict.get(f)))
ds_dict[f] = es_mdata[f]

return ds_dict
data.get(f)))
data[f] = es_mdata[f]
return True


def extract_scope(dsn):
Expand Down

0 comments on commit 8da382d

Please sign in to comment.