Skip to content

Commit

Permalink
Generalize 069-consistency.
Browse files Browse the repository at this point in the history
Check that all fields supplied in input data are present in ES and their
values are matching the input data, instead of working only with tasks and
their timestamps. This will allow checking tasks' other fields as well as
different types of documents such as datasets.

Add stage 016 into consistency chain because it adds the fields required
for getting documents of given type from ES.
  • Loading branch information
Evildoor committed Apr 5, 2019
1 parent 2296f6d commit b8a2ab1
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 36 deletions.
66 changes: 35 additions & 31 deletions Utils/Dataflow/069_upload2es/consistency.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,27 +92,29 @@ def es_connect(cfg):
es = elasticsearch.Elasticsearch([s])


def get_field(index, taskid, field):
''' Get field value by given taskid.
def get_fields(index, _id, _type, fields):
''' Get fields value by given _id and _type.
:param es: elasticsearch client
:type es: elasticsearch.client.Elasticsearch
:param index: index containing tasks
:param index: index to search in
:type index: str
:param taskid: taskid of the task to look for
:type taskid: int or str
:param index: field name
:type index: str
:return: field value, or False if the task was not found
:rtype: int or bool
:param _id: id of the document to look for
:type _id: int or str
:param _type: type of the document to look for
:type _type: str
:param fields: field names
:type fields: list
:return: field values, or False if the document was not found
:rtype: dict or bool
'''
try:
results = es.get(index=index, doc_type='_all', id=taskid,
_source=[field])
results = es.get(index=index, doc_type=_type, id=_id,
_source=fields)
except elasticsearch.exceptions.NotFoundError:
return False
return results['_source'].get(field)
return results['_source']


def process(stage, message):
Expand All @@ -123,34 +125,36 @@ def process(stage, message):
:param stage: stage instance
:type stage: pyDKB.dataflow.stage.ProcessorStage
:param msg: input message with task info
:param msg: input message with document info
:type msg: pyDKB.dataflow.Message
'''
data = message.content()
if type(data) is not dict:
log('Incorrect data:' + str(data), 'INPUT')
return False
taskid = data.get('taskid')
if taskid is None:
log('No taskid in data:' + str(data), 'INPUT')
_id = data.pop('_id')
_type = data.pop('_type')
if _id is None or _type is None:
log('Insufficient ES info in data:' + str(data), 'INPUT')
return False
timestamp = data.get('task_timestamp')
if timestamp is None:
log('No timestamp supplied for taskid ' + str(taskid), 'INPUT')

# Crutch. Remove unwanted (for now) field added by Stage 016.
if 'phys_category' in data:
del data['phys_category']

# Do not check empty documents with valid _id and _type.
# It's unlikely that such documents will be produced in DKB. In general,
# such documents should be checked by es.exists(), and not es.get().
if not data:
log('Nothing to check for document (%s, %d)' % (_type, _id), 'INPUT')
return False

es_timestamp = get_field(INDEX, taskid, 'task_timestamp')
if es_timestamp is None:
log('No timestamp in ES for taskid ' + str(taskid), 'DIFF')
elif not es_timestamp:
log('Taskid %d not found in ES' % taskid, 'DIFF')
elif es_timestamp != timestamp:
log('Taskid %d has timestamp %s in ES, %s in Oracle' % (taskid,
es_timestamp,
timestamp),
'DIFF')
es_data = get_fields(INDEX, _id, _type, data.keys())
if data != es_data:
log('Document (%s, %d) differs between Oracle and ES: Oracle:%s ES:%s'
% (_type, _id, data, es_data), 'DIFF')
else:
log('Taskid %d is up to date in ES' % taskid, 'INFO')
log('Document (%s, %d) is up to date in ES' % (_type, _id), 'INFO')

return True

Expand Down
20 changes: 15 additions & 5 deletions Utils/Dataflow/run/data4es-consistency-check
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,26 @@ get_config() {
done
}

# EOP filter (required due to the unconfigurable EOP marker in pyDKB)
eop_filter() {
sed -e"s/\\x00//g"
}

# Oracle
cfg009=`get_config "consistency009.cfg"`
cmd_009="$base_dir/../009_oracleConnector/Oracle2JSON.py --config $cfg009"
[ -n "$DEBUG" ] &&
err009="009.err" ||
err009=/dev/null

# Formatting
cmd_016="$base_dir/../016_task2es/task2es.py -m s"
[ -n "$DEBUG" ] &&
err016="016.err" ||
err016=/dev/null

# ES
cfg_es=`get_config "es"`
cmd_069="$base_dir/../069_upload2es/consistency.py -m s --conf $cfg_es"

[ -n "$DEBUG" ] &&
err009="009.err" ||
err009=/dev/null

$cmd_009 2>"$err009" | $cmd_069
$cmd_009 2>"$err009" | $cmd_016 2>"$err016" | eop_filter | $cmd_069 >/dev/null

0 comments on commit b8a2ab1

Please sign in to comment.