Skip to content

Commit

Permalink
[Obs AI Assistant] Perform index creation at startup (elastic#201362)
Browse files Browse the repository at this point in the history
Currently the knowledge base creates index assets (index templates, index components) lazily when the user interacts with the assistant. This prevents running the semantic text migrations (added in elastic#186499) when Kibana starts because the mappings have not yet been updated.

Additionally, this PR also increases `min_number_of_allocations` to 1 to
ensure at least one ML node is available at all times.

(cherry picked from commit b217f1a)
  • Loading branch information
sorenlouv committed Dec 3, 2024
1 parent 277d438 commit f9dda01
Show file tree
Hide file tree
Showing 9 changed files with 316 additions and 252 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ export class ObservabilityAIAssistantPlugin
core,
taskManager: plugins.taskManager,
logger: this.logger,
}).catch((error) => {
this.logger.error(`Failed to register migrate knowledge base entries task: ${error}`);
config: this.config,
}).catch((e) => {
this.logger.error(`Knowledge base migration was not successfully: ${e.message}`);
});

service.register(registerFunctions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/
import type { ActionsClient } from '@kbn/actions-plugin/server/actions_client';
import type { ElasticsearchClient, IUiSettingsClient, Logger } from '@kbn/core/server';
import type { CoreSetup, ElasticsearchClient, IUiSettingsClient, Logger } from '@kbn/core/server';
import type { DeeplyMockedKeys } from '@kbn/utility-types-jest';
import { waitFor } from '@testing-library/react';
import { last, merge, repeat } from 'lodash';
Expand All @@ -27,7 +27,9 @@ import { CONTEXT_FUNCTION_NAME } from '../../functions/context';
import { ChatFunctionClient } from '../chat_function_client';
import type { KnowledgeBaseService } from '../knowledge_base_service';
import { observableIntoStream } from '../util/observable_into_stream';
import { CreateChatCompletionResponseChunk } from './adapters/process_openai_stream';
import type { CreateChatCompletionResponseChunk } from './adapters/process_openai_stream';
import type { ObservabilityAIAssistantConfig } from '../../config';
import type { ObservabilityAIAssistantPluginStartDependencies } from '../../types';

type ChunkDelta = CreateChatCompletionResponseChunk['choices'][number]['delta'];

Expand Down Expand Up @@ -177,6 +179,8 @@ describe('Observability AI Assistant client', () => {
functionClientMock.getAdhocInstructions.mockReturnValue([]);

return new ObservabilityAIAssistantClient({
config: {} as ObservabilityAIAssistantConfig,
core: {} as CoreSetup<ObservabilityAIAssistantPluginStartDependencies>,
actionsClient: actionsClientMock,
uiSettingsClient: uiSettingsClientMock,
esClient: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import type { SearchHit } from '@elastic/elasticsearch/lib/api/types';
import { notFound } from '@hapi/boom';
import type { ActionsClient } from '@kbn/actions-plugin/server';
import type { ElasticsearchClient, IUiSettingsClient } from '@kbn/core/server';
import type { CoreSetup, ElasticsearchClient, IUiSettingsClient } from '@kbn/core/server';
import type { Logger } from '@kbn/logging';
import type { PublicMethodsOf } from '@kbn/utility-types';
import { SpanKind, context } from '@opentelemetry/api';
Expand Down Expand Up @@ -80,13 +80,20 @@ import {
LangtraceServiceProvider,
withLangtraceChatCompleteSpan,
} from './operators/with_langtrace_chat_complete_span';
import { runSemanticTextKnowledgeBaseMigration } from '../task_manager_definitions/register_migrate_knowledge_base_entries_task';
import {
runSemanticTextKnowledgeBaseMigration,
scheduleSemanticTextMigration,
} from '../task_manager_definitions/register_migrate_knowledge_base_entries_task';
import { ObservabilityAIAssistantPluginStartDependencies } from '../../types';
import { ObservabilityAIAssistantConfig } from '../../config';

const MAX_FUNCTION_CALLS = 8;

export class ObservabilityAIAssistantClient {
constructor(
private readonly dependencies: {
config: ObservabilityAIAssistantConfig;
core: CoreSetup<ObservabilityAIAssistantPluginStartDependencies>;
actionsClient: PublicMethodsOf<ActionsClient>;
uiSettingsClient: IUiSettingsClient;
namespace: string;
Expand Down Expand Up @@ -725,9 +732,23 @@ export class ObservabilityAIAssistantClient {
return this.dependencies.knowledgeBaseService.getStatus();
};

setupKnowledgeBase = (modelId: string | undefined) => {
const { esClient } = this.dependencies;
return this.dependencies.knowledgeBaseService.setup(esClient, modelId);
setupKnowledgeBase = async (modelId: string | undefined) => {
const { esClient, core, logger, knowledgeBaseService } = this.dependencies;

// setup the knowledge base
const res = await knowledgeBaseService.setup(esClient, modelId);

core
.getStartServices()
.then(([_, pluginsStart]) => {
logger.debug('Schedule semantic text migration task');
return scheduleSemanticTextMigration(pluginsStart);
})
.catch((error) => {
logger.error(`Failed to run semantic text migration task: ${error}`);
});

return res;
};

resetKnowledgeBase = () => {
Expand All @@ -739,6 +760,7 @@ export class ObservabilityAIAssistantClient {
return runSemanticTextKnowledgeBaseMigration({
esClient: this.dependencies.esClient,
logger: this.dependencies.logger,
config: this.dependencies.config,
});
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,19 @@
* 2.0.
*/

import type { PluginStartContract as ActionsPluginStart } from '@kbn/actions-plugin/server/plugin';
import { createConcreteWriteIndex, getDataStreamAdapter } from '@kbn/alerting-plugin/server';
import type { CoreSetup, CoreStart, KibanaRequest, Logger } from '@kbn/core/server';
import type { SecurityPluginStart } from '@kbn/security-plugin/server';
import type { CoreSetup, KibanaRequest, Logger } from '@kbn/core/server';
import { getSpaceIdFromPath } from '@kbn/spaces-plugin/common';
import { once } from 'lodash';
import type { AssistantScope } from '@kbn/ai-assistant-common';
import { once } from 'lodash';
import pRetry from 'p-retry';
import { ObservabilityAIAssistantScreenContextRequest } from '../../common/types';
import type { ObservabilityAIAssistantPluginStartDependencies } from '../types';
import { ChatFunctionClient } from './chat_function_client';
import { ObservabilityAIAssistantClient } from './client';
import { conversationComponentTemplate } from './conversation_component_template';
import { kbComponentTemplate } from './kb_component_template';
import { KnowledgeBaseService } from './knowledge_base_service';
import type { RegistrationCallback, RespondFunctionResources } from './types';
import { ObservabilityAIAssistantConfig } from '../config';
import { setupConversationAndKbIndexAssets } from './setup_conversation_and_kb_index_assets';

function getResourceName(resource: string) {
return `.kibana-observability-ai-assistant-${resource}`;
Expand All @@ -45,12 +42,15 @@ export const resourceNames = {
},
};

const createIndexAssetsOnce = once(
(logger: Logger, core: CoreSetup<ObservabilityAIAssistantPluginStartDependencies>) =>
pRetry(() => setupConversationAndKbIndexAssets({ logger, core }))
);

export class ObservabilityAIAssistantService {
private readonly core: CoreSetup<ObservabilityAIAssistantPluginStartDependencies>;
private readonly logger: Logger;
private kbService?: KnowledgeBaseService;
private config: ObservabilityAIAssistantConfig;

private readonly registrations: RegistrationCallback[] = [];

constructor({
Expand All @@ -65,120 +65,8 @@ export class ObservabilityAIAssistantService {
this.core = core;
this.logger = logger;
this.config = config;

this.resetInit();
}

init = async () => {};

private resetInit = () => {
this.init = once(async () => {
return this.doInit().catch((error) => {
this.resetInit(); // reset the once flag if an error occurs
throw error;
});
});
};

private doInit = async () => {
try {
this.logger.debug('Setting up index assets');
const [coreStart] = await this.core.getStartServices();

const { asInternalUser } = coreStart.elasticsearch.client;

await asInternalUser.cluster.putComponentTemplate({
create: false,
name: resourceNames.componentTemplate.conversations,
template: conversationComponentTemplate,
});

await asInternalUser.indices.putIndexTemplate({
name: resourceNames.indexTemplate.conversations,
composed_of: [resourceNames.componentTemplate.conversations],
create: false,
index_patterns: [resourceNames.indexPatterns.conversations],
template: {
settings: {
number_of_shards: 1,
auto_expand_replicas: '0-1',
hidden: true,
},
},
});

const conversationAliasName = resourceNames.aliases.conversations;

await createConcreteWriteIndex({
esClient: asInternalUser,
logger: this.logger,
totalFieldsLimit: 10000,
indexPatterns: {
alias: conversationAliasName,
pattern: `${conversationAliasName}*`,
basePattern: `${conversationAliasName}*`,
name: `${conversationAliasName}-000001`,
template: resourceNames.indexTemplate.conversations,
},
dataStreamAdapter: getDataStreamAdapter({ useDataStreamForAlerts: false }),
});

// Knowledge base: component template
await asInternalUser.cluster.putComponentTemplate({
create: false,
name: resourceNames.componentTemplate.kb,
template: kbComponentTemplate,
});

// Knowledge base: index template
await asInternalUser.indices.putIndexTemplate({
name: resourceNames.indexTemplate.kb,
composed_of: [resourceNames.componentTemplate.kb],
create: false,
index_patterns: [resourceNames.indexPatterns.kb],
template: {
settings: {
number_of_shards: 1,
auto_expand_replicas: '0-1',
hidden: true,
},
},
});

const kbAliasName = resourceNames.aliases.kb;

// Knowledge base: write index
await createConcreteWriteIndex({
esClient: asInternalUser,
logger: this.logger,
totalFieldsLimit: 10000,
indexPatterns: {
alias: kbAliasName,
pattern: `${kbAliasName}*`,
basePattern: `${kbAliasName}*`,
name: `${kbAliasName}-000001`,
template: resourceNames.indexTemplate.kb,
},
dataStreamAdapter: getDataStreamAdapter({ useDataStreamForAlerts: false }),
});

this.kbService = new KnowledgeBaseService({
core: this.core,
logger: this.logger.get('kb'),
config: this.config,
esClient: {
asInternalUser,
},
});

this.logger.info('Successfully set up index assets');
} catch (error) {
this.logger.error(`Failed setting up index assets: ${error.message}`);
this.logger.debug(error);
throw error;
}
};

async getClient({
request,
scopes,
Expand All @@ -192,12 +80,11 @@ export class ObservabilityAIAssistantService {
controller.abort();
});

const [_, [coreStart, plugins]] = await Promise.all([
this.init(),
this.core.getStartServices() as Promise<
[CoreStart, { security: SecurityPluginStart; actions: ActionsPluginStart }, unknown]
>,
const [[coreStart, plugins]] = await Promise.all([
this.core.getStartServices(),
createIndexAssetsOnce(this.logger, this.core),
]);

// user will not be found when executed from system connector context
const user = plugins.security.authc.getCurrentUser(request);

Expand All @@ -207,12 +94,25 @@ export class ObservabilityAIAssistantService {

const { spaceId } = getSpaceIdFromPath(basePath, coreStart.http.basePath.serverBasePath);

const { asInternalUser } = coreStart.elasticsearch.client;

const kbService = new KnowledgeBaseService({
core: this.core,
logger: this.logger.get('kb'),
config: this.config,
esClient: {
asInternalUser,
},
});

return new ObservabilityAIAssistantClient({
core: this.core,
config: this.config,
actionsClient: await plugins.actions.getActionsClientWithRequest(request),
uiSettingsClient: coreStart.uiSettings.asScopedToClient(soClient),
namespace: spaceId,
esClient: {
asInternalUser: coreStart.elasticsearch.client.asInternalUser,
asInternalUser,
asCurrentUser: coreStart.elasticsearch.client.asScoped(request).asCurrentUser,
},
logger: this.logger,
Expand All @@ -222,7 +122,7 @@ export class ObservabilityAIAssistantService {
name: user.username,
}
: undefined,
knowledgeBaseService: this.kbService!,
knowledgeBaseService: kbService,
scopes: scopes || ['all'],
});
}
Expand Down
Loading

0 comments on commit f9dda01

Please sign in to comment.