Skip to content

Commit

Permalink
Query insights plugin implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Feb 6, 2024
1 parent a4bc4af commit 7bfd76f
Show file tree
Hide file tree
Showing 29 changed files with 1,734 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Extract cluster management for integration tests into JUnit test rule out of OpenSearchIntegTestCase ([#11877](https://github.com/opensearch-project/OpenSearch/pull/11877)), ([#12000](https://github.com/opensearch-project/OpenSearch/pull/12000))
- Workaround for https://bugs.openjdk.org/browse/JDK-8323659 regression, introduced in JDK-21.0.2 ([#11968](https://github.com/opensearch-project/OpenSearch/pull/11968))
- Updates IpField to be searchable when only `doc_values` are enabled ([#11508](https://github.com/opensearch-project/OpenSearch/pull/11508))
- [Query Insights] Query Insights Plugin Implementation ([#11903](https://github.com/opensearch-project/OpenSearch/pull/11903))

### Deprecated

Expand Down
1 change: 1 addition & 0 deletions gradle/missing-javadoc.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ configure([
project(":plugins:mapper-annotated-text"),
project(":plugins:mapper-murmur3"),
project(":plugins:mapper-size"),
project(":plugins:query-insights"),
project(":plugins:repository-azure"),
project(":plugins:repository-gcs"),
project(":plugins:repository-hdfs"),
Expand Down
18 changes: 18 additions & 0 deletions plugins/query-insights/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

opensearchplugin {
description 'OpenSearch Query Insights Plugin.'
classname 'org.opensearch.plugin.insights.QueryInsightsPlugin'
}

dependencies {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights;

import org.opensearch.action.ActionRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

/**
* Plugin class for Query Insights.
*/
public class QueryInsightsPlugin extends Plugin implements ActionPlugin {
/**
* Default constructor
*/
public QueryInsightsPlugin() {}

@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
return List.of();
}

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of();
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return List.of();
}

@Override
public List<Setting<?>> getSettings() {
return List.of();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.exporter;

import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

import java.util.List;

/**
* Simple abstract class to export data collected by search query analyzers
* <p>
* Mainly for use within the Query Insight framework
*
* @opensearch.internal
*/
public abstract class QueryInsightsExporter<T extends SearchQueryRecord<?>> {
private QueryInsightsExporterType type;
private String identifier;

QueryInsightsExporter(QueryInsightsExporterType type, String identifier) {
this.type = type;
this.identifier = identifier;
}

/**
* Export the data with the exporter.
*
* @param records the data to export
*/
public abstract void export(List<T> records) throws Exception;

public void setType(QueryInsightsExporterType type) {
this.type = type;
}

public QueryInsightsExporterType getType() {
return type;
}

public void setIdentifier(String identifier) {
this.identifier = identifier;
}

public String getIdentifier() {
return identifier;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.exporter;

import java.util.Locale;

/**
* Types for the Query Insights Exporters
*
* @opensearch.internal
*/
public enum QueryInsightsExporterType {
/* local index exporter */
LOCAL_INDEX("local_index");

private final String type;

QueryInsightsExporterType(String type) {
this.type = type;
}

public static QueryInsightsExporterType parse(String type) {
return valueOf(type.toUpperCase(Locale.ROOT));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.exporter;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Locale;
import java.util.Objects;

/**
* Class to export data collected by search query analyzers to a local OpenSearch index
* <p>
* Internal used within the Query Insight framework
*
* @opensearch.internal
*/
public class QueryInsightsLocalIndexExporter<T extends SearchQueryRecord<?>> extends QueryInsightsExporter<T> {
private static final Logger log = LogManager.getLogger(QueryInsightsLocalIndexExporter.class);
private static final int INDEX_TIMEOUT = 60;

private final ClusterService clusterService;
private final Client client;

/** The mapping for the local index that holds the data */
private final InputStream localIndexMapping;

public QueryInsightsLocalIndexExporter(
ClusterService clusterService,
Client client,
String localIndexName,
InputStream localIndexMapping
) {
super(QueryInsightsExporterType.LOCAL_INDEX, localIndexName);
this.clusterService = clusterService;
this.client = client;
this.localIndexMapping = localIndexMapping;
}

/**
* Export the data to the predefined local OpenSearch Index
*
* @param records the data to export
* @throws IOException if an error occurs
*/
@Override
public synchronized void export(List<T> records) throws IOException {
if (records.size() == 0) {
return;
}
if (checkIfIndexExists()) {
bulkRecord(records);
} else {
// local index not exist
initLocalIndex(new ActionListener<>() {
@Override
public void onResponse(CreateIndexResponse response) {
if (response.isAcknowledged()) {
log.debug(
String.format(Locale.ROOT, "successfully initialized local index %s for query insight.", getIdentifier())
);
try {
bulkRecord(records);
} catch (IOException e) {
log.error(String.format(Locale.ROOT, "fail to ingest query insight data to local index, error: %s", e));
}
} else {
log.error(
String.format(
Locale.ROOT,
"request to created local index %s for query insight not acknowledged.",
getIdentifier()
)
);
}
}

@Override
public void onFailure(Exception e) {
log.error(String.format(Locale.ROOT, "error creating local index for query insight: %s", e));
}
});
}
}

/**
* Util function to check if a local OpenSearch Index exists
*
* @return boolean
*/
private boolean checkIfIndexExists() {
ClusterState clusterState = clusterService.state();
return clusterState.getRoutingTable().hasIndex(this.getIdentifier());
}

/**
* Initialize the local OpenSearch Index for the exporter
*
* @param listener the listener to be notified upon completion
* @throws IOException if an error occurs
*/
private synchronized void initLocalIndex(ActionListener<CreateIndexResponse> listener) throws IOException {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(this.getIdentifier()).mapping(getIndexMappings())
.settings(Settings.builder().put("index.hidden", false).build());
client.admin().indices().create(createIndexRequest, listener);
}

/**
* Get the index mapping of the local index
*
* @return String to represent the index mapping
* @throws IOException if an error occurs
*/
private String getIndexMappings() throws IOException {
return new String(Objects.requireNonNull(this.localIndexMapping).readAllBytes(), Charset.defaultCharset());
}

/**
* Bulk ingest the data into to the predefined local OpenSearch Index
*
* @param records the data to export
* @throws IOException if an error occurs
*/
private void bulkRecord(List<T> records) throws IOException {
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.timeout(TimeValue.timeValueSeconds(INDEX_TIMEOUT));
for (T record : records) {
bulkRequest.add(
new IndexRequest(getIdentifier()).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
}
client.bulk(bulkRequest, new ActionListener<>() {
@Override
public void onResponse(BulkResponse response) {
if (response.status().equals(RestStatus.CREATED) || response.status().equals(RestStatus.OK)) {
log.debug(String.format(Locale.ROOT, "successfully ingest data for %s! ", getIdentifier()));
} else {
log.error(
String.format(
Locale.ROOT,
"error when ingesting data for %s, error: %s",
getIdentifier(),
response.buildFailureMessage()
)
);
}
}

@Override
public void onFailure(Exception e) {
log.error(String.format(Locale.ROOT, "failed to ingest data for %s, %s", getIdentifier(), e));
}
});
}
}
Loading

0 comments on commit 7bfd76f

Please sign in to comment.