Skip to content

Commit

Permalink
[discover] registered languages interceptor clean up and aggs (#7870)
Browse files Browse the repository at this point in the history
Clean up interceptor and strategies. Re-add aggregations working for PPL language

Signed-off-by: Kawika Avilla <[email protected]>
  • Loading branch information
kavilla authored Aug 28, 2024
1 parent f9965cd commit 9b8266f
Show file tree
Hide file tree
Showing 14 changed files with 308 additions and 522 deletions.
2 changes: 2 additions & 0 deletions changelogs/fragments/7870.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
fix:
- Clean up language search interceptors and fix aggs for PPL ([#7870](https://github.com/opensearch-project/OpenSearch-Dashboards/pull/7870))
253 changes: 23 additions & 230 deletions src/plugins/data/common/data_frames/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import datemath from '@opensearch/datemath';
import {
DATA_FRAME_TYPES,
DataFrameAggConfig,
DataFrameBucketAgg,
IDataFrame,
IDataFrameWithAggs,
IDataFrameResponse,
Expand All @@ -17,139 +16,7 @@ import {
} from './types';
import { IFieldType } from './fields';
import { IndexPatternFieldMap, IndexPatternSpec } from '../index_patterns';
import { IOpenSearchDashboardsSearchRequest } from '../search';
import { GetAggTypeFn, GetDataFrameAggQsFn, Query, TimeRange } from '../types';

/**
* Returns the raw data frame from the search request.
*
* @param searchRequest - search request object.
* @returns dataframe
*/
export const getRawDataFrame = (searchRequest: IOpenSearchDashboardsSearchRequest) => {
return searchRequest.params?.body?.df;
};

/**
* Returns the raw query string from the search request.
* Gets current state query if exists, otherwise gets the initial query.
*
* @param searchRequest - search request object
* @returns query string
*/
export const getRawQueryString = (
searchRequest: IOpenSearchDashboardsSearchRequest
): string | undefined => {
return (
searchRequest.params?.body?.query?.queries[1]?.query ??
searchRequest.params?.body?.query?.queries[0]?.query
);
};

/**
* Returns the raw aggregations from the search request.
*
* @param searchRequest - search request object
* @returns aggregations
*/
export const getRawAggs = (searchRequest: IOpenSearchDashboardsSearchRequest) => {
return searchRequest.params?.body?.aggs;
};

/**
* Returns the unique values for raw aggregations. This is used
* with `other-filter` aggregation. To get the values that were not
* included in the aggregation response prior to this request.
*
* @param rawAggs - raw aggregations object
* @returns object containing the field and its unique values
*/
export const getUniqueValuesForRawAggs = (rawAggs: Record<string, any>) => {
const filters = rawAggs.filters?.filters?.['']?.bool?.must_not;
if (!filters || !Array.isArray(filters)) {
return null;
}
const values: unknown[] = [];
let field: string | undefined;

filters.forEach((agg: any) => {
Object.values(agg).forEach((aggValue) => {
Object.entries(aggValue as Record<string, any>).forEach(([key, value]) => {
field = key;
values.push(value);
});
});
});

return { field, values };
};

/**
* Returns the aggregation configuration for raw aggregations.
* Aggregations are nested objects, so this function recursively
* builds an object that is easier to work with.
*
* @param rawAggs - raw aggregations object
* @returns aggregation configuration
*/
export const getAggConfigForRawAggs = (rawAggs: Record<string, any>): DataFrameAggConfig | null => {
const aggConfig: DataFrameAggConfig = { id: '', type: '' };

Object.entries(rawAggs).forEach(([aggKey, agg]) => {
aggConfig.id = aggKey;
Object.entries(agg as Record<string, unknown>).forEach(([name, value]) => {
if (name === 'aggs') {
aggConfig.aggs = {};
Object.entries(value as Record<string, unknown>).forEach(([subAggKey, subRawAgg]) => {
const subAgg = getAggConfigForRawAggs(subRawAgg as Record<string, any>);
if (subAgg) {
aggConfig.aggs![subAgg.id] = { ...subAgg, id: subAggKey };
}
});
} else {
aggConfig.type = name;
Object.assign(aggConfig, { [name]: value });
}
});
});

return aggConfig;
};

/**
* Returns the aggregation configuration.
*
* @param searchRequest - search request object
* @param aggConfig - aggregation configuration object
* @param getAggTypeFn - function to get the aggregation type from the aggsService
* @returns aggregation configuration
*/
export const getAggConfig = (
searchRequest: IOpenSearchDashboardsSearchRequest,
aggConfig: Partial<DataFrameAggConfig> = {},
getAggTypeFn: GetAggTypeFn
): DataFrameAggConfig => {
const rawAggs = getRawAggs(searchRequest);
Object.entries(rawAggs).forEach(([aggKey, agg]) => {
aggConfig.id = aggKey;
Object.entries(agg as Record<string, unknown>).forEach(([name, value]) => {
if (name === 'aggs' && value) {
aggConfig.aggs = {};
Object.entries(value as Record<string, unknown>).forEach(([subAggKey, subRawAgg]) => {
const subAgg = getAggConfigForRawAggs(subRawAgg as Record<string, any>);
if (subAgg) {
aggConfig.aggs![subAgg.id] = { ...subAgg, id: subAggKey };
}
});
} else {
aggConfig.type = getAggTypeFn(name)?.type ?? name;
Object.assign(aggConfig, { [name]: value });
}
});
});

return aggConfig as DataFrameAggConfig;
};
import { TimeRange } from '../types';

/**
* Converts the data frame response to a search response.
Expand Down Expand Up @@ -199,61 +66,35 @@ export const convertResult = (response: IDataFrameResponse): SearchResponse<any>
if (data.hasOwnProperty('aggs')) {
const dataWithAggs = data as IDataFrameWithAggs;
if (!dataWithAggs.aggs) {
// TODO: MQL best guess, get timestamp field and caculate it here
return searchResponse;
}
searchResponse.aggregations = Object.entries(dataWithAggs.aggs).reduce(
(acc: Record<string, unknown>, [id, value]) => {
const aggConfig = dataWithAggs.meta?.aggs;
if (id === 'other-filter') {
const buckets = value as DataFrameBucketAgg[];
buckets.forEach((bucket) => {
const bucketValue = bucket.value;
searchResponse.hits.total += bucketValue;
});
acc[id] = {
buckets: [{ '': { doc_count: 0 } }],
};
return acc;
}
if (aggConfig && aggConfig.type === 'buckets') {
const buckets = value as DataFrameBucketAgg[];
acc[id] = {
buckets: buckets.map((bucket) => {
const bucketValue = bucket.value;
searchResponse.hits.total += bucketValue;
return {
key_as_string: bucket.key,
key: (aggConfig as DataFrameAggConfig).date_histogram
? new Date(bucket.key).getTime()
: bucket.key,
doc_count: bucketValue,
};
}),
};
return acc;
}
acc[id] = Array.isArray(value) ? value[0] : value;
return acc;
},
{}
);
searchResponse.aggregations = {};

const aggConfig = dataWithAggs.meta;
Object.entries(dataWithAggs.aggs).forEach(([id, value]) => {
if (aggConfig && aggConfig.date_histogram) {
const buckets = value as Array<{ key: string; value: number }>;
searchResponse.aggregations[id] = {
buckets: buckets.map((bucket) => {
const timestamp = new Date(bucket.key).getTime();
searchResponse.hits.total += bucket.value;
return {
key_as_string: bucket.key,
key: timestamp,
doc_count: bucket.value,
};
}),
};
} else {
// Handle other aggregation types here if needed
searchResponse.aggregations[id] = value;
}
});
}

return searchResponse;
};

/**
* Formats the field value.
*
* @param field - field object
* @param value - value to format
* @returns formatted value
*/
export const formatFieldValue = (field: IFieldType | Partial<IFieldType>, value: any): any => {
return field.format && field.format.convert ? field.format.convert(value) : value;
};

/**
* Returns the field type. This function is used to determine the field type so that can
* be used by the rest of the application. The field type must map to a OsdFieldType
Expand Down Expand Up @@ -372,54 +213,6 @@ export const createDataFrame = (partial: PartialDataFrame): IDataFrame | IDataFr
};
};

/**
* Updates the data frame metadata. Metadata is used to store the aggregation configuration.
* It also stores the query string used to fetch the data frame aggregations.
*
* @param params - { dataFrame, qs, aggConfig, timeField, timeFilter, getAggQsFn }
*/
export const updateDataFrameMeta = ({
dataFrame,
query,
aggConfig,
timeField,
timeFilter,
getAggQsFn,
}: {
dataFrame: IDataFrame;
query: Query;
aggConfig: DataFrameAggConfig;
timeField: any;
timeFilter: string;
getAggQsFn: GetDataFrameAggQsFn;
}) => {
dataFrame.meta = {
...dataFrame?.meta,
aggs: aggConfig,
aggsQs: {
[aggConfig.id]: getAggQsFn({
query,
aggConfig,
timeField,
timeFilter,
}),
},
};

if (aggConfig.aggs) {
const subAggs = aggConfig.aggs as Record<string, DataFrameAggConfig>;
for (const [key, subAgg] of Object.entries(subAggs)) {
const subAggConfig: Record<string, any> = { [key]: subAgg };
dataFrame.meta.aggsQs[subAgg.id] = getAggQsFn({
query,
aggConfig: subAggConfig as DataFrameAggConfig,
timeField,
timeFilter,
});
}
}
};

/**
* Converts a data frame to index pattern spec which can be used to create an index pattern.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
Dataset,
IIndexPattern,
DATA_STRUCTURE_META_TYPES,
DataStructureCustomMeta,
} from '../../../../../common';
import { DatasetTypeConfig } from '../types';
import { getIndexPatterns } from '../../../../services';
Expand All @@ -27,10 +28,12 @@ export const indexPatternTypeConfig: DatasetTypeConfig = {

toDataset: (path) => {
const pattern = path[path.length - 1];
const patternMeta = pattern.meta as DataStructureCustomMeta;
return {
id: pattern.id,
title: pattern.title,
type: DEFAULT_DATA.SET_TYPES.INDEX_PATTERN,
timeFieldName: patternMeta?.timeFieldName,
dataSource: pattern.parent
? {
id: pattern.parent.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@

import { SavedObjectsClientContract } from 'opensearch-dashboards/public';
import { map } from 'rxjs/operators';
import { DEFAULT_DATA, DataStructure, Dataset } from '../../../../../common';
import {
DEFAULT_DATA,
DataStructure,
DataStructureCustomMeta,
Dataset,
} from '../../../../../common';
import { DatasetTypeConfig } from '../types';
import { getSearchService, getIndexPatterns } from '../../../../services';
import { injectMetaToDataStructures } from './utils';
Expand All @@ -29,11 +34,13 @@ export const indexTypeConfig: DatasetTypeConfig = {
toDataset: (path) => {
const index = path[path.length - 1];
const dataSource = path.find((ds) => ds.type === 'DATA_SOURCE');
const indexMeta = index.meta as DataStructureCustomMeta;

return {
id: index.id,
title: index.title,
type: DEFAULT_DATA.SET_TYPES.INDEX,
timeFieldName: indexMeta?.timeFieldName,
dataSource: dataSource
? {
id: dataSource.id,
Expand Down
4 changes: 4 additions & 0 deletions src/plugins/query_enhancements/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ export const PLUGIN_NAME = 'queryEnhancements';

export const BASE_API = '/api/enhancements';

export const DATASET = {
S3: 'S3',
};

export const SEARCH_STRATEGY = {
PPL: 'ppl',
PPL_RAW: 'pplraw',
Expand Down
13 changes: 12 additions & 1 deletion src/plugins/query_enhancements/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,18 @@
import { CoreSetup } from 'opensearch-dashboards/public';
import { Observable } from 'rxjs';

export interface FetchDataFrameContext {
export interface QueryAggConfig {
[key: string]: {
field?: string;
fixed_interval?: string;
calendar_interval?: string;
min_doc_count?: number;
time_zone?: string;
[x: number]: string;
};
}

export interface EnhancedFetchContext {
http: CoreSetup['http'];
path: string;
signal?: AbortSignal;
Expand Down
Loading

0 comments on commit 9b8266f

Please sign in to comment.