diff --git a/Utils/Dataflow/069_upload2es/consistency.py b/Utils/Dataflow/069_upload2es/consistency.py index 2215195c1..5f1a7e6d5 100755 --- a/Utils/Dataflow/069_upload2es/consistency.py +++ b/Utils/Dataflow/069_upload2es/consistency.py @@ -92,27 +92,29 @@ def es_connect(cfg): es = elasticsearch.Elasticsearch([s]) -def get_field(index, taskid, field): - ''' Get field value by given taskid. +def get_fields(index, _id, _type, fields): + ''' Get fields value by given _id and _type. :param es: elasticsearch client :type es: elasticsearch.client.Elasticsearch - :param index: index containing tasks + :param index: index to search in :type index: str - :param taskid: taskid of the task to look for - :type taskid: int or str - :param index: field name - :type index: str - - :return: field value, or False if the task was not found - :rtype: int or bool + :param _id: id of the document to look for + :type _id: int or str + :param _type: type of the document to look for + :type _type: str + :param fields: field names + :type fields: list + + :return: field values, or False if the document was not found + :rtype: dict or bool ''' try: - results = es.get(index=index, doc_type='_all', id=taskid, - _source=[field]) + results = es.get(index=index, doc_type=_type, id=_id, + _source=fields) except elasticsearch.exceptions.NotFoundError: return False - return results['_source'].get(field) + return results['_source'] def process(stage, message): @@ -123,34 +125,36 @@ def process(stage, message): :param stage: stage instance :type stage: pyDKB.dataflow.stage.ProcessorStage - :param msg: input message with task info + :param msg: input message with document info :type msg: pyDKB.dataflow.Message ''' data = message.content() if type(data) is not dict: log('Incorrect data:' + str(data), 'INPUT') return False - taskid = data.get('taskid') - if taskid is None: - log('No taskid in data:' + str(data), 'INPUT') + _id = data.pop('_id') + _type = data.pop('_type') + if _id is None or _type is None: + log('Insufficient ES info in data:' + str(data), 'INPUT') return False - timestamp = data.get('task_timestamp') - if timestamp is None: - log('No timestamp supplied for taskid ' + str(taskid), 'INPUT') + + # Crutch. Remove unwanted (for now) field added by Stage 016. + if 'phys_category' in data: + del data['phys_category'] + + # Do not check empty documents with valid _id and _type. + # It's unlikely that such documents will be produced in DKB. In general, + # such documents should be checked by es.exists(), and not es.get(). + if not data: + log('Nothing to check for document (%s, %d)' % (_type, _id), 'INPUT') return False - es_timestamp = get_field(INDEX, taskid, 'task_timestamp') - if es_timestamp is None: - log('No timestamp in ES for taskid ' + str(taskid), 'DIFF') - elif not es_timestamp: - log('Taskid %d not found in ES' % taskid, 'DIFF') - elif es_timestamp != timestamp: - log('Taskid %d has timestamp %s in ES, %s in Oracle' % (taskid, - es_timestamp, - timestamp), - 'DIFF') + es_data = get_fields(INDEX, _id, _type, data.keys()) + if data != es_data: + log('Document (%s, %d) differs between Oracle and ES: Oracle:%s ES:%s' + % (_type, _id, data, es_data), 'DIFF') else: - log('Taskid %d is up to date in ES' % taskid, 'INFO') + log('Document (%s, %d) is up to date in ES' % (_type, _id), 'INFO') return True diff --git a/Utils/Dataflow/run/data4es-consistency-check b/Utils/Dataflow/run/data4es-consistency-check index da8b8f524..74b7a5533 100755 --- a/Utils/Dataflow/run/data4es-consistency-check +++ b/Utils/Dataflow/run/data4es-consistency-check @@ -22,16 +22,26 @@ get_config() { done } +# EOP filter (required due to the unconfigurable EOP marker in pyDKB) +eop_filter() { + sed -e"s/\\x00//g" +} + # Oracle cfg009=`get_config "consistency009.cfg"` cmd_009="$base_dir/../009_oracleConnector/Oracle2JSON.py --config $cfg009" +[ -n "$DEBUG" ] && + err009="009.err" || + err009=/dev/null + +# Formatting +cmd_016="$base_dir/../016_task2es/task2es.py -m s" +[ -n "$DEBUG" ] && + err016="016.err" || + err016=/dev/null # ES cfg_es=`get_config "es"` cmd_069="$base_dir/../069_upload2es/consistency.py -m s --conf $cfg_es" -[ -n "$DEBUG" ] && - err009="009.err" || - err009=/dev/null - -$cmd_009 2>"$err009" | $cmd_069 +$cmd_009 2>"$err009" | $cmd_016 2>"$err016" | eop_filter | $cmd_069 >/dev/null