From 57159c1da03928f4a0b65b5579d22343abe6d05e Mon Sep 17 00:00:00 2001 From: Evildoor Date: Thu, 4 Apr 2019 13:43:52 +0200 Subject: [PATCH] Generalize 069-consistency. Check that all fields supplied in input data are present in ES and their values are matching the input data, instead of working only with tasks and their timestamps. This will allow checking tasks' other fields as well as different types of documents such as datasets. Add stage 016 into consistency chain because it adds the fields required for getting documents of given type from ES. --- Utils/Dataflow/069_upload2es/consistency.py | 61 ++++++++++---------- Utils/Dataflow/run/data4es-consistency-check | 20 +++++-- 2 files changed, 44 insertions(+), 37 deletions(-) diff --git a/Utils/Dataflow/069_upload2es/consistency.py b/Utils/Dataflow/069_upload2es/consistency.py index 2215195c1..92ea0696a 100755 --- a/Utils/Dataflow/069_upload2es/consistency.py +++ b/Utils/Dataflow/069_upload2es/consistency.py @@ -92,27 +92,29 @@ def es_connect(cfg): es = elasticsearch.Elasticsearch([s]) -def get_field(index, taskid, field): - ''' Get field value by given taskid. +def get_fields(index, _id, _type, fields): + ''' Get fields value by given _id and _type. :param es: elasticsearch client :type es: elasticsearch.client.Elasticsearch - :param index: index containing tasks + :param index: index to search in :type index: str - :param taskid: taskid of the task to look for - :type taskid: int or str - :param index: field name - :type index: str - - :return: field value, or False if the task was not found - :rtype: int or bool + :param _id: id of the document to look for + :type _id: int or str + :param _type: type of the document to look for + :type _type: str + :param fields: field names + :type fields: list + + :return: field values, or False if the document was not found + :rtype: dict or bool ''' try: - results = es.get(index=index, doc_type='_all', id=taskid, - _source=[field]) + results = es.get(index=index, doc_type=_type, id=_id, + _source=fields) except elasticsearch.exceptions.NotFoundError: return False - return results['_source'].get(field) + return results['_source'] def process(stage, message): @@ -123,34 +125,29 @@ def process(stage, message): :param stage: stage instance :type stage: pyDKB.dataflow.stage.ProcessorStage - :param msg: input message with task info + :param msg: input message with document info :type msg: pyDKB.dataflow.Message ''' data = message.content() if type(data) is not dict: log('Incorrect data:' + str(data), 'INPUT') return False - taskid = data.get('taskid') - if taskid is None: - log('No taskid in data:' + str(data), 'INPUT') - return False - timestamp = data.get('task_timestamp') - if timestamp is None: - log('No timestamp supplied for taskid ' + str(taskid), 'INPUT') + _id = data.pop('_id') + _type = data.pop('_type') + if _id is None or _type is None: + log('Insufficient ES info in data:' + str(data), 'INPUT') return False - es_timestamp = get_field(INDEX, taskid, 'task_timestamp') - if es_timestamp is None: - log('No timestamp in ES for taskid ' + str(taskid), 'DIFF') - elif not es_timestamp: - log('Taskid %d not found in ES' % taskid, 'DIFF') - elif es_timestamp != timestamp: - log('Taskid %d has timestamp %s in ES, %s in Oracle' % (taskid, - es_timestamp, - timestamp), - 'DIFF') + # Crutch. Remove unwanted (for now) field added by Stage 016. + if 'phys_category' in data: + del data['phys_category'] + + es_data = get_fields(INDEX, _id, _type, data.keys()) + if data != es_data: + log('Document (%s, %d) differs between Oracle and ES: Oracle:%s ES:%s' + % (_type, _id, data, es_data), 'DIFF') else: - log('Taskid %d is up to date in ES' % taskid, 'INFO') + log('Document (%s, %d) is up to date in ES' % (_type, _id), 'INFO') return True diff --git a/Utils/Dataflow/run/data4es-consistency-check b/Utils/Dataflow/run/data4es-consistency-check index da8b8f524..74b7a5533 100755 --- a/Utils/Dataflow/run/data4es-consistency-check +++ b/Utils/Dataflow/run/data4es-consistency-check @@ -22,16 +22,26 @@ get_config() { done } +# EOP filter (required due to the unconfigurable EOP marker in pyDKB) +eop_filter() { + sed -e"s/\\x00//g" +} + # Oracle cfg009=`get_config "consistency009.cfg"` cmd_009="$base_dir/../009_oracleConnector/Oracle2JSON.py --config $cfg009" +[ -n "$DEBUG" ] && + err009="009.err" || + err009=/dev/null + +# Formatting +cmd_016="$base_dir/../016_task2es/task2es.py -m s" +[ -n "$DEBUG" ] && + err016="016.err" || + err016=/dev/null # ES cfg_es=`get_config "es"` cmd_069="$base_dir/../069_upload2es/consistency.py -m s --conf $cfg_es" -[ -n "$DEBUG" ] && - err009="009.err" || - err009=/dev/null - -$cmd_009 2>"$err009" | $cmd_069 +$cmd_009 2>"$err009" | $cmd_016 2>"$err016" | eop_filter | $cmd_069 >/dev/null