From 26a9113ae40b3b238fb3ea206ecec6feec9a0281 Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Tue, 2 Jun 2020 13:38:46 +0300 Subject: [PATCH 1/3] API/storages/es: refactoring. Breaking tasks into steps via 'terms' query looks like a very general way to do the task, we just might want to change the field name -- which is easier to do in a hash than in if/elif/elif/elif clause. --- Utils/API/server/lib/dkb/api/storages/es/common.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Utils/API/server/lib/dkb/api/storages/es/common.py b/Utils/API/server/lib/dkb/api/storages/es/common.py index d9378aa17..f66d76e52 100644 --- a/Utils/API/server/lib/dkb/api/storages/es/common.py +++ b/Utils/API/server/lib/dkb/api/storages/es/common.py @@ -354,7 +354,10 @@ def get_step_aggregation_query(step_type=None, selection_params={}): elif step_type not in STEP_TYPES: raise ValueError(step_type, "Unknown step type (expected one of: %s)" % STEP_TYPES) - if step_type == 'ctag_format': + step_fields = {'step': 'step_name.keyword'} + if step_type in step_fields: + aggs = {'steps': {'terms': {'field': step_fields[step_type]}}} + elif step_type == 'ctag_format': formats = output_formats(**selection_params) filters = {} for f in formats: @@ -366,8 +369,6 @@ def get_step_aggregation_query(step_type=None, selection_params={}): } aggs = {'steps': {'filters': {'filters': filters}, 'aggs': {'substeps': {'terms': {'field': 'ctag'}}}}} - elif step_type == 'step': - aggs = {'steps': {'terms': {'field': 'step_name.keyword'}}} else: raise DkbApiNotImplemented("Aggregation by steps of type '%s' is not" " implemented yet.") From 4294ba1e8db0b66bc421958ff6453beacddc3630 Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Tue, 2 Jun 2020 14:11:01 +0300 Subject: [PATCH 2/3] API: add new method (campaign/daily_progress). 
--- Utils/API/server/dkb.yaml.example | 1 + Utils/API/server/lib/dkb/api/handlers.py | 75 ++++++++++++++++ .../server/lib/dkb/api/storages/__init__.py | 40 +++++++++ .../lib/dkb/api/storages/es/__init__.py | 1 + .../server/lib/dkb/api/storages/es/common.py | 12 ++- .../server/lib/dkb/api/storages/es/methods.py | 86 +++++++++++++++++++ .../es/query/campaign-daily-progress-aggs | 15 ++++ .../lib/dkb/api/storages/es/transform.py | 28 ++++++ 8 files changed, 256 insertions(+), 2 deletions(-) create mode 100644 Utils/API/server/lib/dkb/api/storages/es/query/campaign-daily-progress-aggs diff --git a/Utils/API/server/dkb.yaml.example b/Utils/API/server/dkb.yaml.example index d9a891142..ca08258ce 100644 --- a/Utils/API/server/dkb.yaml.example +++ b/Utils/API/server/dkb.yaml.example @@ -7,3 +7,4 @@ storages: index: production_tasks: index_name_1 analysis_tasks: index_name_2 + daily_progress: index_name_3 diff --git a/Utils/API/server/lib/dkb/api/handlers.py b/Utils/API/server/lib/dkb/api/handlers.py index e86f01972..b0323eaa7 100644 --- a/Utils/API/server/lib/dkb/api/handlers.py +++ b/Utils/API/server/lib/dkb/api/handlers.py @@ -512,6 +512,81 @@ def campaign_stat(path, rtype='json', step_type=None, events_src=None, methods.add('/campaign', 'stat', campaign_stat) +def campaign_daily_progress(path, rtype='json', step_type=None, **kwargs): + """ Get daily progress in terms of events processing. + + Returns JSON document of the following format: + ``` + { + ... + "data": { + : { + : , + ... + }, + ... + } + } + ``` + + :param path: full path to the method + :type path: str + :param rtype: response type (only 'json' supported) + :type rtype: str + + :param step_type: step definition type: 'step', 'ctag_format' + (default: 'step') + :type step_type: str + + :param : defines condition to select tasks for + statistics. Parameter names are mapped + to storage record fields (names and/or + aliases). 
Values should be provided in + one of the following forms: + * ``None`` (field must not be present + in selected records); + * exact field value; + * exact field value with logical prefix: + - ``&`` -- field must have this value; + - ``|`` -- field must have one of values + marked with this prefix + (default); + - ``!`` -- field must not have this value; + * list of field values (prefixed or not). + Supported parameters are: + * htag (hashtag_list); + * taskid. + :type : NoneType, str, number, list + + :returns: campaign (or task) daily progress + :rtype: dict + """ + method_name = '/campaign/daily_progress' + if kwargs.get('rtype', 'json') is not 'json': + raise MethodException(method_name, "Unsupported response type: '%s'" + % kwargs['rtype']) + allowed_types = STEP_TYPES + if step_type is None: + step_type = allowed_types[0] + if (step_type not in allowed_types): + raise InvalidArgument(method_name, ('step_type', step_type, + allowed_types)) + params = {} + for param in kwargs: + vals = kwargs[param] + if not isinstance(vals, list): + vals = [vals] + if vals: + vals = sort_by_prefixes(vals, ['|', '&', '!']) + params[param] = vals + + return storages.campaign_daily_progress(selection_params=params, + step_type=step_type) + + +methods.add('/campaign', 'daily_progress', campaign_daily_progress) + + def step_stat(path, rtype='json', step_type=None, **kwargs): """ Get tasks statistics. diff --git a/Utils/API/server/lib/dkb/api/storages/__init__.py b/Utils/API/server/lib/dkb/api/storages/__init__.py index 3efb3aabd..feb1d7fe3 100644 --- a/Utils/API/server/lib/dkb/api/storages/__init__.py +++ b/Utils/API/server/lib/dkb/api/storages/__init__.py @@ -91,6 +91,46 @@ def campaign_stat(**kwargs): return es.campaign_stat(**kwargs) +def campaign_daily_progress(**kwargs): + """ Generate events processing daily progress report.
+ + :param step_type: step definition type: 'step', 'ctag_format' + (default: 'step') + :type step_type: str + + :param selection_params: defines conditions to select tasks for + statistics. Parameter names are mapped + to storage record fields (names and/or + aliases). Values should be provided in + one of the following forms: + * ``None`` (field must not be present + in selected records); + * (list of) exact field value(s). + Field values are broken into categories: + * ``&`` -- field must have all these values; + * ``|`` -- field must have at least one of + these values; + * ``!`` -- field must have none of these + values. + Expected format: + ``` + { + : { + : [], + ... + }, + ... + } + ``` + :type selection_params: dict + + :returns: daily progress of events processing in the form required + by :py:func:`api.handlers.campaign_daily_progress` + :rtype: dict + """ + return es.campaign_daily_progress(**kwargs) + + def step_stat(**kwargs): """ Calculate statistics for tasks by execution steps. diff --git a/Utils/API/server/lib/dkb/api/storages/es/__init__.py b/Utils/API/server/lib/dkb/api/storages/es/__init__.py index 1f4d15549..49fc536d6 100644 --- a/Utils/API/server/lib/dkb/api/storages/es/__init__.py +++ b/Utils/API/server/lib/dkb/api/storages/es/__init__.py @@ -9,5 +9,6 @@ task_chain, task_kwsearch, task_derivation_statistics, + campaign_daily_progress, campaign_stat, step_stat) diff --git a/Utils/API/server/lib/dkb/api/storages/es/common.py b/Utils/API/server/lib/dkb/api/storages/es/common.py index f66d76e52..4b249ac89 100644 --- a/Utils/API/server/lib/dkb/api/storages/es/common.py +++ b/Utils/API/server/lib/dkb/api/storages/es/common.py @@ -332,7 +332,8 @@ def output_formats(**kwargs): r['aggregations']['formats']['buckets']] -def get_step_aggregation_query(step_type=None, selection_params={}): +def get_step_aggregation_query(step_type=None, selection_params={}, + progress=False): """ Construct "aggs" part of ES query for steps aggregation.
:raises: `ValueError`: unknown step type. @@ -345,6 +346,9 @@ def get_step_aggregation_query(step_type=None, selection_params={}): (see :py:func:`get_selection_query`) :type selection_params: dict + :param progress: flag parameter for progress data + :type progress: bool + :return: "aggs" part of ES query :rtype: dict """ @@ -354,7 +358,11 @@ def get_step_aggregation_query(step_type=None, selection_params={}): elif step_type not in STEP_TYPES: raise ValueError(step_type, "Unknown step type (expected one of: %s)" % STEP_TYPES) - step_fields = {'step': 'step_name.keyword'} + step_fields = {'progress_ctag_format': 'ctag_format_step', + 'progress_step': 'mc_step', + 'step': 'step_name.keyword'} + if progress: + step_type = 'progress_' + step_type if step_type in step_fields: aggs = {'steps': {'terms': {'field': step_fields[step_type]}}} elif step_type == 'ctag_format': diff --git a/Utils/API/server/lib/dkb/api/storages/es/methods.py b/Utils/API/server/lib/dkb/api/storages/es/methods.py index 03e7a992a..8388c1338 100644 --- a/Utils/API/server/lib/dkb/api/storages/es/methods.py +++ b/Utils/API/server/lib/dkb/api/storages/es/methods.py @@ -351,6 +351,92 @@ def campaign_stat(selection_params, step_type='step', events_src=None): return r +def campaign_daily_progress(selection_params, step_type='step'): + """ Generate events processing daily progress report. + + :param step_type: step definition type: 'step', 'ctag_format' + (default: 'step') + :type step_type: str + + :param selection_params: defines conditions to select tasks for + statistics. Parameter names are mapped + to storage record fields (names and/or + aliases). Values should be provided in + one of the following forms: + * ``None`` (field must not be presented + in selected records); + * (list of) exact field value(s). 
+ Field values are broken into categories: + * ``&`` -- field must have all these values; + * ``|`` -- field must have at least one of + these values; + * ``!`` -- field must have none of these + values. + Expected format: + ``` + { + : { + : [], + ... + }, + ... + } + ``` + :type selection_params: dict + + :returns: daily progress of events processing in the form required + by :py:func:`api.handlers.campaign_daily_progress` + :rtype: dict + """ + init() + # Construct query + query = {} + # * index with progress data + try: + query['index'] = common.CONFIG['index']['daily_progress'] + except KeyError, e: + raise MethodException("Missed ES index name in configuration: %s" % e) + # * doc type + query['doc_type'] = 'task_progress' + # * and query body: + # - select tasks + q = get_selection_query(**selection_params) + # - divide them into 'steps' + step_agg = get_step_aggregation_query(step_type, progress=True) + # - get agg values for each step ('instep' aggs) + instep_aggs = get_query('campaign-daily-progress-aggs') + # - put 'instep' aggs into the innermost (sub) step clause + instep_clause = step_agg['steps'] + while instep_clause.get('aggs'): + instep_clause = instep_clause['aggs'].get('substeps') + if instep_clause: + instep_clause['aggs'] = {} + instep_clause = instep_clause['aggs'] + instep_clause.update(instep_aggs) + # - join 'query' and 'aggs' parts within request body + q_body = {'query': q, 'aggs': step_agg} + + query['body'] = q_body + query['size'] = 0 + + r = {} + data = {} + try: + data = client().search(**query) + except Exception, err: + msg = "(%s) Failed to execute search query: %s." % (STORAGE_NAME, + str(err)) + raise MethodException(reason=msg) + try: + data = transform.campaign_daily_progress(data) + except Exception, err: + msg = "Failed to parse storage response: %s."
% str(err) + raise MethodException(reason=msg) + + r.update(data) + return r + + def step_stat(selection_params, step_type='step'): """ Calculate statistics for tasks by execution steps. diff --git a/Utils/API/server/lib/dkb/api/storages/es/query/campaign-daily-progress-aggs b/Utils/API/server/lib/dkb/api/storages/es/query/campaign-daily-progress-aggs new file mode 100644 index 000000000..91083b602 --- /dev/null +++ b/Utils/API/server/lib/dkb/api/storages/es/query/campaign-daily-progress-aggs @@ -0,0 +1,15 @@ +{ + "daily_progress": { + "date_histogram": { + "field": "date", + "interval": "day", + "format": "yyyy-MM-dd", + "min_doc_count": 1 + }, + "aggs": { + "processed_events": { + "sum": {"field": "processed_events"} + } + } + } +} diff --git a/Utils/API/server/lib/dkb/api/storages/es/transform.py b/Utils/API/server/lib/dkb/api/storages/es/transform.py index 77a9c381d..32f9cf22a 100644 --- a/Utils/API/server/lib/dkb/api/storages/es/transform.py +++ b/Utils/API/server/lib/dkb/api/storages/es/transform.py @@ -455,6 +455,34 @@ def campaign_stat(stat_data, events_src=None): return r +def campaign_daily_progress(es_data): + """ ES query response transformation for ``campaign/daily_progress``. + + :param es_data: ES response + :type es_data: dict + + :return: prepared API response for ``campaign/daily_progress`` + :rtype: dict + """ + r = {} + data = {} + r['_took_storage_ms'] = es_data.pop('took', None) + r['_total'] = es_data.get('hits', {}).pop('total', None) + r['_data'] = data + data['date_format'] = '%y-%m-%d' + steps = steps_iterator(es_data.get('aggregations', {})) + for name, step_data in steps: + data[name] = {} + hist_data = step_data.get('daily_progress', {}) \ + .get('buckets', []) + for d in hist_data: + date = d.get('key_as_string') + data[name][date] = d.get('processed_events', {}) \ + .get('value') + + return r + + def step_stat(data, agg_units=[], step_type=None): """ Transform ES query response to required response format.
From 603d7be52f054c4c43bbd3de4e4522d3588371b1 Mon Sep 17 00:00:00 2001 From: Marina Golosova Date: Mon, 6 Jul 2020 14:05:05 +0200 Subject: [PATCH 3/3] API: update version info (0.2.dev20200706). --- Utils/API/server/lib/dkb/api/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Utils/API/server/lib/dkb/api/__init__.py b/Utils/API/server/lib/dkb/api/__init__.py index 2ea28e573..b5ee84cb2 100644 --- a/Utils/API/server/lib/dkb/api/__init__.py +++ b/Utils/API/server/lib/dkb/api/__init__.py @@ -10,7 +10,7 @@ CONFIG_DIR = '%%CFG_DIR%%' -__version__ = '0.2.dev20200416' +__version__ = '0.2.dev20200706' STATUS_CODES = {