From f9dda01515b4126c6498133563e420cf2b580cf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Louv-Jansen?= Date: Tue, 3 Dec 2024 14:48:19 +0100 Subject: [PATCH] [Obs AI Assistant] Perform index creation at startup (#201362) Currently the knowledge base creates index assets (index templates, index components) lazily when the user interacts with the assistant. This prevents running the semantic text migrations (added in https://github.com/elastic/kibana/pull/186499) when Kibana starts because the mappings have not yet been updated. Additionally, this PR also increases `min_number_of_allocations` to 1 to ensure at least one ML node is available at all times. (cherry picked from commit b217f1acbdce4d9c0288c87e9afa470038cf6557) --- .../server/plugin.ts | 5 +- .../server/service/client/index.test.ts | 8 +- .../server/service/client/index.ts | 32 +++- .../server/service/index.ts | 156 ++++-------------- .../server/service/inference_endpoint.ts | 97 ++++++++--- .../server/service/kb_component_template.ts | 4 - .../service/knowledge_base_service/index.ts | 59 +------ .../setup_conversation_and_kb_index_assets.ts | 108 ++++++++++++ ...ter_migrate_knowledge_base_entries_task.ts | 99 +++++++---- 9 files changed, 316 insertions(+), 252 deletions(-) create mode 100644 x-pack/plugins/observability_solution/observability_ai_assistant/server/service/setup_conversation_and_kb_index_assets.ts diff --git a/x-pack/plugins/observability_solution/observability_ai_assistant/server/plugin.ts b/x-pack/plugins/observability_solution/observability_ai_assistant/server/plugin.ts index f693fa53c06cc..6c6d4c00f7188 100644 --- a/x-pack/plugins/observability_solution/observability_ai_assistant/server/plugin.ts +++ b/x-pack/plugins/observability_solution/observability_ai_assistant/server/plugin.ts @@ -123,8 +123,9 @@ export class ObservabilityAIAssistantPlugin core, taskManager: plugins.taskManager, logger: this.logger, - }).catch((error) => { - this.logger.error(`Failed to register migrate knowledge base entries task: ${error}`); + config: this.config, + }).catch((e) => { + this.logger.error(`Knowledge base migration was not successfully: ${e.message}`); }); service.register(registerFunctions); diff --git a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/client/index.test.ts b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/client/index.test.ts index 8da2a0d843b11..f6aa0dfab2726 100644 --- a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/client/index.test.ts +++ b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/client/index.test.ts @@ -5,7 +5,7 @@ * 2.0. */ import type { ActionsClient } from '@kbn/actions-plugin/server/actions_client'; -import type { ElasticsearchClient, IUiSettingsClient, Logger } from '@kbn/core/server'; +import type { CoreSetup, ElasticsearchClient, IUiSettingsClient, Logger } from '@kbn/core/server'; import type { DeeplyMockedKeys } from '@kbn/utility-types-jest'; import { waitFor } from '@testing-library/react'; import { last, merge, repeat } from 'lodash'; @@ -27,7 +27,9 @@ import { CONTEXT_FUNCTION_NAME } from '../../functions/context'; import { ChatFunctionClient } from '../chat_function_client'; import type { KnowledgeBaseService } from '../knowledge_base_service'; import { observableIntoStream } from '../util/observable_into_stream'; -import { CreateChatCompletionResponseChunk } from './adapters/process_openai_stream'; +import type { CreateChatCompletionResponseChunk } from './adapters/process_openai_stream'; +import type { ObservabilityAIAssistantConfig } from '../../config'; +import type { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; type ChunkDelta = CreateChatCompletionResponseChunk['choices'][number]['delta']; @@ -177,6 +179,8 @@ describe('Observability AI Assistant client', () => { functionClientMock.getAdhocInstructions.mockReturnValue([]); return new ObservabilityAIAssistantClient({ + config: {} as ObservabilityAIAssistantConfig, + core: {} as CoreSetup, actionsClient: actionsClientMock, uiSettingsClient: uiSettingsClientMock, esClient: { diff --git a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/client/index.ts b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/client/index.ts index 2bd2fdcf22462..107bed3cac7be 100644 --- a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/client/index.ts +++ b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/client/index.ts @@ -7,7 +7,7 @@ import type { SearchHit } from '@elastic/elasticsearch/lib/api/types'; import { notFound } from '@hapi/boom'; import type { ActionsClient } from '@kbn/actions-plugin/server'; -import type { ElasticsearchClient, IUiSettingsClient } from '@kbn/core/server'; +import type { CoreSetup, ElasticsearchClient, IUiSettingsClient } from '@kbn/core/server'; import type { Logger } from '@kbn/logging'; import type { PublicMethodsOf } from '@kbn/utility-types'; import { SpanKind, context } from '@opentelemetry/api'; @@ -80,13 +80,20 @@ import { LangtraceServiceProvider, withLangtraceChatCompleteSpan, } from './operators/with_langtrace_chat_complete_span'; -import { runSemanticTextKnowledgeBaseMigration } from '../task_manager_definitions/register_migrate_knowledge_base_entries_task'; +import { + runSemanticTextKnowledgeBaseMigration, + scheduleSemanticTextMigration, +} from '../task_manager_definitions/register_migrate_knowledge_base_entries_task'; +import { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; +import { ObservabilityAIAssistantConfig } from '../../config'; const MAX_FUNCTION_CALLS = 8; export class ObservabilityAIAssistantClient { constructor( private readonly dependencies: { + config: ObservabilityAIAssistantConfig; + core: CoreSetup; actionsClient: PublicMethodsOf; uiSettingsClient: IUiSettingsClient; namespace: string; @@ -725,9 +732,23 @@ export class ObservabilityAIAssistantClient { return this.dependencies.knowledgeBaseService.getStatus(); }; - setupKnowledgeBase = (modelId: string | undefined) => { - const { esClient } = this.dependencies; - return this.dependencies.knowledgeBaseService.setup(esClient, modelId); + setupKnowledgeBase = async (modelId: string | undefined) => { + const { esClient, core, logger, knowledgeBaseService } = this.dependencies; + + // setup the knowledge base + const res = await knowledgeBaseService.setup(esClient, modelId); + + core + .getStartServices() + .then(([_, pluginsStart]) => { + logger.debug('Schedule semantic text migration task'); + return scheduleSemanticTextMigration(pluginsStart); + }) + .catch((error) => { + logger.error(`Failed to run semantic text migration task: ${error}`); + }); + + return res; }; resetKnowledgeBase = () => { @@ -739,6 +760,7 @@ export class ObservabilityAIAssistantClient { return runSemanticTextKnowledgeBaseMigration({ esClient: this.dependencies.esClient, logger: this.dependencies.logger, + config: this.dependencies.config, }); }; diff --git a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/index.ts b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/index.ts index 9c26bebdd8388..d98799fcb63a7 100644 --- a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/index.ts +++ b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/index.ts @@ -5,22 +5,19 @@ * 2.0. */ -import type { PluginStartContract as ActionsPluginStart } from '@kbn/actions-plugin/server/plugin'; -import { createConcreteWriteIndex, getDataStreamAdapter } from '@kbn/alerting-plugin/server'; -import type { CoreSetup, CoreStart, KibanaRequest, Logger } from '@kbn/core/server'; -import type { SecurityPluginStart } from '@kbn/security-plugin/server'; +import type { CoreSetup, KibanaRequest, Logger } from '@kbn/core/server'; import { getSpaceIdFromPath } from '@kbn/spaces-plugin/common'; -import { once } from 'lodash'; import type { AssistantScope } from '@kbn/ai-assistant-common'; +import { once } from 'lodash'; +import pRetry from 'p-retry'; import { ObservabilityAIAssistantScreenContextRequest } from '../../common/types'; import type { ObservabilityAIAssistantPluginStartDependencies } from '../types'; import { ChatFunctionClient } from './chat_function_client'; import { ObservabilityAIAssistantClient } from './client'; -import { conversationComponentTemplate } from './conversation_component_template'; -import { kbComponentTemplate } from './kb_component_template'; import { KnowledgeBaseService } from './knowledge_base_service'; import type { RegistrationCallback, RespondFunctionResources } from './types'; import { ObservabilityAIAssistantConfig } from '../config'; +import { setupConversationAndKbIndexAssets } from './setup_conversation_and_kb_index_assets'; function getResourceName(resource: string) { return `.kibana-observability-ai-assistant-${resource}`; @@ -45,12 +42,15 @@ export const resourceNames = { }, }; +const createIndexAssetsOnce = once( + (logger: Logger, core: CoreSetup) => + pRetry(() => setupConversationAndKbIndexAssets({ logger, core })) +); + export class ObservabilityAIAssistantService { private readonly core: CoreSetup; private readonly logger: Logger; - private kbService?: KnowledgeBaseService; private config: ObservabilityAIAssistantConfig; - private readonly registrations: RegistrationCallback[] = []; constructor({ @@ -65,120 +65,8 @@ export class ObservabilityAIAssistantService { this.core = core; this.logger = logger; this.config = config; - - this.resetInit(); } - init = async () => {}; - - private resetInit = () => { - this.init = once(async () => { - return this.doInit().catch((error) => { - this.resetInit(); // reset the once flag if an error occurs - throw error; - }); - }); - }; - - private doInit = async () => { - try { - this.logger.debug('Setting up index assets'); - const [coreStart] = await this.core.getStartServices(); - - const { asInternalUser } = coreStart.elasticsearch.client; - - await asInternalUser.cluster.putComponentTemplate({ - create: false, - name: resourceNames.componentTemplate.conversations, - template: conversationComponentTemplate, - }); - - await asInternalUser.indices.putIndexTemplate({ - name: resourceNames.indexTemplate.conversations, - composed_of: [resourceNames.componentTemplate.conversations], - create: false, - index_patterns: [resourceNames.indexPatterns.conversations], - template: { - settings: { - number_of_shards: 1, - auto_expand_replicas: '0-1', - hidden: true, - }, - }, - }); - - const conversationAliasName = resourceNames.aliases.conversations; - - await createConcreteWriteIndex({ - esClient: asInternalUser, - logger: this.logger, - totalFieldsLimit: 10000, - indexPatterns: { - alias: conversationAliasName, - pattern: `${conversationAliasName}*`, - basePattern: `${conversationAliasName}*`, - name: `${conversationAliasName}-000001`, - template: resourceNames.indexTemplate.conversations, - }, - dataStreamAdapter: getDataStreamAdapter({ useDataStreamForAlerts: false }), - }); - - // Knowledge base: component template - await asInternalUser.cluster.putComponentTemplate({ - create: false, - name: resourceNames.componentTemplate.kb, - template: kbComponentTemplate, - }); - - // Knowledge base: index template - await asInternalUser.indices.putIndexTemplate({ - name: resourceNames.indexTemplate.kb, - composed_of: [resourceNames.componentTemplate.kb], - create: false, - index_patterns: [resourceNames.indexPatterns.kb], - template: { - settings: { - number_of_shards: 1, - auto_expand_replicas: '0-1', - hidden: true, - }, - }, - }); - - const kbAliasName = resourceNames.aliases.kb; - - // Knowledge base: write index - await createConcreteWriteIndex({ - esClient: asInternalUser, - logger: this.logger, - totalFieldsLimit: 10000, - indexPatterns: { - alias: kbAliasName, - pattern: `${kbAliasName}*`, - basePattern: `${kbAliasName}*`, - name: `${kbAliasName}-000001`, - template: resourceNames.indexTemplate.kb, - }, - dataStreamAdapter: getDataStreamAdapter({ useDataStreamForAlerts: false }), - }); - - this.kbService = new KnowledgeBaseService({ - core: this.core, - logger: this.logger.get('kb'), - config: this.config, - esClient: { - asInternalUser, - }, - }); - - this.logger.info('Successfully set up index assets'); - } catch (error) { - this.logger.error(`Failed setting up index assets: ${error.message}`); - this.logger.debug(error); - throw error; - } - }; - async getClient({ request, scopes, @@ -192,12 +80,11 @@ export class ObservabilityAIAssistantService { controller.abort(); }); - const [_, [coreStart, plugins]] = await Promise.all([ - this.init(), - this.core.getStartServices() as Promise< - [CoreStart, { security: SecurityPluginStart; actions: ActionsPluginStart }, unknown] - >, + const [[coreStart, plugins]] = await Promise.all([ + this.core.getStartServices(), + createIndexAssetsOnce(this.logger, this.core), ]); + // user will not be found when executed from system connector context const user = plugins.security.authc.getCurrentUser(request); @@ -207,12 +94,25 @@ export class ObservabilityAIAssistantService { const { spaceId } = getSpaceIdFromPath(basePath, coreStart.http.basePath.serverBasePath); + const { asInternalUser } = coreStart.elasticsearch.client; + + const kbService = new KnowledgeBaseService({ + core: this.core, + logger: this.logger.get('kb'), + config: this.config, + esClient: { + asInternalUser, + }, + }); + return new ObservabilityAIAssistantClient({ + core: this.core, + config: this.config, actionsClient: await plugins.actions.getActionsClientWithRequest(request), uiSettingsClient: coreStart.uiSettings.asScopedToClient(soClient), namespace: spaceId, esClient: { - asInternalUser: coreStart.elasticsearch.client.asInternalUser, + asInternalUser, asCurrentUser: coreStart.elasticsearch.client.asScoped(request).asCurrentUser, }, logger: this.logger, @@ -222,7 +122,7 @@ export class ObservabilityAIAssistantService { name: user.username, } : undefined, - knowledgeBaseService: this.kbService!, + knowledgeBaseService: kbService, scopes: scopes || ['all'], }); } diff --git a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/inference_endpoint.ts b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/inference_endpoint.ts index e89028652d9ac..a2993f7353c61 100644 --- a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/inference_endpoint.ts +++ b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/inference_endpoint.ts @@ -9,6 +9,7 @@ import { errors } from '@elastic/elasticsearch'; import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; import { Logger } from '@kbn/logging'; import moment from 'moment'; +import { ObservabilityAIAssistantConfig } from '../config'; export const AI_ASSISTANT_KB_INFERENCE_ID = 'obs_ai_assistant_kb_inference'; @@ -34,7 +35,7 @@ export async function createInferenceEndpoint({ service: 'elasticsearch', service_settings: { model_id: modelId, - adaptive_allocations: { enabled: true }, + adaptive_allocations: { enabled: true, min_number_of_allocations: 1 }, num_threads: 1, }, task_settings: {}, @@ -45,7 +46,7 @@ export async function createInferenceEndpoint({ } ); } catch (e) { - logger.error( + logger.debug( `Failed to create inference endpoint "${AI_ASSISTANT_KB_INFERENCE_ID}": ${e.message}` ); throw e; @@ -54,44 +55,30 @@ export async function createInferenceEndpoint({ export async function deleteInferenceEndpoint({ esClient, - logger, }: { esClient: { asCurrentUser: ElasticsearchClient; }; - logger: Logger; }) { - try { - const response = await esClient.asCurrentUser.inference.delete({ - inference_id: AI_ASSISTANT_KB_INFERENCE_ID, - force: true, - }); + const response = await esClient.asCurrentUser.inference.delete({ + inference_id: AI_ASSISTANT_KB_INFERENCE_ID, + force: true, + }); - return response; - } catch (e) { - logger.error(`Failed to delete inference endpoint: ${e.message}`); - throw e; - } + return response; } export async function getInferenceEndpoint({ esClient, - logger, }: { esClient: { asInternalUser: ElasticsearchClient }; - logger: Logger; }) { - try { - const response = await esClient.asInternalUser.inference.get({ - inference_id: AI_ASSISTANT_KB_INFERENCE_ID, - }); + const response = await esClient.asInternalUser.inference.get({ + inference_id: AI_ASSISTANT_KB_INFERENCE_ID, + }); - if (response.endpoints.length > 0) { - return response.endpoints[0]; - } - } catch (e) { - logger.error(`Failed to fetch inference endpoint: ${e.message}`); - throw e; + if (response.endpoints.length > 0) { + return response.endpoints[0]; } } @@ -102,3 +89,61 @@ export function isInferenceEndpointMissingOrUnavailable(error: Error) { error.body?.error?.type === 'status_exception') ); } + +export async function getElserModelStatus({ + esClient, + logger, + config, +}: { + esClient: { asInternalUser: ElasticsearchClient }; + logger: Logger; + config: ObservabilityAIAssistantConfig; +}) { + let errorMessage = ''; + const endpoint = await getInferenceEndpoint({ + esClient, + }).catch((error) => { + if (!isInferenceEndpointMissingOrUnavailable(error)) { + throw error; + } + errorMessage = error.message; + }); + + const enabled = config.enableKnowledgeBase; + if (!endpoint) { + return { ready: false, enabled, errorMessage }; + } + + const modelId = endpoint.service_settings?.model_id; + const modelStats = await esClient.asInternalUser.ml + .getTrainedModelsStats({ model_id: modelId }) + .catch((error) => { + logger.debug(`Failed to get model stats: ${error.message}`); + errorMessage = error.message; + }); + + if (!modelStats) { + return { ready: false, enabled, errorMessage }; + } + + const elserModelStats = modelStats.trained_model_stats.find( + (stats) => stats.deployment_stats?.deployment_id === AI_ASSISTANT_KB_INFERENCE_ID + ); + const deploymentState = elserModelStats?.deployment_stats?.state; + const allocationState = elserModelStats?.deployment_stats?.allocation_status.state; + const allocationCount = + elserModelStats?.deployment_stats?.allocation_status.allocation_count ?? 0; + const ready = + deploymentState === 'started' && allocationState === 'fully_allocated' && allocationCount > 0; + + return { + endpoint, + ready, + enabled, + model_stats: { + allocation_count: allocationCount, + deployment_state: deploymentState, + allocation_state: allocationState, + }, + }; +} diff --git a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/kb_component_template.ts b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/kb_component_template.ts index 6cf89b0c9e22d..49e856db29d50 100644 --- a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/kb_component_template.ts +++ b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/kb_component_template.ts @@ -62,10 +62,6 @@ export const kbComponentTemplate: ClusterComponentTemplate['component_template'] semantic_text: { type: 'semantic_text', inference_id: AI_ASSISTANT_KB_INFERENCE_ID, - // @ts-expect-error: @elastic/elasticsearch does not have this type yet - model_settings: { - task_type: 'sparse_embedding', - }, }, 'ml.tokens': { type: 'rank_features', diff --git a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/knowledge_base_service/index.ts b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/knowledge_base_service/index.ts index fdde5ebb49970..1cf1cdc326fdf 100644 --- a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/knowledge_base_service/index.ts +++ b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/knowledge_base_service/index.ts @@ -20,10 +20,9 @@ import { import { getAccessQuery } from '../util/get_access_query'; import { getCategoryQuery } from '../util/get_category_query'; import { - AI_ASSISTANT_KB_INFERENCE_ID, createInferenceEndpoint, deleteInferenceEndpoint, - getInferenceEndpoint, + getElserModelStatus, isInferenceEndpointMissingOrUnavailable, } from '../inference_endpoint'; import { recallFromSearchConnectors } from './recall_from_search_connectors'; @@ -61,13 +60,13 @@ export class KnowledgeBaseService { }, modelId: string | undefined ) { - await deleteInferenceEndpoint({ esClient, logger: this.dependencies.logger }).catch((e) => {}); // ensure existing inference endpoint is deleted + await deleteInferenceEndpoint({ esClient }).catch((e) => {}); // ensure existing inference endpoint is deleted return createInferenceEndpoint({ esClient, logger: this.dependencies.logger, modelId }); } async reset(esClient: { asCurrentUser: ElasticsearchClient }) { try { - await deleteInferenceEndpoint({ esClient, logger: this.dependencies.logger }); + await deleteInferenceEndpoint({ esClient }); } catch (error) { if (isInferenceEndpointMissingOrUnavailable(error)) { return; @@ -437,58 +436,10 @@ export class KnowledgeBaseService { }; getStatus = async () => { - let errorMessage = ''; - const endpoint = await getInferenceEndpoint({ + return getElserModelStatus({ esClient: this.dependencies.esClient, logger: this.dependencies.logger, - }).catch((error) => { - if (!isInferenceEndpointMissingOrUnavailable(error)) { - throw error; - } - this.dependencies.logger.error(`Failed to get inference endpoint: ${error.message}`); - errorMessage = error.message; + config: this.dependencies.config, }); - - const enabled = this.dependencies.config.enableKnowledgeBase; - if (!endpoint) { - return { ready: false, enabled, errorMessage }; - } - - const modelId = endpoint.service_settings?.model_id; - const modelStats = await this.dependencies.esClient.asInternalUser.ml - .getTrainedModelsStats({ model_id: modelId }) - .catch((error) => { - this.dependencies.logger.error(`Failed to get model stats: ${error.message}`); - errorMessage = error.message; - }); - - if (!modelStats) { - return { ready: false, enabled, errorMessage }; - } - - const elserModelStats = modelStats.trained_model_stats.find( - (stats) => stats.deployment_stats?.deployment_id === AI_ASSISTANT_KB_INFERENCE_ID - ); - const deploymentState = elserModelStats?.deployment_stats?.state; - const allocationState = elserModelStats?.deployment_stats?.allocation_status.state; - const allocationCount = - elserModelStats?.deployment_stats?.allocation_status.allocation_count ?? 0; - const ready = - deploymentState === 'started' && allocationState === 'fully_allocated' && allocationCount > 0; - - this.dependencies.logger.debug( - `Model deployment state: ${deploymentState}, allocation state: ${allocationState}, ready: ${ready}` - ); - - return { - endpoint, - ready, - enabled, - model_stats: { - allocation_count: allocationCount, - deployment_state: deploymentState, - allocation_state: allocationState, - }, - }; }; } diff --git a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/setup_conversation_and_kb_index_assets.ts b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/setup_conversation_and_kb_index_assets.ts new file mode 100644 index 0000000000000..30d55400bbbda --- /dev/null +++ b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/setup_conversation_and_kb_index_assets.ts @@ -0,0 +1,108 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { createConcreteWriteIndex, getDataStreamAdapter } from '@kbn/alerting-plugin/server'; +import type { CoreSetup, Logger } from '@kbn/core/server'; +import type { ObservabilityAIAssistantPluginStartDependencies } from '../types'; +import { conversationComponentTemplate } from './conversation_component_template'; +import { kbComponentTemplate } from './kb_component_template'; +import { resourceNames } from '.'; + +export async function setupConversationAndKbIndexAssets({ + logger, + core, +}: { + logger: Logger; + core: CoreSetup; +}) { + try { + logger.debug('Setting up index assets'); + const [coreStart] = await core.getStartServices(); + const { asInternalUser } = coreStart.elasticsearch.client; + + // Conversations: component template + await asInternalUser.cluster.putComponentTemplate({ + create: false, + name: resourceNames.componentTemplate.conversations, + template: conversationComponentTemplate, + }); + + // Conversations: index template + await asInternalUser.indices.putIndexTemplate({ + name: resourceNames.indexTemplate.conversations, + composed_of: [resourceNames.componentTemplate.conversations], + create: false, + index_patterns: [resourceNames.indexPatterns.conversations], + template: { + settings: { + number_of_shards: 1, + auto_expand_replicas: '0-1', + hidden: true, + }, + }, + }); + + // Conversations: write index + const conversationAliasName = resourceNames.aliases.conversations; + await createConcreteWriteIndex({ + esClient: asInternalUser, + logger, + totalFieldsLimit: 10000, + indexPatterns: { + alias: conversationAliasName, + pattern: `${conversationAliasName}*`, + basePattern: `${conversationAliasName}*`, + name: `${conversationAliasName}-000001`, + template: resourceNames.indexTemplate.conversations, + }, + dataStreamAdapter: getDataStreamAdapter({ useDataStreamForAlerts: false }), + }); + + // Knowledge base: component template + await asInternalUser.cluster.putComponentTemplate({ + create: false, + name: resourceNames.componentTemplate.kb, + template: kbComponentTemplate, + }); + + // Knowledge base: index template + await asInternalUser.indices.putIndexTemplate({ + name: resourceNames.indexTemplate.kb, + composed_of: [resourceNames.componentTemplate.kb], + create: false, + index_patterns: [resourceNames.indexPatterns.kb], + template: { + settings: { + number_of_shards: 1, + auto_expand_replicas: '0-1', + hidden: true, + }, + }, + }); + + // Knowledge base: write index + const kbAliasName = resourceNames.aliases.kb; + await createConcreteWriteIndex({ + esClient: asInternalUser, + logger, + totalFieldsLimit: 10000, + indexPatterns: { + alias: kbAliasName, + pattern: `${kbAliasName}*`, + basePattern: `${kbAliasName}*`, + name: `${kbAliasName}-000001`, + template: resourceNames.indexTemplate.kb, + }, + dataStreamAdapter: getDataStreamAdapter({ useDataStreamForAlerts: false }), + }); + + logger.info('Successfully set up index assets'); + } catch (error) { + logger.error(`Failed setting up index assets: ${error.message}`); + logger.debug(error); + } +} diff --git a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/task_manager_definitions/register_migrate_knowledge_base_entries_task.ts b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/task_manager_definitions/register_migrate_knowledge_base_entries_task.ts index 3df125ab2ba2d..b75074dc7ea54 100644 --- a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/task_manager_definitions/register_migrate_knowledge_base_entries_task.ts +++ b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/task_manager_definitions/register_migrate_knowledge_base_entries_task.ts @@ -12,8 +12,10 @@ import type { CoreSetup, Logger } from '@kbn/core/server'; import pRetry from 'p-retry'; import { KnowledgeBaseEntry } from '../../../common'; import { resourceNames } from '..'; -import { getInferenceEndpoint } from '../inference_endpoint'; +import { getElserModelStatus } from '../inference_endpoint'; import { ObservabilityAIAssistantPluginStartDependencies } from '../../types'; +import { ObservabilityAIAssistantConfig } from '../../config'; +import { setupConversationAndKbIndexAssets } from '../setup_conversation_and_kb_index_assets'; const TASK_ID = 'obs-ai-assistant:knowledge-base-migration-task-id'; const TASK_TYPE = 'obs-ai-assistant:knowledge-base-migration'; @@ -25,36 +27,66 @@ export async function registerMigrateKnowledgeBaseEntriesTask({ taskManager, logger, core, + config, }: { taskManager: TaskManagerSetupContract; logger: Logger; core: CoreSetup; + config: ObservabilityAIAssistantConfig; }) { - logger.debug(`Register task "${TASK_TYPE}"`); - const [coreStart, pluginsStart] = await core.getStartServices(); - taskManager.registerTaskDefinitions({ - [TASK_TYPE]: { - title: 'Migrate AI Assistant Knowledge Base', - description: `Migrates AI Assistant knowledge base entries`, - timeout: '1h', - maxAttempts: 5, - createTaskRunner() { - return { - async run() { - logger.debug(`Run task: "${TASK_TYPE}"`); - - const esClient = { asInternalUser: coreStart.elasticsearch.client.asInternalUser }; - await runSemanticTextKnowledgeBaseMigration({ esClient, logger }); - }, - }; + try { + logger.debug(`Register task "${TASK_TYPE}"`); + taskManager.registerTaskDefinitions({ + [TASK_TYPE]: { + title: 'Migrate AI Assistant Knowledge Base', + description: `Migrates AI Assistant knowledge base entries`, + timeout: '1h', + maxAttempts: 5, + createTaskRunner() { + return { + async run() { + logger.debug(`Run task: "${TASK_TYPE}"`); + const esClient = coreStart.elasticsearch.client; + + const hasKbIndex = await esClient.asInternalUser.indices.exists({ + index: resourceNames.aliases.kb, + }); + + if (!hasKbIndex) { + logger.debug( + 'Knowledge base index does not exist. Skipping semantic text migration.' + ); + return; + } + + // update fields and mappings + await setupConversationAndKbIndexAssets({ logger, core }); + + // run migration + await runSemanticTextKnowledgeBaseMigration({ esClient, logger, config }); + }, + }; + }, }, - }, - }); + }); + } catch (error) { + logger.error(`Failed to register task "${TASK_TYPE}". Error: ${error}`); + } + + try { + logger.debug(`Scheduled task: "${TASK_TYPE}"`); + await scheduleSemanticTextMigration(pluginsStart); + } catch (error) { + logger.error(`Failed to schedule task "${TASK_TYPE}". Error: ${error}`); + } +} - logger.debug(`Scheduled task: "${TASK_TYPE}"`); - await pluginsStart.taskManager.ensureScheduled({ +export function scheduleSemanticTextMigration( + pluginsStart: ObservabilityAIAssistantPluginStartDependencies +) { + return pluginsStart.taskManager.ensureScheduled({ id: TASK_ID, taskType: TASK_TYPE, scope: ['aiAssistant'], @@ -66,9 +98,11 @@ export async function registerMigrateKnowledgeBaseEntriesTask({ export async function runSemanticTextKnowledgeBaseMigration({ esClient, logger, + config, }: { esClient: { asInternalUser: ElasticsearchClient }; logger: Logger; + config: ObservabilityAIAssistantConfig; }) { logger.debug('Knowledge base migration: Running migration'); @@ -98,7 +132,7 @@ export async function runSemanticTextKnowledgeBaseMigration({ logger.debug(`Knowledge base migration: Found ${response.hits.hits.length} entries to migrate`); - await waitForInferenceEndpoint({ esClient, logger }); + await waitForModel({ esClient, logger, config }); // Limit the number of concurrent requests to avoid overloading the cluster const limiter = pLimit(10); @@ -109,6 +143,7 @@ export async function runSemanticTextKnowledgeBaseMigration({ } return esClient.asInternalUser.update({ + refresh: 'wait_for', index: resourceNames.aliases.kb, id: hit._id, body: { @@ -123,27 +158,29 @@ export async function runSemanticTextKnowledgeBaseMigration({ await Promise.all(promises); logger.debug(`Knowledge base migration: Migrated ${promises.length} entries`); - await runSemanticTextKnowledgeBaseMigration({ esClient, logger }); + await runSemanticTextKnowledgeBaseMigration({ esClient, logger, config }); } catch (e) { - logger.error('Knowledge base migration: Failed to migrate entries'); - logger.error(e); + logger.error(`Knowledge base migration failed: ${e.message}`); } } -async function waitForInferenceEndpoint({ +async function waitForModel({ esClient, logger, + config, }: { esClient: { asInternalUser: ElasticsearchClient }; logger: Logger; + config: ObservabilityAIAssistantConfig; }) { return pRetry( async () => { - const endpoint = await getInferenceEndpoint({ esClient, logger }); - if (!endpoint) { - throw new Error('Inference endpoint not yet ready'); + const { ready } = await getElserModelStatus({ esClient, logger, config }); + if (!ready) { + logger.debug('Elser model is not yet ready. Retrying...'); + throw new Error('Elser model is not yet ready'); } }, - { retries: 20, factor: 2 } + { retries: 30, factor: 2, maxTimeout: 30_000 } ); }