Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Oracle-ES consistency #240

Merged
merged 28 commits into from
May 28, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
9903938
Add consistency query to stage 009.
Evildoor Mar 27, 2019
42d55b3
Add script for consistency check to stage 069.
Evildoor Mar 29, 2019
f27f278
Get index name from config rather than code.
Evildoor Apr 2, 2019
af9f212
Check for index' existence before working.
Evildoor Apr 2, 2019
4c560a6
Update documentation.
Evildoor Apr 2, 2019
2296f6d
Add a very basic consistency check script.
Evildoor Apr 2, 2019
b8a2ab1
Generalize 069-consistency.
Evildoor Apr 4, 2019
acfe45a
Save and display the info about different tasks.
Evildoor Apr 5, 2019
e39efe2
Merge remote-tracking branch 'origin/master' into oracle-es-consistency
Evildoor Apr 5, 2019
8a71791
Move certain shell functions to library.
Evildoor Apr 5, 2019
90380a9
Remove DEBUG mode.
Evildoor Apr 17, 2019
7bac202
Move ES consistency script into a separate stage.
Evildoor Apr 17, 2019
12dd86e
Update a query description.
Evildoor Apr 18, 2019
944b5a2
Update and explain a magic number.
Evildoor Apr 18, 2019
26a1dfe
Reword es_connect() description.
Evildoor Apr 18, 2019
20875b1
Change log prefixes to standard ones.
Evildoor Apr 18, 2019
46cf0af
Fix pop() results handling.
Evildoor Apr 18, 2019
72d85a9
Update ES parameters handling.
Evildoor Apr 18, 2019
cacba11
Remove batching of inconsistent records.
Evildoor Apr 18, 2019
4d8eb83
Merge remote-tracking branch 'origin/master' into oracle-es-consistency
Evildoor Apr 19, 2019
181fb14
Add consistency data samples.
Evildoor Apr 19, 2019
7117242
Update the dataflow README.
Evildoor Apr 19, 2019
165c5d2
Ignore two additional fields.
Evildoor May 21, 2019
8f84d22
Change messages formatting.
Evildoor May 22, 2019
d195650
Add _parent field handling.
Evildoor May 22, 2019
612bf52
Remove service fields before checking.
Evildoor May 22, 2019
01ae258
Remove interpreter directives from lib files.
Evildoor May 22, 2019
8f86ddd
Simplify a field retrieval.
Evildoor May 28, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion Utils/Dataflow/069_upload2es/README
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

Description
-----------
Uploads prepared data to ElasticSearch.
load_data.sh uploads prepared data to ElasticSearch.
mgolosova marked this conversation as resolved.
Show resolved Hide resolved

Input
-----
Expand All @@ -18,6 +18,24 @@ JSON documents, one per line:
...
}}}

Consistency
-----------
consistency.py checks that the data is present in ElasticSearch instead of
uploading it. Input comes from Stage 009(in consistency mode) and only needs 2
fields for now:
{{{
{taskid, task_timestamp}
mgolosova marked this conversation as resolved.
Show resolved Hide resolved
...
}}}

Consistency check can be run as following:

./consistency.py --conf elasticsearch_config

For more information about running the check and its arguments, use:

./consistency.py -h

TODO
----
Make the stage aware of EOProcess/EOMessage markers
31 changes: 25 additions & 6 deletions Utils/Dataflow/069_upload2es/consistency.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#!/bin/env python
'''
Script for consistency checking.
Script for checking the supplied task's presence in elasticsearch.

Currently it performs the check by comparing the supplied timestamp
with the one in elasticsearch.

Authors:
Vasilii Aulov ([email protected])
Expand All @@ -15,7 +18,7 @@


def log(msg, prefix='DEBUG'):
''' ??? '''
''' Add prefix and current time to message and write it to stderr. '''
prefix = '(%s)' % (prefix)
prefix = prefix.ljust(8)
mgolosova marked this conversation as resolved.
Show resolved Hide resolved
sys.stderr.write('%s%s %s\n' % (prefix, datetime.now().isoformat(), msg))
Expand All @@ -41,7 +44,13 @@ def log(msg, prefix='DEBUG'):


def load_config(fname):
''' ??? '''
''' Open elasticsearch config and obtain parameters from it.

Setup INDEX as global variable.

:param fname: config file's name
:type fname: str
'''
cfg = {
'ES_HOST': 'localhost',
'ES_PORT': '9200',
Expand All @@ -67,9 +76,10 @@ def load_config(fname):


def es_connect(cfg):
''' Establish a connection to elasticsearch.
''' Establish a connection to elasticsearch, as a global variable.
mgolosova marked this conversation as resolved.
Show resolved Hide resolved

TODO: take parameters from es config.
mgolosova marked this conversation as resolved.
Show resolved Hide resolved
:param cfg: connection parameters
:type cfg: dict
'''
global es
if cfg['ES_USER'] and cfg['ES_PASSWORD']:
Expand Down Expand Up @@ -110,6 +120,11 @@ def process(stage, message):

Implementation of :py:meth:`.AbstractProcessorStage.process` for hooking
the stage into DKB workflow.

:param stage: stage instance
:type stage: pyDKB.dataflow.stage.ProcessorStage
:param msg: input message with task info
:type msg: pyDKB.dataflow.Message
'''
data = message.content()
if type(data) is not dict:
Expand Down Expand Up @@ -141,7 +156,11 @@ def process(stage, message):


def main(args):
''' ??? '''
''' Parse command line arguments and run the stage.

:param argv: arguments
:type argv: list
'''

stage = JSONProcessorStage()
stage.add_argument('--conf', help='elasticsearch config', required=True)
Expand Down