[ML] AIOps: Refactors function argument structure for Log Rate Analysis. (elastic#187669)

## Summary

Refactors the function argument structure of code used on the Kibana server
for Log Rate Analysis from individual arguments to single objects that
contain all options. The options structure looks like this:

```
{
  // "meta" args like dependencies, general callbacks etc. on the outermost level
  esClient,
  abortSignal,
  ...
  // within "arguments" we pass in the actual options necessary for the function's logic
  arguments: {
    start,
    end,
    query,
    fields,
    ...
  }
}
```
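To illustrate the pattern in isolation, here's a minimal, self-contained sketch. The names (`FetchThingsParams`, `fetchThings`) are hypothetical stand-ins, not the actual Kibana code:

```typescript
// Hypothetical sketch of the params pattern described above.
// "Meta" concerns (dependencies, callbacks) stay on the outer level,
// while the options the function's logic needs live under `arguments`.
interface FetchThingsParams {
  // meta: a stand-in for a real Elasticsearch client
  esClient: { search: (request: string) => Promise<string[]> };
  abortSignal?: AbortSignal;
  // arguments: the actual options for the function's logic
  arguments: {
    indexPattern: string;
    query: string;
  };
}

async function fetchThings(params: FetchThingsParams): Promise<string[]> {
  const { esClient, arguments: args } = params;
  const { indexPattern, query } = args;
  // A real implementation would also pass `abortSignal` through to the client call.
  return esClient.search(`${indexPattern}:${query}`);
}

// Callers name every option, so there are no positional `undefined` placeholders.
const fakeClient = { search: async (request: string) => [request] };
fetchThings({
  esClient: fakeClient,
  arguments: { indexPattern: 'logs-*', query: 'error' },
});
```

Note that `arguments` is only used as a property name and is renamed to `args` when destructured, so it never clashes with the implicit `arguments` object inside functions.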

The main benefit is that code using these functions becomes easier to read.
Instead of a strict order of arguments that sometimes included `undefined` or
a bare value whose purpose is hard to guess, this enforces that the option
names show up in the consuming code. Here's an example:

Before:

```
await fetchHistogramsForFields(
                client,
                requestBody.index,
                histogramQuery,
                [
                  {
                    fieldName: requestBody.timeFieldName,
                    type: KBN_FIELD_TYPES.DATE,
                    interval: overallTimeSeries.interval,
                    min: overallTimeSeries.stats[0],
                    max: overallTimeSeries.stats[1],
                  },
                ],
                -1,
                undefined,
                abortSignal,
                stateHandler.sampleProbability(),
                RANDOM_SAMPLER_SEED
              )
```

After:

```
                (await fetchHistogramsForFields({
                  esClient,
                  abortSignal,
                  arguments: {
                    indexPattern: requestBody.index,
                    query: histogramQuery,
                    fields: [
                      {
                        fieldName: requestBody.timeFieldName,
                        type: KBN_FIELD_TYPES.DATE,
                        interval: overallTimeSeries.interval,
                        min: overallTimeSeries.stats[0],
                        max: overallTimeSeries.stats[1],
                      },
                    ],
                    samplerShardSize: -1,
                    randomSamplerProbability: stateHandler.sampleProbability(),
                    randomSamplerSeed: RANDOM_SAMPLER_SEED,
                  },
                })) as [NumericChartData]
```
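A side benefit of the single-object style: optional options can pick up defaults right at the destructuring site instead of being threaded through as positional `undefined`. A minimal hypothetical sketch (the names are illustrative, not the actual Kibana code):

```typescript
// Hypothetical sketch: optional options receive defaults at the destructuring
// site, so callers simply omit them rather than passing `undefined` positionally.
interface SamplingParams {
  arguments: {
    samplerShardSize: number;
    randomSamplerProbability?: number;
  };
}

function describeSampling(params: SamplingParams): string {
  const { samplerShardSize, randomSamplerProbability = 1 } = params.arguments;
  return samplerShardSize >= 1
    ? `sampler shard size ${samplerShardSize}`
    : `random sampler probability ${randomSamplerProbability}`;
}

describeSampling({ arguments: { samplerShardSize: 100 } }); // → 'sampler shard size 100'
describeSampling({ arguments: { samplerShardSize: -1 } }); // → 'random sampler probability 1'
```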


### Checklist

- [ ] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [x] This was checked for breaking API changes and was [labeled
appropriately](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process)
walterra authored Jul 8, 2024
1 parent f99f834 commit 94cab93
Showing 20 changed files with 375 additions and 269 deletions.
2 changes: 1 addition & 1 deletion x-pack/packages/ml/agg_utils/index.ts
@@ -6,7 +6,7 @@
*/

export { buildSamplerAggregation } from './src/build_sampler_aggregation';
export { fetchAggIntervals } from './src/fetch_agg_intervals';
export { fetchAggIntervals, type FetchAggIntervalsParams } from './src/fetch_agg_intervals';
export { fetchHistogramsForFields } from './src/fetch_histograms_for_fields';
export { DEFAULT_SAMPLER_SHARD_SIZE } from './src/field_histograms';
export { getSamplerAggregationsResponsePath } from './src/get_sampler_aggregations_response_path';
63 changes: 42 additions & 21 deletions x-pack/packages/ml/agg_utils/src/fetch_agg_intervals.ts
@@ -22,30 +22,51 @@ import type { HistogramField, NumericColumnStatsMap } from './types';
const MAX_CHART_COLUMNS = 20;

/**
* Returns aggregation intervals for the supplied document fields.
* Interface for the parameters required to fetch aggregation intervals.
*/
export interface FetchAggIntervalsParams {
/** The Elasticsearch client to use for the query. */
esClient: ElasticsearchClient;
/** An optional abort signal to cancel the request. */
abortSignal?: AbortSignal;
/** The arguments for the aggregation query. */
arguments: {
/** The index pattern to query against. */
indexPattern: string;
/** The query to filter documents. */
query: estypes.QueryDslQueryContainer;
/** The fields to aggregate on. */
fields: HistogramField[];
/** The size of the sampler shard. */
samplerShardSize: number;
/** Optional runtime mappings for the query. */
runtimeMappings?: estypes.MappingRuntimeFields;
/** Optional probability for random sampling. */
randomSamplerProbability?: number;
/** Optional seed for random sampling. */
randomSamplerSeed?: number;
};
}
/**
* Asynchronously fetches aggregation intervals from an Elasticsearch client.
*
* @param client - The Elasticsearch client.
* @param indexPattern - The index pattern to search.
* @param query - The query to filter documents.
* @param fields - An array of field definitions for which aggregation intervals are requested.
* @param samplerShardSize - The shard size parameter for sampling aggregations. A value less than 1 indicates no sampling.
* @param runtimeMappings - Optional runtime mappings to apply.
* @param abortSignal - Optional AbortSignal for canceling the request.
* @param randomSamplerProbability - Optional probability value for random sampling.
* @param randomSamplerSeed - Optional seed value for random sampling.
* @returns A promise that resolves to a map of aggregation intervals for the specified fields.
* @param params - The parameters for fetching aggregation intervals.
* @returns A promise that resolves to a map of numeric column statistics.
*/
export const fetchAggIntervals = async (
client: ElasticsearchClient,
indexPattern: string,
query: estypes.QueryDslQueryContainer,
fields: HistogramField[],
samplerShardSize: number,
runtimeMappings?: estypes.MappingRuntimeFields,
abortSignal?: AbortSignal,
randomSamplerProbability?: number,
randomSamplerSeed?: number
params: FetchAggIntervalsParams
): Promise<NumericColumnStatsMap> => {
const { esClient, abortSignal, arguments: args } = params;
const {
indexPattern,
query,
fields,
samplerShardSize,
runtimeMappings,
randomSamplerProbability,
randomSamplerSeed,
} = args;

if (
samplerShardSize >= 1 &&
randomSamplerProbability !== undefined &&
@@ -77,7 +98,7 @@ export const fetchAggIntervals = async (
seed: randomSamplerSeed,
});

const body = await client.search(
const body = await esClient.search(
{
index: indexPattern,
size: 0,
86 changes: 52 additions & 34 deletions x-pack/packages/ml/agg_utils/src/fetch_histograms_for_fields.ts
@@ -167,32 +167,48 @@ export type FieldsForHistograms = Array<
| UnsupportedHistogramField
>;

interface FetchHistogramsForFieldsParams {
/** The Elasticsearch client to use for the query. */
esClient: ElasticsearchClient;
/** An optional abort signal to cancel the request. */
abortSignal?: AbortSignal;
/** The arguments for the aggregation query. */
arguments: {
/** The index pattern to query against. */
indexPattern: string;
/** The query to filter documents. */
query: any;
/** The fields for which histograms are to be fetched. */
fields: FieldsForHistograms;
/** The size of the sampler shard. */
samplerShardSize: number;
/** Optional runtime mappings for the query. */
runtimeMappings?: estypes.MappingRuntimeFields;
/** Optional probability for random sampling. */
randomSamplerProbability?: number;
/** Optional seed for random sampling. */
randomSamplerSeed?: number;
};
}

/**
* Fetches data to be used in mini histogram charts. Supports auto-identifying
* the histogram interval and min/max values.
* Asynchronously fetches histograms for specified fields from an Elasticsearch client.
*
* @param client Elasticsearch Client
* @param indexPattern index pattern to be queried
* @param query Elasticsearch query
* @param fields the fields the histograms should be generated for
* @param samplerShardSize shard_size parameter of the sampler aggregation
* @param runtimeMappings optional runtime mappings
* @param abortSignal optional abort signal
* @param randomSamplerProbability optional random sampler probability
* @param randomSamplerSeed optional random sampler seed
* @returns an array of histogram data for each supplied field
* @param params The parameters for fetching histograms.
* @returns A promise that resolves with the fetched histograms.
*/
export const fetchHistogramsForFields = async (
client: ElasticsearchClient,
indexPattern: string,
query: any,
fields: FieldsForHistograms,
samplerShardSize: number,
runtimeMappings?: estypes.MappingRuntimeFields,
abortSignal?: AbortSignal,
randomSamplerProbability?: number,
randomSamplerSeed?: number
) => {
export const fetchHistogramsForFields = async (params: FetchHistogramsForFieldsParams) => {
const { esClient, abortSignal, arguments: args } = params;
const {
indexPattern,
query,
fields,
samplerShardSize,
runtimeMappings,
randomSamplerProbability,
randomSamplerSeed,
} = args;

if (
samplerShardSize >= 1 &&
randomSamplerProbability !== undefined &&
@@ -202,17 +218,19 @@ export const fetchHistogramsForFields = async (
}

const aggIntervals = {
...(await fetchAggIntervals(
client,
indexPattern,
query,
fields.filter((f) => !isNumericHistogramFieldWithColumnStats(f)),
samplerShardSize,
runtimeMappings,
...(await fetchAggIntervals({
esClient,
abortSignal,
randomSamplerProbability,
randomSamplerSeed
)),
arguments: {
indexPattern,
query,
fields: fields.filter((f) => !isNumericHistogramFieldWithColumnStats(f)),
samplerShardSize,
runtimeMappings,
randomSamplerProbability,
randomSamplerSeed,
},
})),
...fields.filter(isNumericHistogramFieldWithColumnStats).reduce((p, field) => {
const { interval, min, max, fieldName } = field;
p[stringHash(fieldName)] = { interval, min, max };
@@ -259,7 +277,7 @@ export const fetchHistogramsForFields = async (
seed: randomSamplerSeed,
});

const body = await client.search(
const body = await esClient.search(
{
index: indexPattern,
size: 0,
@@ -97,10 +97,10 @@ export const fetchCategories = async (
esClient: ElasticsearchClient,
params: AiopsLogRateAnalysisSchema,
fieldNames: string[],
logger: Logger,
logger?: Logger,
// The default value of 1 means no sampling will be used
sampleProbability: number = 1,
emitError: (m: string) => void,
emitError?: (m: string) => void,
abortSignal?: AbortSignal
): Promise<FetchCategoriesResponse[]> => {
const randomSamplerWrapper = createRandomSamplerWrapper({
@@ -122,14 +122,19 @@

function reportError(fieldName: string, error: unknown) {
if (!isRequestAbortedError(error)) {
logger.error(
`Failed to fetch category aggregation for fieldName "${fieldName}", got: \n${JSON.stringify(
error,
null,
2
)}`
);
emitError(`Failed to fetch category aggregation for fieldName "${fieldName}".`);
if (logger) {
logger.error(
`Failed to fetch category aggregation for fieldName "${fieldName}", got: \n${JSON.stringify(
error,
null,
2
)}`
);
}

if (emitError) {
emitError(`Failed to fetch category aggregation for fieldName "${fieldName}".`);
}
}
}

@@ -75,8 +75,8 @@ export const fetchCategoryCounts = async (
categories: FetchCategoriesResponse,
from: number | undefined,
to: number | undefined,
logger: Logger,
emitError: (m: string) => void,
logger?: Logger,
emitError?: (m: string) => void,
abortSignal?: AbortSignal
): Promise<FetchCategoriesResponse> => {
const updatedCategories = cloneDeep(categories);
@@ -101,14 +101,19 @@
);
} catch (error) {
if (!isRequestAbortedError(error)) {
logger.error(
`Failed to fetch category counts for field name "${fieldName}", got: \n${JSON.stringify(
error,
null,
2
)}`
);
emitError(`Failed to fetch category counts for field name "${fieldName}".`);
if (logger) {
logger.error(
`Failed to fetch category counts for field name "${fieldName}", got: \n${JSON.stringify(
error,
null,
2
)}`
);
}

if (emitError) {
emitError(`Failed to fetch category counts for field name "${fieldName}".`);
}
}
return updatedCategories;
}
@@ -118,14 +123,19 @@ export const fetchCategoryCounts = async (
updatedCategories.categories[index].count =
(resp.hits.total as estypes.SearchTotalHits).value ?? 0;
} else {
logger.error(
`Failed to fetch category count for category "${
updatedCategories.categories[index].key
}", got: \n${JSON.stringify(resp, null, 2)}`
);
emitError(
`Failed to fetch category count for category "${updatedCategories.categories[index].key}".`
);
if (logger) {
logger.error(
`Failed to fetch category count for category "${
updatedCategories.categories[index].key
}", got: \n${JSON.stringify(resp, null, 2)}`
);
}

if (emitError) {
emitError(
`Failed to fetch category count for category "${updatedCategories.categories[index].key}".`
);
}
}
}

@@ -80,20 +80,38 @@ export function getFrequentItemSetsAggFields(significantItems: SignificantItem[]
);
}

export async function fetchFrequentItemSets(
client: ElasticsearchClient,
index: string,
searchQuery: estypes.QueryDslQueryContainer,
significantItems: SignificantItem[],
timeFieldName: string,
deviationMin: number,
deviationMax: number,
logger: Logger,
// The default value of 1 means no sampling will be used
sampleProbability: number = 1,
emitError: (m: string) => void,
abortSignal?: AbortSignal
): Promise<FetchFrequentItemSetsResponse> {
export async function fetchFrequentItemSets({
esClient,
abortSignal,
emitError,
logger,
arguments: args,
}: {
esClient: ElasticsearchClient;
emitError: (m: string) => void;
abortSignal?: AbortSignal;
logger: Logger;
arguments: {
index: string;
searchQuery: estypes.QueryDslQueryContainer;
significantItems: SignificantItem[];
timeFieldName: string;
deviationMin: number;
deviationMax: number;
sampleProbability?: number;
};
}): Promise<FetchFrequentItemSetsResponse> {
const {
index,
searchQuery,
significantItems,
timeFieldName,
deviationMin,
deviationMax,
// The default value of 1 means no sampling will be used
sampleProbability = 1,
} = args;

// Sort significant terms by ascending p-value, necessary to apply the field limit correctly.
const sortedSignificantItems = significantItems.slice().sort((a, b) => {
return (a.pValue ?? 0) - (b.pValue ?? 0);
@@ -140,7 +158,7 @@ export async function fetchFrequentItemSets(
track_total_hits: true,
};

const body = await client.search<
const body = await esClient.search<
unknown,
{ sample: FrequentItemSetsAggregation } | FrequentItemSetsAggregation
>(