From b73189b150e37445687195d52307e8a818d0b100 Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Thu, 11 Apr 2019 12:33:52 +0200 Subject: [PATCH 01/12] pyDKB: introduce pyDKB.storages submodule. Sometimes we have to interact with the same storage in different scripts. When it happens we got to implement same functionality multiple times: read configuration, check that Python module is available, create client, ... This submodule is a place to implement this once and then reuse whwnever it is needed. --- Utils/Dataflow/pyDKB/storages/Storage.py | 50 +++++++++++++++++++++ Utils/Dataflow/pyDKB/storages/__init__.py | 7 +++ Utils/Dataflow/pyDKB/storages/exceptions.py | 36 +++++++++++++++ 3 files changed, 93 insertions(+) create mode 100644 Utils/Dataflow/pyDKB/storages/Storage.py create mode 100644 Utils/Dataflow/pyDKB/storages/__init__.py create mode 100644 Utils/Dataflow/pyDKB/storages/exceptions.py diff --git a/Utils/Dataflow/pyDKB/storages/Storage.py b/Utils/Dataflow/pyDKB/storages/Storage.py new file mode 100644 index 000000000..4c57db93f --- /dev/null +++ b/Utils/Dataflow/pyDKB/storages/Storage.py @@ -0,0 +1,50 @@ +""" +pyDKB.storages.Storage +""" + +from . import storageType + + +class Storage(object): + """ Interface class for external and internal DKB storages. """ + + # Storage name (identifier) + name = None + + # Storage type (storageType member) + type = None + + # Storage client + c = None + + def __init__(self, name): + """ Initialize Storage object. + + Raise ``StorageAlreadyExists`` if storage with given name + was already created. + + :param name: storage identifier + :type name: str + """ + raise NotImplementedError + + def configure(self, cfg): + """ Apply storage configuration (initialize client). + + :param cfg: configuration parameters + :type cfg: dict + """ + raise NotImplementedError + + def get(self, id, **kwargs): + """ Get object / record from storage by ID. + + Raise ``NotFound`` exception if object / record not found. + + :param id: object / record identfier + :type id: str, int + + :return: record with given ID + :rtype: dict + """ + raise NotImplementedError diff --git a/Utils/Dataflow/pyDKB/storages/__init__.py b/Utils/Dataflow/pyDKB/storages/__init__.py new file mode 100644 index 000000000..05ab39fe1 --- /dev/null +++ b/Utils/Dataflow/pyDKB/storages/__init__.py @@ -0,0 +1,7 @@ +""" +pyDKB.storages +""" + +from ..common import Type + +storageType = Type() diff --git a/Utils/Dataflow/pyDKB/storages/exceptions.py b/Utils/Dataflow/pyDKB/storages/exceptions.py new file mode 100644 index 000000000..5af81246e --- /dev/null +++ b/Utils/Dataflow/pyDKB/storages/exceptions.py @@ -0,0 +1,36 @@ +""" +pyDKB.storages.exceptions +""" + + +class StorageException(Exception): + """ Base exception for all storage-related exceptions. """ + pass + + +class StorageAlreadyExists(StorageException): + """ Exception indicating that storage name was already used. """ + + def __init__(self, name): + """ Initialize exception. + + :param name: storage name + :type name: str + """ + message = "Name already in use: '%s'" % name + super(StorageAlreadyExists, self).__init__(message) + + +class NotFound(StorageException): + """ Exeption indicating that record with given ID not found. """ + + def __init__(self, name, id): + """ Initialize exception. + + :param name: storage name + :type name: str + :param id: record ID + :type id: str, int + """ + message = "Record not found in '%s' (id: '%s')" % (name, id) + super(NotFound, self).__init__(message) From 7eaf9070855d4c6fd3c6f6cd3605348caca792f3 Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Thu, 11 Apr 2019 13:16:02 +0200 Subject: [PATCH 02/12] pyDKB/storages: move storages management from Storage object. It would be wierd to manage module global variable from class; it would also be wierd to get errors like ``StorageAlreadyExists`` if one just uses the Storage functionality and doesn't want any extra logic around it. --- Utils/Dataflow/pyDKB/storages/Storage.py | 7 +----- Utils/Dataflow/pyDKB/storages/__init__.py | 26 +++++++++++++++++++++++ 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/Utils/Dataflow/pyDKB/storages/Storage.py b/Utils/Dataflow/pyDKB/storages/Storage.py index 4c57db93f..2ad76fc42 100644 --- a/Utils/Dataflow/pyDKB/storages/Storage.py +++ b/Utils/Dataflow/pyDKB/storages/Storage.py @@ -2,8 +2,6 @@ pyDKB.storages.Storage """ -from . import storageType - class Storage(object): """ Interface class for external and internal DKB storages. """ @@ -20,13 +18,10 @@ class Storage(object): def __init__(self, name): """ Initialize Storage object. - Raise ``StorageAlreadyExists`` if storage with given name - was already created. - :param name: storage identifier :type name: str """ - raise NotImplementedError + self.name = name def configure(self, cfg): """ Apply storage configuration (initialize client). diff --git a/Utils/Dataflow/pyDKB/storages/__init__.py b/Utils/Dataflow/pyDKB/storages/__init__.py index 05ab39fe1..673c8e3b2 100644 --- a/Utils/Dataflow/pyDKB/storages/__init__.py +++ b/Utils/Dataflow/pyDKB/storages/__init__.py @@ -4,4 +4,30 @@ from ..common import Type +from Storage import Storage +from exceptions import StorageAlreadyExists + + storageType = Type() +storages = {} + + +def create(name, stype): + """ Create storage of given type. + + Raise ``StorageAlreadyExists`` if storage with given name was created + earlier. + + :param name: storage identifier + :type name: str + :param stype: storage type + :type stype: storageType member + + :return: Storage object + :rtype: Storage + """ + global storages + if name in storages: + raise StorageAlreadyExists(name) + storages[name] = Storage(name) + return storages[name] From b777e0add6f2fa42faf74e799fdb33140881eff1 Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Fri, 26 Apr 2019 09:45:09 +0200 Subject: [PATCH 03/12] pyDKB/storages: add methods to the Storage interface class (`log()`, `client()`). * `log()` -- quite useful method for all the derived classes; * `client()` -- given that one day we may want to make multiple simultaneous requests to the storage, it doesn'y look good to use instance variable `c` directly; method that returns ready-to-use client seems to be a better way. --- Utils/Dataflow/pyDKB/storages/Storage.py | 27 ++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/Utils/Dataflow/pyDKB/storages/Storage.py b/Utils/Dataflow/pyDKB/storages/Storage.py index 2ad76fc42..f6b1ffa73 100644 --- a/Utils/Dataflow/pyDKB/storages/Storage.py +++ b/Utils/Dataflow/pyDKB/storages/Storage.py @@ -2,6 +2,11 @@ pyDKB.storages.Storage """ +import sys +from datetime import datetime + +from exceptions import StorageNotConfigured + class Storage(object): """ Interface class for external and internal DKB storages. """ @@ -23,6 +28,16 @@ def __init__(self, name): """ self.name = name + def log(self, level, message): + """ Output log message. """ + if level not in ('TRACE', 'DEBUG', 'INFO', 'WARN', 'WARNING', 'ERROR', + 'CRITICAL'): + level = 'INFO' + dt = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + sys.stderr.write('(%s) %s (%s) %s\n' % (level, dt, + self.__class__.__name__, + message)) + def configure(self, cfg): """ Apply storage configuration (initialize client). @@ -31,6 +46,18 @@ def configure(self, cfg): """ raise NotImplementedError + def client(self): + """ Get storage client. + + Raise ``StorageNotConfigured`` if called before configuration. + + :return: client object, corresponding given storage type. + :rtype: object + """ + if not self.c: + raise StorageNotConfigured(self.name) + return self.c + def get(self, id, **kwargs): """ Get object / record from storage by ID. From 8303cc655c5a39ed82e50e5bc6c2888547a6c23a Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Fri, 26 Apr 2019 09:54:11 +0200 Subject: [PATCH 04/12] pyDKB/storages: add module-level function to get configured storage instance. --- Utils/Dataflow/pyDKB/storages/__init__.py | 19 ++++++++++++++++++- Utils/Dataflow/pyDKB/storages/exceptions.py | 13 +++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/Utils/Dataflow/pyDKB/storages/__init__.py b/Utils/Dataflow/pyDKB/storages/__init__.py index 673c8e3b2..af0fe5cc3 100644 --- a/Utils/Dataflow/pyDKB/storages/__init__.py +++ b/Utils/Dataflow/pyDKB/storages/__init__.py @@ -5,7 +5,8 @@ from ..common import Type from Storage import Storage -from exceptions import StorageAlreadyExists +from exceptions import (StorageAlreadyExists, + StorageNotConfigured) storageType = Type() @@ -31,3 +32,19 @@ def create(name, stype): raise StorageAlreadyExists(name) storages[name] = Storage(name) return storages[name] + + +def get(name): + """ Get storage client by name. + + Raise ``StorageNotConfigured`` if the storage was not `create()`d earlier. + + :param name: storage name + :type name: str + + :return: object representing given storage + :rtype: Storage + """ + if name not in storages: + raise StorageNotConfigured(name) + return storages[name] diff --git a/Utils/Dataflow/pyDKB/storages/exceptions.py b/Utils/Dataflow/pyDKB/storages/exceptions.py index 5af81246e..ed73b15c2 100644 --- a/Utils/Dataflow/pyDKB/storages/exceptions.py +++ b/Utils/Dataflow/pyDKB/storages/exceptions.py @@ -21,6 +21,19 @@ def __init__(self, name): super(StorageAlreadyExists, self).__init__(message) +class StorageNotConfigured(StorageException): + """ Exception indicating that requested storage is not configured. """ + + def __init__(self, name): + """ Initialize exception. + + :param name: storage name + :type name: str + """ + message = "Storage '%s' used before configuration." % name + super(StorageNotConfigured, self).__init__(message) + + class NotFound(StorageException): """ Exeption indicating that record with given ID not found. """ From 65a52aa634b057c4d741b572905c76f52d6f4eae Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Fri, 26 Apr 2019 12:12:36 +0200 Subject: [PATCH 05/12] pyDKB/storages: improve NotFound exception message. There may be more than one valuable parameter for record detection (e.g. record ID and index in ES). --- Utils/Dataflow/pyDKB/storages/exceptions.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/Utils/Dataflow/pyDKB/storages/exceptions.py b/Utils/Dataflow/pyDKB/storages/exceptions.py index ed73b15c2..45bd6302a 100644 --- a/Utils/Dataflow/pyDKB/storages/exceptions.py +++ b/Utils/Dataflow/pyDKB/storages/exceptions.py @@ -37,13 +37,17 @@ def __init__(self, name): class NotFound(StorageException): """ Exeption indicating that record with given ID not found. """ - def __init__(self, name, id): + def __init__(self, name, **kwargs): """ Initialize exception. :param name: storage name :type name: str - :param id: record ID - :type id: str, int + :param kwargs: record parameters + :type kwargs: dict """ - message = "Record not found in '%s' (id: '%s')" % (name, id) + message = "Record not found in '%s'" % (name) + if kwargs: + params = [': '.join((key, '%r' % kwargs[key])) for key in kwargs] + params = ', '.join(params) + message = message + ' (%s)' % params super(NotFound, self).__init__(message) From b1b841c1606ed747d9487857eddc862105c9f143 Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Fri, 26 Apr 2019 12:23:58 +0200 Subject: [PATCH 06/12] pyDKB/storages: more accurate storage instance creation. --- Utils/Dataflow/pyDKB/storages/__init__.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/Utils/Dataflow/pyDKB/storages/__init__.py b/Utils/Dataflow/pyDKB/storages/__init__.py index af0fe5cc3..101f4674d 100644 --- a/Utils/Dataflow/pyDKB/storages/__init__.py +++ b/Utils/Dataflow/pyDKB/storages/__init__.py @@ -10,6 +10,7 @@ storageType = Type() +storageClass = {} storages = {} @@ -30,7 +31,14 @@ def create(name, stype): global storages if name in storages: raise StorageAlreadyExists(name) - storages[name] = Storage(name) + cls = storageClass.get(stype) + if not cls: + sname = storageType.memberName(stype) + if not sname: + raise ValueError("Unknown storage type: '%s'" % stype) + raise NotImplementedError("Storage class is not implemented for: '%s'" + % storageType.memberName(stype)) + storages[name] = cls(name) return storages[name] From d9a9567c13712fa84ac52c87a9911cb4dfb05582 Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Fri, 26 Apr 2019 12:26:40 +0200 Subject: [PATCH 07/12] pyDKB/storages: add class for ES storage. --- Utils/Dataflow/pyDKB/storages/Storage.py | 2 + Utils/Dataflow/pyDKB/storages/__init__.py | 7 ++- Utils/Dataflow/pyDKB/storages/es.py | 63 +++++++++++++++++++++++ Utils/Dataflow/pyDKB/storages/types.py | 6 +++ 4 files changed, 74 insertions(+), 4 deletions(-) create mode 100644 Utils/Dataflow/pyDKB/storages/es.py create mode 100644 Utils/Dataflow/pyDKB/storages/types.py diff --git a/Utils/Dataflow/pyDKB/storages/Storage.py b/Utils/Dataflow/pyDKB/storages/Storage.py index f6b1ffa73..67c59558d 100644 --- a/Utils/Dataflow/pyDKB/storages/Storage.py +++ b/Utils/Dataflow/pyDKB/storages/Storage.py @@ -23,6 +23,8 @@ class Storage(object): def __init__(self, name): """ Initialize Storage object. + Raise ``StorageException`` in case of error. + :param name: storage identifier :type name: str """ diff --git a/Utils/Dataflow/pyDKB/storages/__init__.py b/Utils/Dataflow/pyDKB/storages/__init__.py index 101f4674d..4cc064cb4 100644 --- a/Utils/Dataflow/pyDKB/storages/__init__.py +++ b/Utils/Dataflow/pyDKB/storages/__init__.py @@ -2,15 +2,14 @@ pyDKB.storages """ -from ..common import Type - +from types import storageType from Storage import Storage +from es import ES from exceptions import (StorageAlreadyExists, StorageNotConfigured) -storageType = Type() -storageClass = {} +storageClass = {storageType.ES: ES} storages = {} diff --git a/Utils/Dataflow/pyDKB/storages/es.py b/Utils/Dataflow/pyDKB/storages/es.py new file mode 100644 index 000000000..d030d01fe --- /dev/null +++ b/Utils/Dataflow/pyDKB/storages/es.py @@ -0,0 +1,63 @@ +""" +pyDKB.storages.es +""" + +from Storage import Storage +from . import storageType +from exceptions import (StorageException, + NotFound, + InvalidRequest) + + +try: + import elasticsearch + from elasticsearch.exceptions import (NotFoundError, + RequestError) +except ImportError: + pass + + +DEFAULT_CFG = { + 'host': '127.0.0.1', + 'port': '9200' +} + + +class ES(Storage): + """ Representation of Elasticsearch storage. """ + + # Default index + index = None + + type = storageType.ES + + def __init__(self, name): + """ Check if this class can be used and initialize object. """ + try: + elasticsearch + except NameError: + raise StorageException("Required Python module not found:" + " 'elasticsearch'") + super(ES, self).__init__(name) + + def configure(self, cfg): + """ Configure ES client. + + Configuration parameters: + host (str: '127.0.0.1') + port (str: '9200') + index (str) + user (str) + passwd (str) + + :param cfg: configuration parameters + :type cfg: dict + """ + hosts = [{'host': cfg.get('host', DEFAULT_CFG['host']), + 'port': cfg.get('port', DEFAULT_CFG['port'])}] + kwargs = {} + if cfg.get('user'): + kwargs['http_auth'] = '%(user)s:%(passwd)s' % cfg + if cfg.get('index'): + self.index = cfg['index'] + self.c = elasticsearch.Elasticsearch(hosts, **kwargs) diff --git a/Utils/Dataflow/pyDKB/storages/types.py b/Utils/Dataflow/pyDKB/storages/types.py new file mode 100644 index 000000000..1cdcc49a0 --- /dev/null +++ b/Utils/Dataflow/pyDKB/storages/types.py @@ -0,0 +1,6 @@ +""" +pyDKB.storages.types +""" + +from ..common import Type +storageType = Type("ES") From 2551c6efad923a482a42cc1fedbd89fd22b91d23 Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Fri, 26 Apr 2019 12:28:11 +0200 Subject: [PATCH 08/12] pyDKB/storages: add ES.get() method. --- Utils/Dataflow/pyDKB/storages/es.py | 43 +++++++++++++++++++++ Utils/Dataflow/pyDKB/storages/exceptions.py | 25 ++++++++++++ 2 files changed, 68 insertions(+) diff --git a/Utils/Dataflow/pyDKB/storages/es.py b/Utils/Dataflow/pyDKB/storages/es.py index d030d01fe..31ec01567 100644 --- a/Utils/Dataflow/pyDKB/storages/es.py +++ b/Utils/Dataflow/pyDKB/storages/es.py @@ -61,3 +61,46 @@ def configure(self, cfg): if cfg.get('index'): self.index = cfg['index'] self.c = elasticsearch.Elasticsearch(hosts, **kwargs) + + def get(self, id, fields=None, index=None, doc_type='_all', parent=None): + """ Get record from ES. + + Raise: + * ``NotFound`` exception if record is not found + * ``InvalidRequest`` if request can not be executed + * ``StorageNotConfigured`` if called before `configure()` + + Supported document types for ATLAS: 'task', 'output_dataset'. + + :param fields: specific set of fields to get (if not specified, all + fields will be returned) + :type fields: list, NoneType + :param doc_type: document type + :type doc_type: str + :param parent: parent document ID (required for child documents) + :type parent: str + + :return: ES record with specified or full set of fields + :rtype: dict + """ + c = self.client() + if not index: + index = self.index + kwargs = {'index': index, 'doc_type': doc_type, 'id': id} + if not kwargs['index']: + raise InvalidRequest("Index not specified.") + if fields is not None: + kwargs['_source'] = fields + if parent: + kwargs['parent'] = parent + try: + r = c.get(**kwargs) + except NotFoundError, err: + raise NotFound(self.name, id=id, index=index) + except RequestError, err: + if doc_type == 'output_dataset' \ + and err.args[1] == 'routing_missing_exception': + self.log('WARN', 'Parent info missed.') + raise NotFound(self.name, id=id, index=index) + raise InvalidRequest(err) + return r.get('_source', {}) diff --git a/Utils/Dataflow/pyDKB/storages/exceptions.py b/Utils/Dataflow/pyDKB/storages/exceptions.py index 45bd6302a..8b499ca21 100644 --- a/Utils/Dataflow/pyDKB/storages/exceptions.py +++ b/Utils/Dataflow/pyDKB/storages/exceptions.py @@ -51,3 +51,28 @@ def __init__(self, name, **kwargs): params = ', '.join(params) message = message + ' (%s)' % params super(NotFound, self).__init__(message) + + +class InvalidRequest(StorageException): + """ Exception indicating wrong user request. """ + + def __init__(self, message, *args, **kwargs): + """ Initialize exception. + + Message formatting: old-style ('%' operator) only. + + :param message: error message + :type message: str + :param args: message format positional parameters + :type args: list + :param kwargs: message format named parameters + :type kwargs: dict + """ + if args and kwargs: + raise ValueError("Message formatting supports only one type " + "of parameters: positional OR named.") + if args: + message = message % params + elif kwargs: + message = message % kwargs + super(InvalidRequest, self).__init__(message) From d54f90dfb25a94b001d29eb888e931ba1f42b5fd Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Fri, 26 Apr 2019 12:53:05 +0200 Subject: [PATCH 09/12] pyDKB/storages: add interface methods for custom query usage. --- Utils/Dataflow/pyDKB/storages/Storage.py | 61 ++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/Utils/Dataflow/pyDKB/storages/Storage.py b/Utils/Dataflow/pyDKB/storages/Storage.py index 67c59558d..b8fd3a20f 100644 --- a/Utils/Dataflow/pyDKB/storages/Storage.py +++ b/Utils/Dataflow/pyDKB/storages/Storage.py @@ -72,3 +72,64 @@ def get(self, id, **kwargs): :rtype: dict """ raise NotImplementedError + + def read_query(self, fname, qname=None): + """ Read query from file and save it. + + :param fname: file name + :type fname: str + :param qname: query name (for futher usage) + :type qname: str + """ + raise NotImplementedError + + def save_query(self, query, qname=None): + """ Save query for further usage. + + :param query: query content + :type query: object + :param qname: query name (must not start with '__') + :type qname: str + """ + raise NotImplementedError + + def get_query(self, qname): + """ Get query by name. + + :param qname: query name (if None, last stored/used query will be used) + :type qnmae: str + + :return: stored query + :rtype: object + """ + raise NotImplementedError + + def exec_query(self, qname=None, **kwargs): + """ Execute stored query with given parameters. + + :param qname: query name (if None, last used/read + one will be used) + :type qname: str, NoneType + :param kwargs: query parameters (applied with old-style + string formatting operator '%') + :type kwargs: dict + + :return: storage response + :rtype: object + """ + raise NotImplementedError + + def execute(self, query, **kwargs): + """ Execute query with given parameters. + + :param query: query content + :type query: object + :param kwargs: query parameters (applied with old-style + string formatting operator '%') + :type kwargs: dict + + :return: storage response + :rtype: object + """ + self.save_query(query) + return self.exec_query(**kwargs) From 039043546097b6bf3f676aa4fefa2d14254b0aaf Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Fri, 26 Apr 2019 15:38:15 +0200 Subject: [PATCH 10/12] pyDKB/storages: add `save/get_query()` methods implementation. --- Utils/Dataflow/pyDKB/storages/Storage.py | 27 ++++++++++++++++++--- Utils/Dataflow/pyDKB/storages/exceptions.py | 5 ++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/Utils/Dataflow/pyDKB/storages/Storage.py b/Utils/Dataflow/pyDKB/storages/Storage.py index b8fd3a20f..69b1aa1c5 100644 --- a/Utils/Dataflow/pyDKB/storages/Storage.py +++ b/Utils/Dataflow/pyDKB/storages/Storage.py @@ -5,7 +5,8 @@ import sys from datetime import datetime -from exceptions import StorageNotConfigured +from exceptions import (StorageNotConfigured, + QueryError) class Storage(object): @@ -20,6 +21,9 @@ class Storage(object): # Storage client c = None + # Stored queries + stored_queries = {} + def __init__(self, name): """ Initialize Storage object. @@ -91,18 +95,35 @@ def save_query(self, query, qname=None): :param qname: query name (must not start with '__') :type qname: str """ - raise NotImplementedError + if qname and qname.startswith('__'): + raise ValueError("Query name must not start with '__'" + " (reserved for service needs).") + if not qname: + qname = '__last' + self.stored_queries[qname] = query + self.stored_queries[__last'] = query def get_query(self, qname): """ Get query by name. + Raise ``QueryError`` if query not found. + :param qname: query name (if None, last stored/used query will be used) :type qnmae: str :return: stored query :rtype: object """ - raise NotImplementedError + if not qname: + qname = '__last' + try: + q = self.stored_queries[qname] + self.stored_queries['__last'] = q + except KeyError: + raise QueryError("Query used before saving: '%s'" + % qname) + self.stored_queries['__last'] = q + return q def exec_query(self, qname=None, **kwargs): """ Execute stored query with given parameters. diff --git a/Utils/Dataflow/pyDKB/storages/exceptions.py b/Utils/Dataflow/pyDKB/storages/exceptions.py index 8b499ca21..846b9d51a 100644 --- a/Utils/Dataflow/pyDKB/storages/exceptions.py +++ b/Utils/Dataflow/pyDKB/storages/exceptions.py @@ -76,3 +76,8 @@ def __init__(self, message, *args, **kwargs): elif kwargs: message = message % kwargs super(InvalidRequest, self).__init__(message) + + +class QueryError(StorageException): + """ Exception indicating issues with stored queries. """ + pass From 14cf1cae828a15dc59f696ff2343aca5db92652e Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Fri, 26 Apr 2019 15:42:11 +0200 Subject: [PATCH 11/12] pyDKB/storages: add ES implementation for queries. Added concept of "raw" queries: it is useful when query must be stored, yet can not be converted to the format acceptable by storage client. Here it is JSON with %(parameter)s values: it may fail to be parsed as proper JSON if there is something like `"taskname": %(name)s`. --- Utils/Dataflow/pyDKB/storages/Storage.py | 35 ++++++- Utils/Dataflow/pyDKB/storages/es.py | 106 +++++++++++++++++++- Utils/Dataflow/pyDKB/storages/exceptions.py | 23 +++++ 3 files changed, 158 insertions(+), 6 deletions(-) diff --git a/Utils/Dataflow/pyDKB/storages/Storage.py b/Utils/Dataflow/pyDKB/storages/Storage.py index 69b1aa1c5..cdb05621a 100644 --- a/Utils/Dataflow/pyDKB/storages/Storage.py +++ b/Utils/Dataflow/pyDKB/storages/Storage.py @@ -87,21 +87,42 @@ def read_query(self, fname, qname=None): """ raise NotImplementedError - def save_query(self, query, qname=None): + def query_is_raw(self, query): + """ Check if given query is not compiled ("raw"). + + :param query: query body + :type query: obj + + :return: True/False + :rtype: bool + """ + raise NotImplementedError + + def save_query(self, query, qname=None, raw=False): """ Save query for further usage. :param query: query content :type query: object :param qname: query name (must not start with '__') :type qname: str + :param raw: store "raw" (not compiled) version of query + :type raw: bool """ if qname and qname.startswith('__'): raise ValueError("Query name must not start with '__'" " (reserved for service needs).") + if not raw: + try: + raw = self.query_is_raw(query) + except NotImplementedError: + pass + prefix = '' if not qname: qname = '__last' - self.stored_queries[qname] = query - self.stored_queries[__last'] = query + if raw: + prefix = '__raw' + self.stored_queries[prefix + qname] = query + self.stored_queries[prefix + '__last'] = query def get_query(self, qname): """ Get query by name. @@ -120,8 +141,12 @@ def get_query(self, qname): q = self.stored_queries[qname] self.stored_queries['__last'] = q except KeyError: - raise QueryError("Query used before saving: '%s'" - % qname) + # There still may be raw version of the query + try: + q = self.stored_queries['__raw' + qname] + except KeyError: + raise QueryError("Query used before saving: '%s'" + % qname) self.stored_queries['__last'] = q return q diff --git a/Utils/Dataflow/pyDKB/storages/es.py b/Utils/Dataflow/pyDKB/storages/es.py index 31ec01567..69637ed06 100644 --- a/Utils/Dataflow/pyDKB/storages/es.py +++ b/Utils/Dataflow/pyDKB/storages/es.py @@ -2,11 +2,14 @@ pyDKB.storages.es """ +import json + from Storage import Storage from . import storageType from exceptions import (StorageException, NotFound, - InvalidRequest) + InvalidRequest, + MissedParameter) try: @@ -29,6 +32,9 @@ class ES(Storage): # Default index index = None + # Default datetime format + datetime_fmt = '%d-%m-%Y %H:%M:%S' + type = storageType.ES def __init__(self, name): @@ -104,3 +110,101 @@ def get(self, id, fields=None, index=None, doc_type='_all', parent=None): raise NotFound(self.name, id=id, index=index) raise InvalidRequest(err) return r.get('_source', {}) + + def read_query(self, fname, qname=None): + """ Read query from file and save it. + + Raise ``QueryNotFound`` in case of failure. + + :param fname: file name + :type fname: str + :param qname: query name (for futher usage) + :type qname: str + """ + raw = False + try: + with open(fname, 'r') as f: + query = f.read() + query = json.loads(query) + except IOError: + raise QueryNotFound(qname, fname) + except ValueError: + # Query with parameters may fail when try to parse as JSON + # In this case we just store it as "raw" version + raw = True + self.save_query(query, qname, raw) + + def query_is_raw(self, query): + """ Check if given query is not compiled ("raw"). + + :param query: query body + :type query: str, dict + + :return: True/False + :rtype: bool + """ + return not isinstance(query, dict) + + def exec_query(self, qname=None, **kwargs): + """ Execute stored query with given parameters. + + :param qname: query name (if None, last used/read + one will be used) + :type qname: str, NoneType + :param kwargs: query parameters (applied with old-style + string formatting operator '%'). Parameter + name, started with '_', is treated as special + one: + * _size -- for ES request "size"; + * _type -- for ES request "doc_type"; + * _index -- for ES index to use. + :type kwargs: dict + + :return: storage response + :rtype: object + """ + query = self.get_query(qname) + raw = self.query_is_raw(query) + params = {} + for key in kwargs: + if key.startswith('_'): + continue + try: + params[key] = json.dumps(kwargs[key]) + except TypeError, err: + if 'datetime' in str(err): + val = json.dumps(kwargs[key].strftime(self.datetime_fmt)) + params[key] = val + else: + raise + q = {} + q['index'] = kwargs.get('_index', self.index) + q['size'] = kwargs.get('_size') + q['doc_type'] = kwargs.get('_type') + if params: + try: + if not raw: + query = json.dumps(query) + raw = True + query = query % params + except KeyError, err: + raise MissedParameter(qname, str(err)) + if raw: + try: + query = json.loads(query) + except ValueError, err: + msg = "Failed to parse query" + if qname: + msg += " (%r)" % qname + msg += ": %s" % err + raise QueryError(msg) + q['body'] = query + try: + r = self.client().search(**q) + except RequestError, err: + msg = "Query failed" + if qname: + msg += ": (%r)" % qname + msg += ": %s" % err + raise QueryError(msg) + return r diff --git a/Utils/Dataflow/pyDKB/storages/exceptions.py b/Utils/Dataflow/pyDKB/storages/exceptions.py index 846b9d51a..fee5a6ad9 100644 --- a/Utils/Dataflow/pyDKB/storages/exceptions.py +++ b/Utils/Dataflow/pyDKB/storages/exceptions.py @@ -81,3 +81,26 @@ def __init__(self, message, *args, **kwargs): class QueryError(StorageException): """ Exception indicating issues with stored queries. """ pass + + +class MissedParameter(QueryError): + """ Exception indicating that some query parameters are missed. """ + + def __init__(self, qname=None, param=None): + """ Initialize exception. + + :param qname: query name + :type qname: str, NoneType + :param param: parameter name(s) + :type param: str, list(str) + """ + message = 'Missed query parameters' + if param: + if isinstance(param, list): + p = ', '.join(param) + else: + p = param + message += ": %s" % p + if qname: + message += " ('%s')" % qname + super(MissedParameter, self).__init__(message) From 698c8e775b38bc77c5db37a73a70b9d586c4b480 Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Fri, 26 Apr 2019 19:55:57 +0200 Subject: [PATCH 12/12] pyDKB/storages: add possibility to pass configuration to `create()` function. --- Utils/Dataflow/pyDKB/storages/__init__.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Utils/Dataflow/pyDKB/storages/__init__.py b/Utils/Dataflow/pyDKB/storages/__init__.py index 4cc064cb4..137b0438e 100644 --- a/Utils/Dataflow/pyDKB/storages/__init__.py +++ b/Utils/Dataflow/pyDKB/storages/__init__.py @@ -13,7 +13,7 @@ storages = {} -def create(name, stype): +def create(name, stype, cfg=None): """ Create storage of given type. Raise ``StorageAlreadyExists`` if storage with given name was created @@ -23,6 +23,8 @@ def create(name, stype): :type name: str :param stype: storage type :type stype: storageType member + :param cfg: storage configuration (if None, won't be applied) + :type cfg: dict, NoneType :return: Storage object :rtype: Storage @@ -38,6 +40,8 @@ def create(name, stype): raise NotImplementedError("Storage class is not implemented for: '%s'" % storageType.memberName(stype)) storages[name] = cls(name) + if cfg is not None: + storages[name].configure(cfg) return storages[name]