Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Oracle-ES consistency #240

Merged
merged 28 commits into from
May 28, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
9903938
Add consistency query to stage 009.
Evildoor Mar 27, 2019
42d55b3
Add script for consistency check to stage 069.
Evildoor Mar 29, 2019
f27f278
Get index name from config rather than code.
Evildoor Apr 2, 2019
af9f212
Check for index' existence before working.
Evildoor Apr 2, 2019
4c560a6
Update documentation.
Evildoor Apr 2, 2019
2296f6d
Add a very basic consistency check script.
Evildoor Apr 2, 2019
b8a2ab1
Generalize 069-consistency.
Evildoor Apr 4, 2019
acfe45a
Save and display the info about different tasks.
Evildoor Apr 5, 2019
e39efe2
Merge remote-tracking branch 'origin/master' into oracle-es-consistency
Evildoor Apr 5, 2019
8a71791
Move certain shell functions to library.
Evildoor Apr 5, 2019
90380a9
Remove DEBUG mode.
Evildoor Apr 17, 2019
7bac202
Move ES consistency script into a separate stage.
Evildoor Apr 17, 2019
12dd86e
Update a query description.
Evildoor Apr 18, 2019
944b5a2
Update and explain a magic number.
Evildoor Apr 18, 2019
26a1dfe
Reword es_connect() description.
Evildoor Apr 18, 2019
20875b1
Change log prefixes to standard ones.
Evildoor Apr 18, 2019
46cf0af
Fix pop() results handling.
Evildoor Apr 18, 2019
72d85a9
Update ES parameters handling.
Evildoor Apr 18, 2019
cacba11
Remove batching of inconsistent records.
Evildoor Apr 18, 2019
4d8eb83
Merge remote-tracking branch 'origin/master' into oracle-es-consistency
Evildoor Apr 19, 2019
181fb14
Add consistency data samples.
Evildoor Apr 19, 2019
7117242
Update the dataflow README.
Evildoor Apr 19, 2019
165c5d2
Ignore two additional fields.
Evildoor May 21, 2019
8f84d22
Change messages formatting.
Evildoor May 22, 2019
d195650
Add _parent field handling.
Evildoor May 22, 2019
612bf52
Remove service fields before checking.
Evildoor May 22, 2019
01ae258
Remove interpreter directives from lib files.
Evildoor May 22, 2019
8f86ddd
Simplify a field retrieval.
Evildoor May 28, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Utils/Dataflow/009_oracleConnector/README
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ them as NDJSON.
Currently works with specific set of queries only:
* prodsys2ES + datasets
* prodsys2ES
* consistency : simplified query that only obtains taskid and task_timestamp
for each task

The goal is to make it work with any number and combination of queries.

Expand Down
30 changes: 30 additions & 0 deletions Utils/Dataflow/009_oracleConnector/query/consistency.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-- Select all tasks for specified period of time
-- Query tables:
------------------------------------------------
-- ATLAS_DEFT.t_production_task
-- ATLAS_DEFT.t_production_step
-- ATLAS_DEFT.t_step_template
-- ATLAS_DEFT.t_ht_to_task
-- ATLAS_DEFT.t_hashtag
-- ATLAS_PANDA.jedi_datasets
--
-- All fields:
-- architecture, campaign, cloud, conditions_tags, core_count, description, end_time,
-- energy_gev, evgen_job_opts, geometry_version, hashtag_list, job_config, physics_list, processed_events,
-- phys_group, project, pr_id, requested_events, run_number, site, start_time, step_name, status, subcampaign,
-- taskid, taskname, task_timestamp, ticket_id, trans_home, trans_path, trans_uses, trigger_config, user_name, vo,
-- n_files_per_job, n_events_per_job, n_files_to_be_used,
mgolosova marked this conversation as resolved.
Show resolved Hide resolved

-- RESTRICTIONS:
-- 1. taskID must be more than 4 000 000 OR from the date > 12-03-2014
-- 2. we collecting only PRODUCTION tasks OR only ANALYSIS tasks
-- ('pr_id > 300' or 'pr_id = 300')
SELECT DISTINCT
t.taskid,
TO_CHAR(t.timestamp, 'dd-mm-yyyy hh24:mi:ss') AS task_timestamp
FROM
ATLAS_DEFT.t_production_task t
WHERE
t.timestamp > :start_date AND
t.timestamp <= :end_date AND
t.pr_id %(production_or_analysis_cond)s 300
20 changes: 19 additions & 1 deletion Utils/Dataflow/069_upload2es/README
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

Description
-----------
Uploads prepared data to ElasticSearch.
load_data.sh uploads prepared data to ElasticSearch.
mgolosova marked this conversation as resolved.
Show resolved Hide resolved

Input
-----
Expand All @@ -18,6 +18,24 @@ JSON documents, one per line:
...
}}}

Consistency
-----------
consistency.py checks that the data is present in ElasticSearch instead of
uploading it. Input comes from Stage 009(in consistency mode) and only needs 2
fields for now:
{{{
{taskid, task_timestamp}
mgolosova marked this conversation as resolved.
Show resolved Hide resolved
...
}}}

Consistency check can be run as following:

./consistency.py --conf elasticsearch_config

For more information about running the check and its arguments, use:

./consistency.py -h

TODO
----
Make the stage aware of EOProcess/EOMessage markers
199 changes: 199 additions & 0 deletions Utils/Dataflow/069_upload2es/consistency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
#!/bin/env python
'''
Script for checking the supplied task's presence in elasticsearch.

Currently it performs the check by comparing the supplied timestamp
with the one in elasticsearch.

Authors:
Vasilii Aulov ([email protected])
'''
import os
import sys
import traceback

from datetime import datetime

import elasticsearch


def log(msg, prefix='DEBUG'):
''' Add prefix and current time to message and write it to stderr. '''
prefix = '(%s)' % (prefix)
prefix = prefix.ljust(8)
mgolosova marked this conversation as resolved.
Show resolved Hide resolved
sys.stderr.write('%s%s %s\n' % (prefix, datetime.now().isoformat(), msg))


try:
base_dir = os.path.dirname(__file__)
dkb_dir = os.path.join(base_dir, os.pardir)
sys.path.append(dkb_dir)
import pyDKB
from pyDKB.dataflow.stage import JSONProcessorStage
from pyDKB.dataflow.messages import JSONMessage
from pyDKB.dataflow.exceptions import DataflowException
except Exception, err:
log('Failed to import pyDKB library: %s' % err, 'ERROR')
sys.exit(1)


es = None


INDEX = None


def load_config(fname):
''' Open elasticsearch config and obtain parameters from it.

Setup INDEX as global variable.

:param fname: config file's name
:type fname: str
'''
cfg = {
'ES_HOST': 'localhost',
'ES_PORT': '9200',
'ES_USER': '',
mgolosova marked this conversation as resolved.
Show resolved Hide resolved
'ES_PASSWORD': '',
'ES_INDEX': ''
}
with open(fname) as f:
lines = f.readlines()
for l in lines:
if l.startswith('ES'):
key = False
value = False
try:
(key, value) = l.split()[0].split('=')
except ValueError:
pass
if key in cfg:
cfg[key] = value
global INDEX
INDEX = cfg['ES_INDEX']
return cfg


def es_connect(cfg):
''' Establish a connection to elasticsearch, as a global variable.
mgolosova marked this conversation as resolved.
Show resolved Hide resolved

:param cfg: connection parameters
:type cfg: dict
'''
global es
if cfg['ES_USER'] and cfg['ES_PASSWORD']:
s = 'http://%s:%s@%s:%s/' % (cfg['ES_USER'],
cfg['ES_PASSWORD'],
cfg['ES_HOST'],
cfg['ES_PORT'])
else:
s = '%s:%s' % (cfg['ES_HOST'], cfg['ES_PORT'])
es = elasticsearch.Elasticsearch([s])


def get_field(index, taskid, field):
''' Get field value by given taskid.

:param es: elasticsearch client
:type es: elasticsearch.client.Elasticsearch
:param index: index containing tasks
:type index: str
:param taskid: taskid of the task to look for
:type taskid: int or str
:param index: field name
:type index: str

:return: field value, or False if the task was not found
:rtype: int or bool
'''
try:
results = es.get(index=index, doc_type='_all', id=taskid,
_source=[field])
except elasticsearch.exceptions.NotFoundError:
return False
return results['_source'].get(field)


def process(stage, message):
''' Process a message.

Implementation of :py:meth:`.AbstractProcessorStage.process` for hooking
the stage into DKB workflow.

:param stage: stage instance
:type stage: pyDKB.dataflow.stage.ProcessorStage
:param msg: input message with task info
:type msg: pyDKB.dataflow.Message
'''
data = message.content()
if type(data) is not dict:
log('Incorrect data:' + str(data), 'INPUT')
mgolosova marked this conversation as resolved.
Show resolved Hide resolved
return False
taskid = data.get('taskid')
if taskid is None:
log('No taskid in data:' + str(data), 'INPUT')
return False
timestamp = data.get('task_timestamp')
if timestamp is None:
log('No timestamp supplied for taskid ' + str(taskid), 'INPUT')
return False

es_timestamp = get_field(INDEX, taskid, 'task_timestamp')
if es_timestamp is None:
log('No timestamp in ES for taskid ' + str(taskid), 'DIFF')
elif not es_timestamp:
log('Taskid %d not found in ES' % taskid, 'DIFF')
elif es_timestamp != timestamp:
log('Taskid %d has timestamp %s in ES, %s in Oracle' % (taskid,
es_timestamp,
timestamp),
'DIFF')
else:
log('Taskid %d is up to date in ES' % taskid, 'INFO')

return True


def main(args):
''' Parse command line arguments and run the stage.

:param argv: arguments
:type argv: list
'''

stage = JSONProcessorStage()
stage.add_argument('--conf', help='elasticsearch config', required=True)

exit_code = 0
exc_info = None
try:
stage.parse_args(args)
cfg = load_config(stage.ARGS.conf)
stage.process = process
es_connect(cfg)
if not es.indices.exists(INDEX):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for information: this check would cause AuthorizationException in case of remote connection via Nginx proxy:

>>> es = elasticsearch.Elasticsearch('http://login:[email protected]:9200') 
>>> es.indices.exists('test_prodsys_rucio_ami')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.7/site-packages/elasticsearch/client/utils.py", line 76, in _wrapped
    return func(*args, params=params, **kwargs)
  File "/usr/lib/python2.7/site-packages/elasticsearch/client/indices.py", line 213, in exists
    params=params)
  File "/usr/lib/python2.7/site-packages/elasticsearch/transport.py", line 318, in perform_request
    status, headers_response, data = connection.perform_request(method, url, params, body, headers=headers, ignore=ignore, timeout=timeout)
  File "/usr/lib/python2.7/site-packages/elasticsearch/connection/http_urllib3.py", line 185, in perform_request
    self._raise_error(response.status, raw_data)
  File "/usr/lib/python2.7/site-packages/elasticsearch/connection/base.py", line 125, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
elasticsearch.exceptions.AuthorizationException: TransportError(403, u'')

I updated proxy configuration to allow HEAD requests (to readable locations), as there`s no actual need to block them; so now there should be no problem with it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And (again, just for information), there was another way to hit the goal:

Check for index' existence before working.
es.get() raises NotFoundError in both cases - when index does not exist and
when document does not exist. Also, it's more reasonable to check index once
since it's the same for all messages.

Even with NotFoundError it is possible to say one situation from another:

>>> try: es.get(index='test_prodsys_rucio_ami', doc_type='task', id=1468216)
... except Exception, err: pass
... 
>>> err.info
{u'found': False, u'_type': u'task', u'_id': u'1468216', u'_index': u'test_prodsys_rucio_ami'}
>>>
>>> try: es.get(index='_no_such_index_', doc_type='task', id=14682166)                                                                                     
... except Exception, err: pass
>>> err.info
{u'status': 404, u'error': {u'index_uuid': u'_na_', u'index': u'tprodsys_rucio_ami', u'resource.type': u'index_expression', u'root_cause': [{u'index_uuid': u'_na_', u'index': u'tprodsys_rucio_ami', u'resource.type': u'index_expression', u'resource.id': u'tprodsys_rucio_ami', u'reason': u'no such index', u'type': u'index_not_found_exception'}], u'reason': u'no such index', u'type': u'index_not_found_exception', u'resource.id': u'tprodsys_rucio_ami'}}
>>> err.info['error']['reason']
u'no such index'

And in case of the error -- or, maybe, even in case of any error, when info['error'] is defined (but I am not sure if there can possibly be any other error) -- re-raise the exception to indicate that the process can not be continued. Maybe wrapping the exception into DataflowException.

The only situation when it makes any difference is when during the process execution the index was removed or access policy changed: the check was successfully passed on the start, so any NotFoundError after this will be taken as "record missed", no matter what. But I don`t think it is likely to happen, so there`s nothing wrong in the one-time check.

log('No such index: %s' % INDEX, 'ERROR')
exit_code = 4
else:
stage.run()
except (DataflowException, RuntimeError), err:
if str(err):
log(err, 'ERROR')
exit_code = 2
except Exception:
exc_info = sys.exc_info()
exit_code = 3
finally:
stage.stop()

if exc_info:
trace = traceback.format_exception(*exc_info)
for line in trace:
log(line, 'ERROR')

exit(exit_code)


if __name__ == '__main__':
main(sys.argv[1:])