Oracle-ES consistency #240

Merged on May 28, 2019 (28 commits)
Changes from 1 commit
Commits
9903938
Add consistency query to stage 009.
Evildoor Mar 27, 2019
42d55b3
Add script for consistency check to stage 069.
Evildoor Mar 29, 2019
f27f278
Get index name from config rather than code.
Evildoor Apr 2, 2019
af9f212
Check for index' existence before working.
Evildoor Apr 2, 2019
4c560a6
Update documentation.
Evildoor Apr 2, 2019
2296f6d
Add a very basic consistency check script.
Evildoor Apr 2, 2019
b8a2ab1
Generalize 069-consistency.
Evildoor Apr 4, 2019
acfe45a
Save and display the info about different tasks.
Evildoor Apr 5, 2019
e39efe2
Merge remote-tracking branch 'origin/master' into oracle-es-consistency
Evildoor Apr 5, 2019
8a71791
Move certain shell functions to library.
Evildoor Apr 5, 2019
90380a9
Remove DEBUG mode.
Evildoor Apr 17, 2019
7bac202
Move ES consistency script into a separate stage.
Evildoor Apr 17, 2019
12dd86e
Update a query description.
Evildoor Apr 18, 2019
944b5a2
Update and explain a magic number.
Evildoor Apr 18, 2019
26a1dfe
Reword es_connect() description.
Evildoor Apr 18, 2019
20875b1
Change log prefixes to standard ones.
Evildoor Apr 18, 2019
46cf0af
Fix pop() results handling.
Evildoor Apr 18, 2019
72d85a9
Update ES parameters handling.
Evildoor Apr 18, 2019
cacba11
Remove batching of inconsistent records.
Evildoor Apr 18, 2019
4d8eb83
Merge remote-tracking branch 'origin/master' into oracle-es-consistency
Evildoor Apr 19, 2019
181fb14
Add consistency data samples.
Evildoor Apr 19, 2019
7117242
Update the dataflow README.
Evildoor Apr 19, 2019
165c5d2
Ignore two additional fields.
Evildoor May 21, 2019
8f84d22
Change messages formatting.
Evildoor May 22, 2019
d195650
Add _parent field handling.
Evildoor May 22, 2019
612bf52
Remove service fields before checking.
Evildoor May 22, 2019
01ae258
Remove interpreter directives from lib files.
Evildoor May 22, 2019
8f86ddd
Simplify a field retrieval.
Evildoor May 28, 2019
66 changes: 35 additions & 31 deletions Utils/Dataflow/069_upload2es/consistency.py
@@ -92,27 +92,29 @@ def es_connect(cfg):
     es = elasticsearch.Elasticsearch([s])
 
 
-def get_field(index, taskid, field):
-    ''' Get field value by given taskid.
+def get_fields(index, _id, _type, fields):
+    ''' Get fields value by given _id and _type.
 
     :param es: elasticsearch client
     :type es: elasticsearch.client.Elasticsearch
-    :param index: index containing tasks
+    :param index: index to search in
     :type index: str
-    :param taskid: taskid of the task to look for
-    :type taskid: int or str
-    :param index: field name
-    :type index: str
-
-    :return: field value, or False if the task was not found
-    :rtype: int or bool
+    :param _id: id of the document to look for
+    :type _id: int or str
+    :param _type: type of the document to look for
+    :type _type: str
+    :param fields: field names
+    :type fields: list
+
+    :return: field values, or False if the document was not found
+    :rtype: dict or bool
     '''
     try:
-        results = es.get(index=index, doc_type='_all', id=taskid,
-                         _source=[field])
+        results = es.get(index=index, doc_type=_type, id=_id,
+                         _source=fields)
     except elasticsearch.exceptions.NotFoundError:
         return False
-    return results['_source'].get(field)
+    return results['_source']
 
 
 def process(stage, message):
@@ -123,34 +125,36 @@ def process(stage, message):
 
     :param stage: stage instance
     :type stage: pyDKB.dataflow.stage.ProcessorStage
-    :param msg: input message with task info
+    :param msg: input message with document info
     :type msg: pyDKB.dataflow.Message
     '''
     data = message.content()
     if type(data) is not dict:
         log('Incorrect data:' + str(data), 'INPUT')
         return False
-    taskid = data.get('taskid')
-    if taskid is None:
-        log('No taskid in data:' + str(data), 'INPUT')
+    _id = data.pop('_id')
+    _type = data.pop('_type')
+    if _id is None or _type is None:
+        log('Insufficient ES info in data:' + str(data), 'INPUT')
Review comment (Collaborator): 'INPUT' -> 'WARN'?
Reply (Contributor Author): Done.
         return False
-    timestamp = data.get('task_timestamp')
-    if timestamp is None:
-        log('No timestamp supplied for taskid ' + str(taskid), 'INPUT')
+
+    # Crutch. Remove unwanted (for now) field added by Stage 016.
+    if 'phys_category' in data:
+        del data['phys_category']
+
+    # Do not check empty documents with valid _id and _type.
+    # It's unlikely that such documents will be produced in DKB. In general,
+    # such documents should be checked by es.exists(), and not es.get().
+    if not data:
+        log('Nothing to check for document (%s, %d)' % (_type, _id), 'INPUT')
Review comment (Collaborator): 'INPUT' -> 'INFO'/'WARN'?
Reply (Contributor Author, Evildoor, Apr 18, 2019): Done.
         return False
 
-    es_timestamp = get_field(INDEX, taskid, 'task_timestamp')
-    if es_timestamp is None:
-        log('No timestamp in ES for taskid ' + str(taskid), 'DIFF')
-    elif not es_timestamp:
-        log('Taskid %d not found in ES' % taskid, 'DIFF')
-    elif es_timestamp != timestamp:
-        log('Taskid %d has timestamp %s in ES, %s in Oracle' % (taskid,
-                                                                es_timestamp,
-                                                                timestamp),
-            'DIFF')
+    es_data = get_fields(INDEX, _id, _type, data.keys())
+    if data != es_data:
+        log('Document (%s, %d) differs between Oracle and ES: Oracle:%s ES:%s'
+            % (_type, _id, data, es_data), 'DIFF')
     else:
-        log('Taskid %d is up to date in ES' % taskid, 'INFO')
+        log('Document (%s, %d) is up to date in ES' % (_type, _id), 'INFO')
 
     return True
 
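Note on the check flow above: as written in this commit, `data.pop('_id')` raises `KeyError` when the key is absent, so the `is None` branch would not be reached for records that lack it. The commit list includes a later "Fix pop() results handling" commit whose contents are not shown here. The following is only a minimal sketch of how the same flow could be arranged with a default passed to `pop()`. The names `split_es_meta`, `check_document` and `make_fetch` are hypothetical and not part of the PR; `make_fetch` stands in for `get_fields()` and uses an in-memory dict instead of Elasticsearch.

```python
def split_es_meta(record):
    """Split a record into (_id, _type, remaining fields).

    Hypothetical helper: missing keys yield None instead of KeyError.
    """
    fields = dict(record)  # copy, so the caller's record is not modified
    _id = fields.pop('_id', None)
    _type = fields.pop('_type', None)
    return _id, _type, fields


def check_document(record, fetch):
    """Return a status string for one record.

    `fetch(_id, _type, names)` must return a dict of stored values,
    or False when the document does not exist (as get_fields() does).
    """
    _id, _type, fields = split_es_meta(record)
    if _id is None or _type is None:
        return 'insufficient'
    if not fields:
        return 'empty'
    es_fields = fetch(_id, _type, list(fields))
    if es_fields is False:
        return 'missing'
    return 'ok' if es_fields == fields else 'differs'


def make_fetch(store):
    """Build an in-memory stand-in for get_fields() (assumption: test only)."""
    def fetch(_id, _type, names):
        doc = store.get((_type, _id))
        if doc is None:
            return False
        # Return only the requested fields, like the _source filter.
        return {name: doc[name] for name in names if name in doc}
    return fetch


if __name__ == '__main__':
    fetch = make_fetch({('task', 1): {'status': 'done', 'n': 5}})
    print(check_document({'_id': 1, '_type': 'task', 'status': 'done'}, fetch))
```

Returning a status string instead of logging keeps the sketch easy to test; the PR itself logs each outcome and treats a missing document the same as a differing one.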
20 changes: 15 additions & 5 deletions Utils/Dataflow/run/data4es-consistency-check
@@ -22,16 +22,26 @@ get_config() {
   done
 }
 
+# EOP filter (required due to the unconfigurable EOP marker in pyDKB)
+eop_filter() {
+  sed -e"s/\\x00//g"
+}
+
 # Oracle
 cfg009=`get_config "consistency009.cfg"`
 cmd_009="$base_dir/../009_oracleConnector/Oracle2JSON.py --config $cfg009"
+[ -n "$DEBUG" ] &&
+  err009="009.err" ||
+  err009=/dev/null
+
+# Formatting
+cmd_016="$base_dir/../016_task2es/task2es.py -m s"
+[ -n "$DEBUG" ] &&
+  err016="016.err" ||
+  err016=/dev/null
 
 # ES
 cfg_es=`get_config "es"`
 cmd_069="$base_dir/../069_upload2es/consistency.py -m s --conf $cfg_es"
 
-[ -n "$DEBUG" ] &&
-  err009="009.err" ||
-  err009=/dev/null
-
-$cmd_009 2>"$err009" | $cmd_016 2>"$err016" | eop_filter | $cmd_069 >/dev/null
-$cmd_009 2>"$err009" | $cmd_069
+$cmd_009 2>"$err009" | $cmd_016 2>"$err016" | eop_filter | $cmd_069 >/dev/null
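
Note on `eop_filter`: the `sed` expression removes NUL bytes from the stream between stage 016 and stage 069, which suggests (an assumption drawn only from that expression) that the end-of-process marker emitted in this mode is a NUL byte. A rough Python equivalent is sketched below for readers who want to test the filtering step in isolation. The names `strip_eop` and `filter_stream` are hypothetical and do not exist in pyDKB.

```python
import io
import sys


def strip_eop(chunk, marker=b'\x00'):
    """Remove every occurrence of the marker byte(s) from a bytes chunk."""
    return chunk.replace(marker, b'')


def filter_stream(src, dst, marker=b'\x00'):
    """Copy a binary stream line by line, dropping the marker bytes."""
    for line in src:
        dst.write(strip_eop(line, marker))


if __name__ == '__main__':
    # Acts as a pipeline filter: reads stdin, writes cleaned data to stdout.
    filter_stream(sys.stdin.buffer, sys.stdout.buffer)
```

Working on bytes rather than text avoids any decoding step, so the JSON payload passes through unchanged apart from the removed marker.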