From 9b8266f178270e0e93aa82fa1e2f9c8c15737238 Mon Sep 17 00:00:00 2001 From: Kawika Avilla <kavilla414@gmail.com> Date: Wed, 28 Aug 2024 16:11:32 -0700 Subject: [PATCH] [discover] registered languages interceptor clean up and aggs (#7870) Clean up interceptor and strategies. Re-add aggregations working for PPL language Signed-off-by: Kawika Avilla <kavilla414@gmail.com> --- changelogs/fragments/7870.yml | 2 + src/plugins/data/common/data_frames/utils.ts | 253 ++---------------- .../dataset_service/lib/index_pattern_type.ts | 3 + .../dataset_service/lib/index_type.ts | 9 +- .../query_enhancements/common/constants.ts | 4 + .../query_enhancements/common/types.ts | 13 +- .../query_enhancements/common/utils.ts | 19 +- .../public/datasets/s3_handler.ts | 12 +- .../public/search/ppl_search_interceptor.ts | 168 +++++------- .../public/search/sql_search_interceptor.ts | 146 +++------- .../query_enhancements/server/routes/index.ts | 45 ++++ .../server/search/ppl_search_strategy.ts | 46 +--- .../server/search/sql_search_strategy.test.ts | 81 +++++- .../server/search/sql_search_strategy.ts | 29 +- 14 files changed, 308 insertions(+), 522 deletions(-) create mode 100644 changelogs/fragments/7870.yml diff --git a/changelogs/fragments/7870.yml b/changelogs/fragments/7870.yml new file mode 100644 index 000000000000..cb0c33dfcb39 --- /dev/null +++ b/changelogs/fragments/7870.yml @@ -0,0 +1,2 @@ +fix: +- Clean up language search interceptors and fix aggs for PPL ([#7870](https://github.com/opensearch-project/OpenSearch-Dashboards/pull/7870)) \ No newline at end of file diff --git a/src/plugins/data/common/data_frames/utils.ts b/src/plugins/data/common/data_frames/utils.ts index 8c9f63b0f0d3..fdee757bfabb 100644 --- a/src/plugins/data/common/data_frames/utils.ts +++ b/src/plugins/data/common/data_frames/utils.ts @@ -8,7 +8,6 @@ import datemath from '@opensearch/datemath'; import { DATA_FRAME_TYPES, DataFrameAggConfig, - DataFrameBucketAgg, IDataFrame, IDataFrameWithAggs, IDataFrameResponse, @@ -17,139 +16,7 @@ import { } from './types'; import { IFieldType } from './fields'; import { IndexPatternFieldMap, IndexPatternSpec } from '../index_patterns'; -import { IOpenSearchDashboardsSearchRequest } from '../search'; -import { GetAggTypeFn, GetDataFrameAggQsFn, Query, TimeRange } from '../types'; - -/** - * Returns the raw data frame from the search request. - * - * @param searchRequest - search request object. - * @returns dataframe - */ -export const getRawDataFrame = (searchRequest: IOpenSearchDashboardsSearchRequest) => { - return searchRequest.params?.body?.df; -}; - -/** - * Returns the raw query string from the search request. - * Gets current state query if exists, otherwise gets the initial query. - * - * @param searchRequest - search request object - * @returns query string - */ -export const getRawQueryString = ( - searchRequest: IOpenSearchDashboardsSearchRequest -): string | undefined => { - return ( - searchRequest.params?.body?.query?.queries[1]?.query ?? - searchRequest.params?.body?.query?.queries[0]?.query - ); -}; - -/** - * Returns the raw aggregations from the search request. - * - * @param searchRequest - search request object - * @returns aggregations - */ -export const getRawAggs = (searchRequest: IOpenSearchDashboardsSearchRequest) => { - return searchRequest.params?.body?.aggs; -}; - -/** - * Returns the unique values for raw aggregations. This is used - * with `other-filter` aggregation. To get the values that were not - * included in the aggregation response prior to this request. - * - * @param rawAggs - raw aggregations object - * @returns object containing the field and its unique values - */ -export const getUniqueValuesForRawAggs = (rawAggs: Record<string, any>) => { - const filters = rawAggs.filters?.filters?.['']?.bool?.must_not; - if (!filters || !Array.isArray(filters)) { - return null; - } - const values: unknown[] = []; - let field: string | undefined; - - filters.forEach((agg: any) => { - Object.values(agg).forEach((aggValue) => { - Object.entries(aggValue as Record<string, any>).forEach(([key, value]) => { - field = key; - values.push(value); - }); - }); - }); - - return { field, values }; -}; - -/** - * Returns the aggregation configuration for raw aggregations. - * Aggregations are nested objects, so this function recursively - * builds an object that is easier to work with. - * - * @param rawAggs - raw aggregations object - * @returns aggregation configuration - */ -export const getAggConfigForRawAggs = (rawAggs: Record<string, any>): DataFrameAggConfig | null => { - const aggConfig: DataFrameAggConfig = { id: '', type: '' }; - - Object.entries(rawAggs).forEach(([aggKey, agg]) => { - aggConfig.id = aggKey; - Object.entries(agg as Record<string, unknown>).forEach(([name, value]) => { - if (name === 'aggs') { - aggConfig.aggs = {}; - Object.entries(value as Record<string, unknown>).forEach(([subAggKey, subRawAgg]) => { - const subAgg = getAggConfigForRawAggs(subRawAgg as Record<string, any>); - if (subAgg) { - aggConfig.aggs![subAgg.id] = { ...subAgg, id: subAggKey }; - } - }); - } else { - aggConfig.type = name; - Object.assign(aggConfig, { [name]: value }); - } - }); - }); - - return aggConfig; -}; - -/** - * Returns the aggregation configuration. - * - * @param searchRequest - search request object - * @param aggConfig - aggregation configuration object - * @param getAggTypeFn - function to get the aggregation type from the aggsService - * @returns aggregation configuration - */ -export const getAggConfig = ( - searchRequest: IOpenSearchDashboardsSearchRequest, - aggConfig: Partial<DataFrameAggConfig> = {}, - getAggTypeFn: GetAggTypeFn -): DataFrameAggConfig => { - const rawAggs = getRawAggs(searchRequest); - Object.entries(rawAggs).forEach(([aggKey, agg]) => { - aggConfig.id = aggKey; - Object.entries(agg as Record<string, unknown>).forEach(([name, value]) => { - if (name === 'aggs' && value) { - aggConfig.aggs = {}; - Object.entries(value as Record<string, unknown>).forEach(([subAggKey, subRawAgg]) => { - const subAgg = getAggConfigForRawAggs(subRawAgg as Record<string, any>); - if (subAgg) { - aggConfig.aggs![subAgg.id] = { ...subAgg, id: subAggKey }; - } - }); - } else { - aggConfig.type = getAggTypeFn(name)?.type ?? name; - Object.assign(aggConfig, { [name]: value }); - } - }); - }); - - return aggConfig as DataFrameAggConfig; -}; +import { TimeRange } from '../types'; /** * Converts the data frame response to a search response. @@ -199,61 +66,35 @@ export const convertResult = (response: IDataFrameResponse): SearchResponse<any> if (data.hasOwnProperty('aggs')) { const dataWithAggs = data as IDataFrameWithAggs; if (!dataWithAggs.aggs) { - // TODO: MQL best guess, get timestamp field and caculate it here return searchResponse; } - searchResponse.aggregations = Object.entries(dataWithAggs.aggs).reduce( - (acc: Record<string, unknown>, [id, value]) => { - const aggConfig = dataWithAggs.meta?.aggs; - if (id === 'other-filter') { - const buckets = value as DataFrameBucketAgg[]; - buckets.forEach((bucket) => { - const bucketValue = bucket.value; - searchResponse.hits.total += bucketValue; - }); - acc[id] = { - buckets: [{ '': { doc_count: 0 } }], - }; - return acc; - } - if (aggConfig && aggConfig.type === 'buckets') { - const buckets = value as DataFrameBucketAgg[]; - acc[id] = { - buckets: buckets.map((bucket) => { - const bucketValue = bucket.value; - searchResponse.hits.total += bucketValue; - return { - key_as_string: bucket.key, - key: (aggConfig as DataFrameAggConfig).date_histogram - ? new Date(bucket.key).getTime() - : bucket.key, - doc_count: bucketValue, - }; - }), - }; - return acc; - } - acc[id] = Array.isArray(value) ? value[0] : value; - return acc; - }, - {} - ); + searchResponse.aggregations = {}; + + const aggConfig = dataWithAggs.meta; + Object.entries(dataWithAggs.aggs).forEach(([id, value]) => { + if (aggConfig && aggConfig.date_histogram) { + const buckets = value as Array<{ key: string; value: number }>; + searchResponse.aggregations[id] = { + buckets: buckets.map((bucket) => { + const timestamp = new Date(bucket.key).getTime(); + searchResponse.hits.total += bucket.value; + return { + key_as_string: bucket.key, + key: timestamp, + doc_count: bucket.value, + }; + }), + }; + } else { + // Handle other aggregation types here if needed + searchResponse.aggregations[id] = value; + } + }); } return searchResponse; }; -/** - * Formats the field value. - * - * @param field - field object - * @param value - value to format - * @returns formatted value - */ -export const formatFieldValue = (field: IFieldType | Partial<IFieldType>, value: any): any => { - return field.format && field.format.convert ? field.format.convert(value) : value; -}; - /** * Returns the field type. This function is used to determine the field type so that can * be used by the rest of the application. The field type must map to a OsdFieldType @@ -372,54 +213,6 @@ export const createDataFrame = (partial: PartialDataFrame): IDataFrame | IDataFr }; }; -/** - * Updates the data frame metadata. Metadata is used to store the aggregation configuration. - * It also stores the query string used to fetch the data frame aggregations. - * - * @param params - { dataFrame, qs, aggConfig, timeField, timeFilter, getAggQsFn } - */ -export const updateDataFrameMeta = ({ - dataFrame, - query, - aggConfig, - timeField, - timeFilter, - getAggQsFn, -}: { - dataFrame: IDataFrame; - query: Query; - aggConfig: DataFrameAggConfig; - timeField: any; - timeFilter: string; - getAggQsFn: GetDataFrameAggQsFn; -}) => { - dataFrame.meta = { - ...dataFrame?.meta, - aggs: aggConfig, - aggsQs: { - [aggConfig.id]: getAggQsFn({ - query, - aggConfig, - timeField, - timeFilter, - }), - }, - }; - - if (aggConfig.aggs) { - const subAggs = aggConfig.aggs as Record<string, DataFrameAggConfig>; - for (const [key, subAgg] of Object.entries(subAggs)) { - const subAggConfig: Record<string, any> = { [key]: subAgg }; - dataFrame.meta.aggsQs[subAgg.id] = getAggQsFn({ - query, - aggConfig: subAggConfig as DataFrameAggConfig, - timeField, - timeFilter, - }); - } - } -}; - /** * Converts a data frame to index pattern spec which can be used to create an index pattern. * diff --git a/src/plugins/data/public/query/query_string/dataset_service/lib/index_pattern_type.ts b/src/plugins/data/public/query/query_string/dataset_service/lib/index_pattern_type.ts index e549a5abdf3d..1034850c41ba 100644 --- a/src/plugins/data/public/query/query_string/dataset_service/lib/index_pattern_type.ts +++ b/src/plugins/data/public/query/query_string/dataset_service/lib/index_pattern_type.ts @@ -12,6 +12,7 @@ import { Dataset, IIndexPattern, DATA_STRUCTURE_META_TYPES, + DataStructureCustomMeta, } from '../../../../../common'; import { DatasetTypeConfig } from '../types'; import { getIndexPatterns } from '../../../../services'; @@ -27,10 +28,12 @@ export const indexPatternTypeConfig: DatasetTypeConfig = { toDataset: (path) => { const pattern = path[path.length - 1]; + const patternMeta = pattern.meta as DataStructureCustomMeta; return { id: pattern.id, title: pattern.title, type: DEFAULT_DATA.SET_TYPES.INDEX_PATTERN, + timeFieldName: patternMeta?.timeFieldName, dataSource: pattern.parent ? { id: pattern.parent.id, diff --git a/src/plugins/data/public/query/query_string/dataset_service/lib/index_type.ts b/src/plugins/data/public/query/query_string/dataset_service/lib/index_type.ts index 6dfcdc46df49..fe18afbd2e76 100644 --- a/src/plugins/data/public/query/query_string/dataset_service/lib/index_type.ts +++ b/src/plugins/data/public/query/query_string/dataset_service/lib/index_type.ts @@ -5,7 +5,12 @@ import { SavedObjectsClientContract } from 'opensearch-dashboards/public'; import { map } from 'rxjs/operators'; -import { DEFAULT_DATA, DataStructure, Dataset } from '../../../../../common'; +import { + DEFAULT_DATA, + DataStructure, + DataStructureCustomMeta, + Dataset, +} from '../../../../../common'; import { DatasetTypeConfig } from '../types'; import { getSearchService, getIndexPatterns } from '../../../../services'; import { injectMetaToDataStructures } from './utils'; @@ -29,11 +34,13 @@ export const indexTypeConfig: DatasetTypeConfig = { toDataset: (path) => { const index = path[path.length - 1]; const dataSource = path.find((ds) => ds.type === 'DATA_SOURCE'); + const indexMeta = index.meta as DataStructureCustomMeta; return { id: index.id, title: index.title, type: DEFAULT_DATA.SET_TYPES.INDEX, + timeFieldName: indexMeta?.timeFieldName, dataSource: dataSource ? { id: dataSource.id, diff --git a/src/plugins/query_enhancements/common/constants.ts b/src/plugins/query_enhancements/common/constants.ts index 8e50d82db789..99a3652bd82a 100644 --- a/src/plugins/query_enhancements/common/constants.ts +++ b/src/plugins/query_enhancements/common/constants.ts @@ -8,6 +8,10 @@ export const PLUGIN_NAME = 'queryEnhancements'; export const BASE_API = '/api/enhancements'; +export const DATASET = { + S3: 'S3', +}; + export const SEARCH_STRATEGY = { PPL: 'ppl', PPL_RAW: 'pplraw', diff --git a/src/plugins/query_enhancements/common/types.ts b/src/plugins/query_enhancements/common/types.ts index c98ca0284969..7efb0d9f3d13 100644 --- a/src/plugins/query_enhancements/common/types.ts +++ b/src/plugins/query_enhancements/common/types.ts @@ -6,7 +6,18 @@ import { CoreSetup } from 'opensearch-dashboards/public'; import { Observable } from 'rxjs'; -export interface FetchDataFrameContext { +export interface QueryAggConfig { + [key: string]: { + field?: string; + fixed_interval?: string; + calendar_interval?: string; + min_doc_count?: number; + time_zone?: string; + [x: number]: string; + }; +} + +export interface EnhancedFetchContext { http: CoreSetup['http']; path: string; signal?: AbortSignal; diff --git a/src/plugins/query_enhancements/common/utils.ts b/src/plugins/query_enhancements/common/utils.ts index 208256bd4dd4..bdb8740e4e48 100644 --- a/src/plugins/query_enhancements/common/utils.ts +++ b/src/plugins/query_enhancements/common/utils.ts @@ -6,7 +6,7 @@ import { IDataFrame, Query } from 'src/plugins/data/common'; import { Observable, Subscription, from, throwError, timer } from 'rxjs'; import { catchError, concatMap, last, takeWhile, tap } from 'rxjs/operators'; -import { FetchDataFrameContext, FetchFunction } from './types'; +import { EnhancedFetchContext, FetchFunction, QueryAggConfig } from './types'; export const formatDate = (dateString: string) => { const date = new Date(dateString); @@ -133,7 +133,20 @@ export const handleDataFrameError = (response: any) => { } }; -export const fetchDataFrame = (context: FetchDataFrameContext, query: Query, df: IDataFrame) => { +export const fetch = (context: EnhancedFetchContext, query: Query, aggConfig?: QueryAggConfig) => { + const { http, path, signal } = context; + const body = JSON.stringify({ query: { ...query, format: 'jdbc' }, aggConfig }); + return from( + http.fetch({ + method: 'POST', + path, + body, + signal, + }) + ).pipe(tap(handleDataFrameError)); +}; + +export const fetchDataFrame = (context: EnhancedFetchContext, query: Query, df: IDataFrame) => { const { http, path, signal } = context; const body = JSON.stringify({ query: { ...query, format: 'jdbc' }, df }); return from( @@ -146,7 +159,7 @@ export const fetchDataFrame = (context: FetchDataFrameContext, query: Query, df: ).pipe(tap(handleDataFrameError)); }; -export const fetchDataFramePolling = (context: FetchDataFrameContext, df: IDataFrame) => { +export const fetchDataFramePolling = (context: EnhancedFetchContext, df: IDataFrame) => { const { http, path, signal } = context; const queryId = df.meta?.queryId; const dataSourceId = df.meta?.queryConfig?.dataSourceId; diff --git a/src/plugins/query_enhancements/public/datasets/s3_handler.ts b/src/plugins/query_enhancements/public/datasets/s3_handler.ts index ce18662146bc..2613f016143f 100644 --- a/src/plugins/query_enhancements/public/datasets/s3_handler.ts +++ b/src/plugins/query_enhancements/public/datasets/s3_handler.ts @@ -6,13 +6,13 @@ import { SavedObjectsClientContract } from 'opensearch-dashboards/public'; import { DataStructure, Dataset, DatasetField } from 'src/plugins/data/common'; import { DatasetTypeConfig } from 'src/plugins/data/public'; +import { DATASET } from '../../common'; const S3_ICON = 'visTable'; -const S3_ID = 'S3'; export const s3TypeConfig: DatasetTypeConfig = { - id: S3_ID, - title: S3_ID, + id: DATASET.S3, + title: DATASET.S3, meta: { icon: { type: S3_ICON }, tooltip: 'S3 Data Source', @@ -20,12 +20,12 @@ export const s3TypeConfig: DatasetTypeConfig = { toDataset: (path: DataStructure[]): Dataset => { const s3 = path[path.length - 1]; - const dataSource = path.find((ds) => ds.type === S3_ID); + const dataSource = path.find((ds) => ds.type === DATASET.S3); return { id: s3.id, title: s3.title, - type: S3_ID, + type: DATASET.S3, dataSource: dataSource ? { id: dataSource.id, @@ -42,7 +42,7 @@ export const s3TypeConfig: DatasetTypeConfig = { ): Promise<DataStructure> => { const dataStructure = path[path.length - 1]; switch (dataStructure.type) { - case S3_ID: + case DATASET.S3: return { ...dataStructure, columnHeader: 'Connections', diff --git a/src/plugins/query_enhancements/public/search/ppl_search_interceptor.ts b/src/plugins/query_enhancements/public/search/ppl_search_interceptor.ts index 3a5edd67ca7f..4618945a35f7 100644 --- a/src/plugins/query_enhancements/public/search/ppl_search_interceptor.ts +++ b/src/plugins/query_enhancements/public/search/ppl_search_interceptor.ts @@ -6,14 +6,7 @@ import { trimEnd } from 'lodash'; import { Observable, throwError } from 'rxjs'; import { catchError } from 'rxjs/operators'; -import { - DataFrameAggConfig, - getRawDataFrame, - formatTimePickerDate, - getUniqueValuesForRawAggs, - updateDataFrameMeta, - Query, -} from '../../../data/common'; +import { formatTimePickerDate, Query } from '../../../data/common'; import { DataPublicPluginStart, IOpenSearchDashboardsSearchRequest, @@ -25,10 +18,10 @@ import { import { formatDate, SEARCH_STRATEGY, - removeKeyword, API, - FetchDataFrameContext, - fetchDataFrame, + EnhancedFetchContext, + fetch, + QueryAggConfig, } from '../../common'; import { QueryEnhancementsPluginStartDependencies } from '../types'; @@ -51,109 +44,15 @@ export class PPLSearchInterceptor extends SearchInterceptor { strategy?: string ): Observable<IOpenSearchDashboardsSearchResponse> { const { id, ...searchRequest } = request; - const dfContext: FetchDataFrameContext = { + const context: EnhancedFetchContext = { http: this.deps.http, path: trimEnd(API.PPL_SEARCH), signal, }; - const { timefilter } = this.queryService; - const dateRange = timefilter.timefilter.getTime(); - const { fromDate, toDate } = formatTimePickerDate(dateRange, 'YYYY-MM-DD HH:mm:ss.SSS'); - - const getTimeFilter = (timeField: any) => { - return ` | where \`${timeField}\` >= '${formatDate( - fromDate - )}' and \`${timeField}\` <= '${formatDate(toDate)}'`; - }; - - const insertTimeFilter = (query: string, filter: string) => { - return `${query}${filter}`; - }; - - const getAggQsFn = ({ - query, - aggConfig, - timeField, - timeFilter, - }: { - query: Query; - aggConfig: DataFrameAggConfig; - timeField: any; - timeFilter: string; - }) => { - return removeKeyword(`${query.query} ${getAggString(timeField, aggConfig)} ${timeFilter}`); - }; - - const getAggString = (timeField: any, aggsConfig?: DataFrameAggConfig) => { - if (!aggsConfig) { - return ` | stats count() by span(${timeField}, ${this.aggsService.calculateAutoTimeExpression( - { - from: fromDate, - to: toDate, - mode: 'absolute', - } - )})`; - } - if (aggsConfig.date_histogram) { - return ` | stats count() by span(${timeField}, ${ - aggsConfig.date_histogram.fixed_interval ?? - aggsConfig.date_histogram.calendar_interval ?? - this.aggsService.calculateAutoTimeExpression({ - from: fromDate, - to: toDate, - mode: 'absolute', - }) - })`; - } - if (aggsConfig.avg) { - return ` | stats avg(${aggsConfig.avg.field})`; - } - if (aggsConfig.cardinality) { - return ` | dedup ${aggsConfig.cardinality.field} | stats count()`; - } - if (aggsConfig.terms) { - return ` | stats count() by ${aggsConfig.terms.field}`; - } - if (aggsConfig.id === 'other-filter') { - const uniqueConfig = getUniqueValuesForRawAggs(aggsConfig); - if ( - !uniqueConfig || - !uniqueConfig.field || - !uniqueConfig.values || - uniqueConfig.values.length === 0 - ) { - return ''; - } - - let otherQueryString = ` | stats count() by ${uniqueConfig.field}`; - uniqueConfig.values.forEach((value, index) => { - otherQueryString += ` ${index === 0 ? '| where' : 'and'} ${ - uniqueConfig.field - }<>'${value}'`; - }); - return otherQueryString; - } - }; - const dataFrame = getRawDataFrame(searchRequest); - const query = this.queryService.queryString.getQuery(); - const timeField = query.dataset?.timeFieldName; - const aggConfig = dataFrame?.meta?.aggConfig; - if (timeField && aggConfig) { - const timeFilter = getTimeFilter(timeField); - const newQuery = insertTimeFilter(query.query as string, timeFilter); - updateDataFrameMeta({ - dataFrame, - query: { ...query, query: newQuery }, - aggConfig: dataFrame?.meta?.aggConfig, - timeField, - timeFilter, - getAggQsFn: getAggQsFn.bind(this), - }); - query.query += timeFilter; - } + const query = this.buildQuery(); - return fetchDataFrame(dfContext, query, dataFrame).pipe( + return fetch(context, query, this.getAggConfig(searchRequest, query)).pipe( catchError((error) => { return throwError(error); }) @@ -163,4 +62,57 @@ export class PPLSearchInterceptor extends SearchInterceptor { public search(request: IOpenSearchDashboardsSearchRequest, options: ISearchOptions) { return this.runSearch(request, options.abortSignal, SEARCH_STRATEGY.PPL); } + + private buildQuery() { + const query: Query = this.queryService.queryString.getQuery(); + const dataset = query.dataset; + if (!dataset || !dataset.timeFieldName) return query; + const timeFilter = this.getTimeFilter(dataset.timeFieldName); + return { ...query, query: query.query + timeFilter }; + } + + private getAggConfig(request: IOpenSearchDashboardsSearchRequest, query: Query) { + const { aggs } = request.params.body; + if (!aggs || !query.dataset || !query.dataset.timeFieldName) return; + const aggsConfig: QueryAggConfig = {}; + const { fromDate, toDate } = formatTimePickerDate( + this.queryService.timefilter.timefilter.getTime(), + 'YYYY-MM-DD HH:mm:ss.SSS' + ); + Object.entries(aggs as Record<number, any>).forEach(([key, value]) => { + const aggTypeKeys = Object.keys(value); + if (aggTypeKeys.length === 0) { + return aggsConfig; + } + const aggTypeKey = aggTypeKeys[0]; + if (aggTypeKey === 'date_histogram') { + aggsConfig[aggTypeKey] = { + ...value[aggTypeKey], + }; + aggsConfig.qs = { + [key]: `${query.query} | stats count() by span(${query.dataset!.timeFieldName}, ${ + value[aggTypeKey].fixed_interval ?? + value[aggTypeKey].calendar_interval ?? + this.aggsService.calculateAutoTimeExpression({ + from: fromDate, + to: toDate, + mode: 'absolute', + }) + })`, + }; + } + }); + + return aggsConfig; + } + + private getTimeFilter(timeFieldName: string) { + const { fromDate, toDate } = formatTimePickerDate( + this.queryService.timefilter.timefilter.getTime(), + 'YYYY-MM-DD HH:mm:ss.SSS' + ); + return ` | where \`${timeFieldName}\` >= '${formatDate( + fromDate + )}' and \`${timeFieldName}\` <= '${formatDate(toDate)}'`; + } } diff --git a/src/plugins/query_enhancements/public/search/sql_search_interceptor.ts b/src/plugins/query_enhancements/public/search/sql_search_interceptor.ts index 3aba5cca7fad..69e4e14af562 100644 --- a/src/plugins/query_enhancements/public/search/sql_search_interceptor.ts +++ b/src/plugins/query_enhancements/public/search/sql_search_interceptor.ts @@ -5,9 +5,8 @@ import { trimEnd } from 'lodash'; import { Observable, throwError } from 'rxjs'; -import { i18n } from '@osd/i18n'; -import { concatMap, map } from 'rxjs/operators'; -import { DATA_FRAME_TYPES, getRawDataFrame } from '../../../data/common'; +import { catchError } from 'rxjs/operators'; +import { Query } from '../../../data/common'; import { DataPublicPluginStart, IOpenSearchDashboardsSearchRequest, @@ -16,28 +15,17 @@ import { SearchInterceptor, SearchInterceptorDeps, } from '../../../data/public'; -import { - API, - DataFramePolling, - FetchDataFrameContext, - SEARCH_STRATEGY, - fetchDataFrame, - fetchDataFramePolling, -} from '../../common'; +import { API, DATASET, EnhancedFetchContext, SEARCH_STRATEGY, fetch } from '../../common'; import { QueryEnhancementsPluginStartDependencies } from '../types'; export class SQLSearchInterceptor extends SearchInterceptor { protected queryService!: DataPublicPluginStart['query']; - protected aggsService!: DataPublicPluginStart['search']['aggs']; - protected uiService!: DataPublicPluginStart['ui']; constructor(deps: SearchInterceptorDeps) { super(deps); deps.startServices.then(([coreStart, depsStart]) => { this.queryService = (depsStart as QueryEnhancementsPluginStartDependencies).data.query; - this.aggsService = (depsStart as QueryEnhancementsPluginStartDependencies).data.search.aggs; - this.uiService = (depsStart as QueryEnhancementsPluginStartDependencies).data.ui; }); } @@ -47,115 +35,47 @@ export class SQLSearchInterceptor extends SearchInterceptor { strategy?: string ): Observable<IOpenSearchDashboardsSearchResponse> { const { id, ...searchRequest } = request; - const dfContext: FetchDataFrameContext = { + const context: EnhancedFetchContext = { http: this.deps.http, path: trimEnd(API.SQL_SEARCH), signal, }; - const dataFrame = getRawDataFrame(searchRequest); - - return fetchDataFrame(dfContext, this.queryService.queryString.getQuery(), dataFrame); - } - - protected runSearchAsync( - request: IOpenSearchDashboardsSearchRequest, - signal?: AbortSignal, - strategy?: string - ): Observable<IOpenSearchDashboardsSearchResponse> { - const { id, ...searchRequest } = request; - const path = trimEnd(API.SQL_ASYNC_SEARCH); - const dfContext: FetchDataFrameContext = { - http: this.deps.http, - path, - signal, - }; - - const dataFrame = getRawDataFrame(searchRequest); - const query = this.queryService.queryString.getQuery(); + const query = this.buildQuery(strategy); - const dataSourceRef = query.dataset - ? { - dataSourceId: query.dataset.dataSource?.id, - dataSourceName: query.dataset.dataSource?.title, - } - : {}; - - dataFrame.meta = { - ...dataFrame?.meta, - queryConfig: { - ...dataFrame?.meta.queryConfig, - ...dataSourceRef, - }, - sessionId: dataSourceRef - ? this.uiService.Settings.getUserQuerySessionId(dataSourceRef.dataSourceName!) - : {}, - }; - - const onPollingSuccess = (pollingResult: any) => { - if (pollingResult && pollingResult.body.meta.status === 'SUCCESS') { - return false; - } - if (pollingResult && pollingResult.body.meta.status === 'FAILED') { - const jsError = new Error(pollingResult.data.error.response); - this.deps.toasts.addError(jsError, { - title: i18n.translate('queryEnhancements.sqlQueryError', { - defaultMessage: 'Could not complete the SQL async query', - }), - toastMessage: pollingResult.data.error.response, - }); - return false; - } - - this.deps.toasts.addInfo({ - title: i18n.translate('queryEnhancements.sqlQueryPolling', { - defaultMessage: `Polling query job results. Status: ${pollingResult.body.meta.status}`, - }), - }); - - return true; - }; - - const onPollingError = (error: Error) => { - throw new Error(error.message); - }; - - this.deps.toasts.addInfo({ - title: i18n.translate('queryEnhancements.sqlQueryInfo', { - defaultMessage: 'Starting query job...', - }), - }); - return fetchDataFrame(dfContext, query, dataFrame).pipe( - concatMap((jobResponse) => { - const df = jobResponse.body; - if (dataSourceRef?.dataSourceName && df?.meta?.sessionId) { - this.uiService.Settings.setUserQuerySessionId( - dataSourceRef.dataSourceName, - df?.meta?.sessionId - ); - } - const dataFramePolling = new DataFramePolling<any, any>( - () => fetchDataFramePolling(dfContext, df), - 5000, - onPollingSuccess, - onPollingError - ); - return dataFramePolling.fetch().pipe( - map(() => { - const dfPolling = dataFramePolling.data; - dfPolling.type = DATA_FRAME_TYPES.DEFAULT; - return dfPolling; - }) - ); + return fetch(context, query).pipe( + catchError((error) => { + return throwError(error); }) ); } public search(request: IOpenSearchDashboardsSearchRequest, options: ISearchOptions) { const dataset = this.queryService.queryString.getQuery().dataset; - if (dataset?.type === 'S3') { - return this.runSearchAsync(request, options.abortSignal, SEARCH_STRATEGY.SQL_ASYNC); - } - return this.runSearch(request, options.abortSignal, SEARCH_STRATEGY.SQL); + const strategy = dataset?.type === DATASET.S3 ? SEARCH_STRATEGY.SQL_ASYNC : SEARCH_STRATEGY.SQL; + return this.runSearch(request, options.abortSignal, strategy); + } + + private buildQuery(strategy?: string): Query { + const query: Query = this.queryService.queryString.getQuery(); + // TODO: MQL keeping here for S3 + // const dataset = query.dataset; + + // if (strategy === SEARCH_STRATEGY.SQL_ASYNC && dataset?.dataSource) { + // const sessionId = this.queryService.queryString + // .getLanguageService() + // .getUserQuerySessionId(dataset.dataSource.title); + // if (sessionId) { + // return { + // ...query, + // meta: { + // ...query.meta, + // sessionId, + // }, + // }; + // } + // } + + return query; } } diff --git a/src/plugins/query_enhancements/server/routes/index.ts b/src/plugins/query_enhancements/server/routes/index.ts index a17c4f2294bd..a3673946114d 100644 --- a/src/plugins/query_enhancements/server/routes/index.ts +++ b/src/plugins/query_enhancements/server/routes/index.ts @@ -16,6 +16,40 @@ import { API, SEARCH_STRATEGY } from '../../common'; import { registerQueryAssistRoutes } from './query_assist'; import { registerDataSourceConnectionsRoutes } from './data_source_connection'; +/** + * Defines a route for a specific search strategy. + * + * @experimental This function is experimental and might change in future releases. + * + * @param logger - The logger instance. + * @param router - The router instance. + * @param searchStrategies - The available search strategies. + * @param searchStrategyId - The ID of the search strategy to use. + * + * @example + * API Request Body: + * ```json + * { + * "query": { + * "query": "SELECT * FROM my_index", + * "language": "sql", + * "dataset": { + * "id": "my_dataset_id", + * "title": "My Dataset" + * }, + * "format": "json" + * }, + * @experimental + * "aggConfig": { + * // Optional aggregation configuration + * }, + * @deprecated + * "df": { + * // Optional data frame configuration + * } + * } + * ``` + */ function defineRoute( logger: Logger, router: IRouter, @@ -37,6 +71,7 @@ function defineRoute( dataset: schema.nullable(schema.object({}, { unknowns: 'allow' })), format: schema.string(), }), + aggConfig: schema.nullable(schema.object({}, { unknowns: 'allow' })), df: schema.nullable(schema.object({}, { unknowns: 'allow' })), }), }, @@ -125,6 +160,16 @@ function defineRoute( ); } +/** + * Defines routes for various search strategies and registers additional routes. + * + * @experimental This function is experimental and might change in future releases. + * + * @param logger - The logger instance. + * @param router - The router instance. + * @param client - The client instance. + * @param searchStrategies - The available search strategies. + */ export function defineRoutes( logger: Logger, router: IRouter, diff --git a/src/plugins/query_enhancements/server/search/ppl_search_strategy.ts b/src/plugins/query_enhancements/server/search/ppl_search_strategy.ts index f426b2d9980f..ae2192d4155f 100644 --- a/src/plugins/query_enhancements/server/search/ppl_search_strategy.ts +++ b/src/plugins/query_enhancements/server/search/ppl_search_strategy.ts @@ -5,7 +5,7 @@ import { SharedGlobalConfig, Logger, ILegacyClusterClient } from 'opensearch-dashboards/server'; import { Observable } from 'rxjs'; -import { ISearchStrategy, getDefaultSearchParams, SearchUsage } from '../../../data/server'; +import { ISearchStrategy, SearchUsage } from '../../../data/server'; import { DATA_FRAME_TYPES, IDataFrameError, @@ -17,6 +17,7 @@ import { } from '../../../data/common'; import { getFields } from '../../common/utils'; import { Facet } from '../utils'; +import { QueryAggConfig } from '../../common'; export const pplSearchStrategyProvider = ( config$: Observable<SharedGlobalConfig>, @@ -32,43 +33,11 @@ export const pplSearchStrategyProvider = ( shimResponse: true, }); - const parseRequest = (query: string) => { - const pipeMap = new Map<string, string>(); - const pipeArray = query.split('|'); - pipeArray.forEach((pipe, index) => { - const splitChar = index === 0 ? '=' : ' '; - const split = pipe.trim().split(splitChar); - const key = split[0]; - const value = pipe.replace(index === 0 ? `${key}=` : key, '').trim(); - pipeMap.set(key, value); - }); - - const source = pipeMap.get('source'); - - const filters = pipeMap.get('where'); - - const stats = pipeMap.get('stats'); - const aggsQuery = stats - ? `source=${source} ${filters ? `| where ${filters}` : ''} | stats ${stats}` - : undefined; - - return { - aggs: aggsQuery, - }; - }; - return { search: async (context, request: any, options) => { - const uiSettingsClient = await context.core.uiSettings.client; - - const { dataFrameHydrationStrategy, ...defaultParams } = await getDefaultSearchParams( - uiSettingsClient - ); - try { const query: Query = request.body.query; - const { df } = request.body; - + const aggConfig: QueryAggConfig | undefined = request.body.aggConfig; const rawResponse: any = await pplFacet.describeQuery(context, request); if (!rawResponse.success) { @@ -82,7 +51,7 @@ export const pplSearchStrategyProvider = ( const dataFrame = createDataFrame({ name: query.dataset?.id, schema: rawResponse.data.schema, - meta: df?.meta, + meta: aggConfig, fields: getFields(rawResponse), }); @@ -90,10 +59,9 @@ export const pplSearchStrategyProvider = ( if (usage) usage.trackSuccess(rawResponse.took); - if (dataFrame?.meta?.aggsQs) { - for (const [key, aggQueryString] of Object.entries(dataFrame?.meta?.aggsQs)) { - const aggRequest = parseRequest(aggQueryString as string); - request.body.query = aggRequest.aggs; + if (aggConfig) { + for (const [key, aggQueryString] of Object.entries(aggConfig.qs)) { + request.body.query.query = aggQueryString; const rawAggs: any = await pplFacet.describeQuery(context, request); (dataFrame as IDataFrameWithAggs).aggs = {}; (dataFrame as IDataFrameWithAggs).aggs[key] = rawAggs.data.datarows?.map((hit: any) => { diff --git a/src/plugins/query_enhancements/server/search/sql_search_strategy.test.ts b/src/plugins/query_enhancements/server/search/sql_search_strategy.test.ts index 9742ec04b969..822534879040 100644 --- a/src/plugins/query_enhancements/server/search/sql_search_strategy.test.ts +++ b/src/plugins/query_enhancements/server/search/sql_search_strategy.test.ts @@ -17,8 +17,14 @@ import { IDataFrameError, IDataFrameResponse, IOpenSearchDashboardsSearchRequest, + Query, } from '../../../data/common'; import * as facet from '../utils/facet'; +import * as utils from '../../common/utils'; + +jest.mock('../../common/utils', () => ({ + getFields: jest.fn(), +})); describe('sqlSearchStrategyProvider', () => { let config$: Observable<SharedGlobalConfig>; @@ -64,12 +70,16 @@ describe('sqlSearchStrategyProvider', () => { describeQuery: jest.fn().mockResolvedValue(mockResponse), } as unknown) as facet.Facet; jest.spyOn(facet, 'Facet').mockImplementation(() => mockFacet); + (utils.getFields as jest.Mock).mockReturnValue([ + { name: 'field1', type: 'long' }, + { name: 'field2', type: 'text' }, + ]); const strategy = sqlSearchStrategyProvider(config$, logger, client, usage); const result = await strategy.search( emptyRequestHandlerContext, ({ - body: { query: { qs: 'SELECT * FROM table' }, df: { name: 'table' } }, + body: { query: { query: 'SELECT * FROM table', dataset: { id: 'test-dataset' } } }, } as unknown) as IOpenSearchDashboardsSearchRequest<unknown>, {} ); @@ -77,15 +87,19 @@ describe('sqlSearchStrategyProvider', () => { expect(result).toEqual({ type: DATA_FRAME_TYPES.DEFAULT, body: { - name: 'table', + name: 'test-dataset', fields: [ - { name: 'field1', type: 'long', values: [1, 2] }, - { name: 'field2', type: 'text', values: ['value1', 'value2'] }, + { name: 'field1', type: 'long', values: [] }, + { name: 'field2', type: 'text', values: [] }, + ], + schema: [ + { name: 'field1', type: 'long', values: [] }, + { name: 'field2', type: 'text', values: [] }, ], size: 2, }, took: 100, - } as IDataFrameResponse); + }); expect(usage.trackSuccess).toHaveBeenCalledWith(100); }); @@ -104,16 +118,16 @@ describe('sqlSearchStrategyProvider', () => { const result = await strategy.search( emptyRequestHandlerContext, ({ - body: { query: { qs: 'SELECT * FROM table' } }, + body: { query: { query: 'SELECT * FROM table' } }, } as unknown) as IOpenSearchDashboardsSearchRequest<unknown>, {} ); - expect(result).toEqual(({ + expect(result).toEqual({ type: DATA_FRAME_TYPES.ERROR, body: { error: { cause: 'Query failed' } }, took: 50, - } as unknown) as IDataFrameError); + } as IDataFrameError); }); it('should handle exceptions', async () => { @@ -128,7 +142,7 @@ describe('sqlSearchStrategyProvider', () => { strategy.search( emptyRequestHandlerContext, ({ - body: { query: { qs: 'SELECT * FROM table' } }, + body: { query: { query: 'SELECT * FROM table' } }, } as unknown) as IOpenSearchDashboardsSearchRequest<unknown>, {} ) @@ -136,4 +150,53 @@ describe('sqlSearchStrategyProvider', () => { expect(logger.error).toHaveBeenCalledWith(`sqlSearchStrategy: ${mockError.message}`); expect(usage.trackError).toHaveBeenCalled(); }); + + it('should handle empty search response', async () => { + const mockResponse = { + success: true, + data: { + schema: [ + { name: 'field1', type: 'long' }, + { name: 'field2', type: 'text' }, + ], + datarows: [], + }, + took: 10, + }; + const mockFacet = ({ + describeQuery: jest.fn().mockResolvedValue(mockResponse), + } as unknown) as facet.Facet; + jest.spyOn(facet, 'Facet').mockImplementation(() => mockFacet); + (utils.getFields as jest.Mock).mockReturnValue([ + { name: 'field1', type: 'long' }, + { name: 'field2', type: 'text' }, + ]); + + const strategy = sqlSearchStrategyProvider(config$, logger, client, usage); + const result = await strategy.search( + emptyRequestHandlerContext, + ({ + body: { query: { query: 'SELECT * FROM empty_table', dataset: { id: 'empty-dataset' } } }, + } as unknown) as IOpenSearchDashboardsSearchRequest<unknown>, + {} + ); + + expect(result).toEqual({ + type: DATA_FRAME_TYPES.DEFAULT, + body: { + name: 'empty-dataset', + fields: [ + { name: 'field1', type: 'long', values: [] }, + { name: 'field2', type: 'text', values: [] }, + ], + schema: [ + { name: 'field1', type: 'long', values: [] }, + { name: 'field2', type: 'text', values: [] }, + ], + size: 0, + }, + took: 10, + }); + expect(usage.trackSuccess).toHaveBeenCalledWith(10); + }); }); diff --git a/src/plugins/query_enhancements/server/search/sql_search_strategy.ts b/src/plugins/query_enhancements/server/search/sql_search_strategy.ts index 263757bbfabe..ffc7f0a6f0cd 100644 --- a/src/plugins/query_enhancements/server/search/sql_search_strategy.ts +++ b/src/plugins/query_enhancements/server/search/sql_search_strategy.ts @@ -11,22 +11,30 @@ import { IDataFrameError, IDataFrameResponse, IOpenSearchDashboardsSearchRequest, - PartialDataFrame, + Query, createDataFrame, } from '../../../data/common'; +import { getFields } from '../../common/utils'; import { Facet } from '../utils'; export const sqlSearchStrategyProvider = ( - _config$: Observable<SharedGlobalConfig>, + config$: Observable<SharedGlobalConfig>, logger: Logger, client: ILegacyClusterClient, usage?: SearchUsage ): ISearchStrategy<IOpenSearchDashboardsSearchRequest, IDataFrameResponse> => { - const sqlFacet = new Facet({ client, logger, endpoint: 'enhancements.sqlQuery' }); + const sqlFacet = new Facet({ + client, + logger, + endpoint: 'enhancements.sqlQuery', + useJobs: false, + shimResponse: true, + }); return { - search: async (context, request: any, _options) => { + search: async (context, request: any, options) => { try { + const query: Query = request.body.query; const rawResponse: any = await sqlFacet.describeQuery(context, request); if (!rawResponse.success) { @@ -37,16 +45,13 @@ export const sqlSearchStrategyProvider = ( } as IDataFrameError; } - const partial: PartialDataFrame = { - ...request.body.df, - fields: rawResponse.data?.schema || [], - }; - const dataFrame = createDataFrame(partial); - dataFrame.fields?.forEach((field, index) => { - field.values = rawResponse.data.datarows.map((row: any) => row[index]); + const dataFrame = createDataFrame({ + name: query.dataset?.id, + schema: rawResponse.data.schema, + fields: getFields(rawResponse), }); - dataFrame.size = rawResponse.data.datarows?.length || 0; + dataFrame.size = rawResponse.data.datarows.length; if (usage) usage.trackSuccess(rawResponse.took);