diff --git a/CHANGELOG-3.0.md b/CHANGELOG-3.0.md
index 1cc12f66d52e1..964383078c38d 100644
--- a/CHANGELOG-3.0.md
+++ b/CHANGELOG-3.0.md
@@ -17,7 +17,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Dependencies
### Changed
-- Changed locale provider from COMPAT to CLDR ([13988](https://github.com/opensearch-project/OpenSearch/pull/13988))
- Migrate client transports to Apache HttpClient / Core 5.x ([#4459](https://github.com/opensearch-project/OpenSearch/pull/4459))
- Change http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773))
- Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792))
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 098405f8f1d44..0a334945d69ba 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,8 +15,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add getMetadataFields to MapperService ([#13819](https://github.com/opensearch-project/OpenSearch/pull/13819))
- [Remote State] Add async remote state deletion task running on an interval, configurable by a setting ([#13131](https://github.com/opensearch-project/OpenSearch/pull/13131))
- Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776))
+- Add capability to disable source recovery_source for an index ([#13590](https://github.com/opensearch-project/OpenSearch/pull/13590))
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))
+- Add dynamic action retry timeout setting ([#14022](https://github.com/opensearch-project/OpenSearch/issues/14022))
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
+- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
+- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))
### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
@@ -41,6 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add ability for Boolean and date field queries to run when only doc_values are enabled ([#11650](https://github.com/opensearch-project/OpenSearch/pull/11650))
- Refactor implementations of query phase searcher, allow QueryCollectorContext to have zero collectors ([#13481](https://github.com/opensearch-project/OpenSearch/pull/13481))
- Adds support to inject telemetry instances to plugins ([#13636](https://github.com/opensearch-project/OpenSearch/pull/13636))
+- Move cache removal notifications outside lru lock ([#14017](https://github.com/opensearch-project/OpenSearch/pull/14017))
### Deprecated
@@ -55,6 +60,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Painless: ensure type "UnmodifiableMap" for params ([#13885](https://github.com/opensearch-project/OpenSearch/pull/13885))
- Pass parent filter to inner hit query ([#13903](https://github.com/opensearch-project/OpenSearch/pull/13903))
- Fix NPE on restore searchable snapshot ([#13911](https://github.com/opensearch-project/OpenSearch/pull/13911))
+- Fix double invocation of postCollection when MultiBucketCollector is present ([#14015](https://github.com/opensearch-project/OpenSearch/pull/14015))
### Security
diff --git a/buildSrc/src/main/java/org/opensearch/gradle/OpenSearchTestBasePlugin.java b/buildSrc/src/main/java/org/opensearch/gradle/OpenSearchTestBasePlugin.java
index d0cb2da9c1dd3..2ea8c2d015ecc 100644
--- a/buildSrc/src/main/java/org/opensearch/gradle/OpenSearchTestBasePlugin.java
+++ b/buildSrc/src/main/java/org/opensearch/gradle/OpenSearchTestBasePlugin.java
@@ -110,7 +110,7 @@ public void execute(Task t) {
if (BuildParams.getRuntimeJavaVersion() == JavaVersion.VERSION_1_8) {
test.systemProperty("java.locale.providers", "SPI,JRE");
} else {
- test.systemProperty("java.locale.providers", "SPI,CLDR");
+ test.systemProperty("java.locale.providers", "SPI,COMPAT");
if (test.getJavaVersion().compareTo(JavaVersion.VERSION_17) < 0) {
test.jvmArgs("--illegal-access=warn");
}
diff --git a/distribution/tools/launchers/src/main/java/org/opensearch/tools/launchers/SystemJvmOptions.java b/distribution/tools/launchers/src/main/java/org/opensearch/tools/launchers/SystemJvmOptions.java
index af7138569972a..726c381db09f6 100644
--- a/distribution/tools/launchers/src/main/java/org/opensearch/tools/launchers/SystemJvmOptions.java
+++ b/distribution/tools/launchers/src/main/java/org/opensearch/tools/launchers/SystemJvmOptions.java
@@ -105,8 +105,13 @@ private static String javaLocaleProviders() {
SPI setting is used to allow loading custom CalendarDataProvider
in jdk8 it has to be loaded from jre/lib/ext,
in jdk9+ it is already within ES project and on a classpath
+
+ Due to internationalization enhancements in JDK 9 OpenSearch need to set the provider to COMPAT otherwise time/date
+ parsing will break in an incompatible way for some date patterns and locales.
+ //TODO COMPAT will be deprecated in at some point, see please https://bugs.openjdk.java.net/browse/JDK-8232906
+ See also: documentation in server/org.opensearch.common.time.IsoCalendarDataProvider
*/
- return "-Djava.locale.providers=SPI,CLDR";
+ return "-Djava.locale.providers=SPI,COMPAT";
}
}
diff --git a/gradle/ide.gradle b/gradle/ide.gradle
index e266d9add172d..ea353f8d92bdd 100644
--- a/gradle/ide.gradle
+++ b/gradle/ide.gradle
@@ -81,7 +81,7 @@ if (System.getProperty('idea.active') == 'true') {
}
runConfigurations {
defaults(JUnit) {
- vmParameters = '-ea -Djava.locale.providers=SPI,CLDR'
+ vmParameters = '-ea -Djava.locale.providers=SPI,COMPAT'
if (BuildParams.runtimeJavaVersion > JavaVersion.VERSION_17) {
vmParameters += ' -Djava.security.manager=allow'
}
diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java
index 4d7e0d486068a..22831c3e0f8ba 100644
--- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java
+++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java
@@ -70,7 +70,7 @@ public Collection createComponents(
final Supplier repositoriesServiceSupplier
) {
// create top n queries service
- final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool);
+ final QueryInsightsService queryInsightsService = new QueryInsightsService(clusterService.getClusterSettings(), threadPool, client);
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService));
}
@@ -110,7 +110,8 @@ public List> getSettings() {
// Settings for top N queries
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE,
- QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE
+ QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
+ QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS
);
}
}
diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java
new file mode 100644
index 0000000000000..116bd26e1f9bc
--- /dev/null
+++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java
@@ -0,0 +1,61 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.plugin.insights.core.exporter;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
+
+import java.util.List;
+
+/**
+ * Debug exporter for development purpose
+ */
+public final class DebugExporter implements QueryInsightsExporter {
+ /**
+ * Logger of the debug exporter
+ */
+ private final Logger logger = LogManager.getLogger();
+
+ /**
+ * Constructor of DebugExporter
+ */
+ private DebugExporter() {}
+
+ private static class InstanceHolder {
+ private static final DebugExporter INSTANCE = new DebugExporter();
+ }
+
+ /**
+ Get the singleton instance of DebugExporter
+ *
+ @return DebugExporter instance
+ */
+ public static DebugExporter getInstance() {
+ return InstanceHolder.INSTANCE;
+ }
+
+ /**
+ * Write the list of SearchQueryRecord to debug log
+ *
+ * @param records list of {@link SearchQueryRecord}
+ */
+ @Override
+ public void export(final List records) {
+ logger.debug("QUERY_INSIGHTS_RECORDS: " + records.toString());
+ }
+
+ /**
+ * Close the debugger exporter sink
+ */
+ @Override
+ public void close() {
+ logger.debug("Closing the DebugExporter..");
+ }
+}
diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java
new file mode 100644
index 0000000000000..c19fe3655098b
--- /dev/null
+++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java
@@ -0,0 +1,113 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.plugin.insights.core.exporter;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.action.bulk.BulkRequestBuilder;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.client.Client;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.xcontent.XContentFactory;
+import org.opensearch.core.action.ActionListener;
+import org.opensearch.core.xcontent.ToXContent;
+import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.util.List;
+
+/**
+ * Local index exporter for exporting query insights data to local OpenSearch indices.
+ */
+public final class LocalIndexExporter implements QueryInsightsExporter {
+ /**
+ * Logger of the local index exporter
+ */
+ private final Logger logger = LogManager.getLogger();
+ private final Client client;
+ private DateTimeFormatter indexPattern;
+
+ /**
+ * Constructor of LocalIndexExporter
+ *
+ * @param client OS client
+ * @param indexPattern the pattern of index to export to
+ */
+ public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern) {
+ this.indexPattern = indexPattern;
+ this.client = client;
+ }
+
+ /**
+ * Getter of indexPattern
+ *
+ * @return indexPattern
+ */
+ public DateTimeFormatter getIndexPattern() {
+ return indexPattern;
+ }
+
+ /**
+ * Setter of indexPattern
+ *
+ * @param indexPattern index pattern
+ * @return the current LocalIndexExporter
+ */
+ public LocalIndexExporter setIndexPattern(DateTimeFormatter indexPattern) {
+ this.indexPattern = indexPattern;
+ return this;
+ }
+
+ /**
+ * Export a list of SearchQueryRecord to a local index
+ *
+ * @param records list of {@link SearchQueryRecord}
+ */
+ @Override
+ public void export(final List records) {
+ if (records == null || records.size() == 0) {
+ return;
+ }
+ try {
+ final String index = getDateTimeFromFormat();
+ final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1));
+ for (SearchQueryRecord record : records) {
+ bulkRequestBuilder.add(
+ new IndexRequest(index).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
+ );
+ }
+ bulkRequestBuilder.execute(new ActionListener() {
+ @Override
+ public void onResponse(BulkResponse bulkItemResponses) {}
+
+ @Override
+ public void onFailure(Exception e) {
+ logger.error("Failed to execute bulk operation for query insights data: ", e);
+ }
+ });
+ } catch (final Exception e) {
+ logger.error("Unable to index query insights data: ", e);
+ }
+ }
+
+ /**
+ * Close the exporter sink
+ */
+ @Override
+ public void close() {
+ logger.debug("Closing the LocalIndexExporter..");
+ }
+
+ private String getDateTimeFromFormat() {
+ return indexPattern.print(DateTime.now(DateTimeZone.UTC));
+ }
+}
diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java
new file mode 100644
index 0000000000000..42e5354eb1640
--- /dev/null
+++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java
@@ -0,0 +1,26 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.plugin.insights.core.exporter;
+
+import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
+
+import java.io.Closeable;
+import java.util.List;
+
+/**
+ * Base interface for Query Insights exporters
+ */
+public interface QueryInsightsExporter extends Closeable {
+ /**
+ * Export a list of SearchQueryRecord to the exporter sink
+ *
+ * @param records list of {@link SearchQueryRecord}
+ */
+ void export(final List records);
+}
diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java
new file mode 100644
index 0000000000000..7324590c9f582
--- /dev/null
+++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java
@@ -0,0 +1,143 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.plugin.insights.core.exporter;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.client.Client;
+import org.opensearch.common.settings.Settings;
+import org.joda.time.format.DateTimeFormat;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+
+import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN;
+import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE;
+import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE;
+import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX;
+
+/**
+ * Factory class for validating and creating exporters based on provided settings
+ */
+public class QueryInsightsExporterFactory {
+ /**
+ * Logger of the query insights exporter factory
+ */
+ private final Logger logger = LogManager.getLogger();
+ final private Client client;
+ final private Set exporters;
+
+ /**
+ * Constructor of QueryInsightsExporterFactory
+ *
+ * @param client OS client
+ */
+ public QueryInsightsExporterFactory(final Client client) {
+ this.client = client;
+ this.exporters = new HashSet<>();
+ }
+
+ /**
+ * Validate exporter sink config
+ *
+ * @param settings exporter sink config {@link Settings}
+ * @throws IllegalArgumentException if provided exporter sink config settings are invalid
+ */
+ public void validateExporterConfig(final Settings settings) throws IllegalArgumentException {
+ // Disable exporter if the EXPORTER_TYPE setting is null
+ if (settings.get(EXPORTER_TYPE) == null) {
+ return;
+ }
+ SinkType type;
+ try {
+ type = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE));
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ Locale.ROOT,
+ "Invalid exporter type [%s], type should be one of %s",
+ settings.get(EXPORTER_TYPE),
+ SinkType.allSinkTypes()
+ )
+ );
+ }
+ switch (type) {
+ case LOCAL_INDEX:
+ final String indexPattern = settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN);
+ if (indexPattern.length() == 0) {
+ throw new IllegalArgumentException("Empty index pattern configured for the exporter");
+ }
+ try {
+ DateTimeFormat.forPattern(indexPattern);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format(Locale.ROOT, "Invalid index pattern [%s] configured for the exporter", indexPattern)
+ );
+ }
+ }
+ }
+
+ /**
+ * Create an exporter based on provided parameters
+ *
+ * @param type The type of exporter to create
+ * @param indexPattern the index pattern if creating a index exporter
+ * @return QueryInsightsExporter the created exporter sink
+ */
+ public QueryInsightsExporter createExporter(SinkType type, String indexPattern) {
+ if (SinkType.LOCAL_INDEX.equals(type)) {
+ QueryInsightsExporter exporter = new LocalIndexExporter(client, DateTimeFormat.forPattern(indexPattern));
+ this.exporters.add(exporter);
+ return exporter;
+ }
+ return DebugExporter.getInstance();
+ }
+
+ /**
+ * Update an exporter based on provided parameters
+ *
+ * @param exporter The exporter to update
+ * @param indexPattern the index pattern if creating a index exporter
+ * @return QueryInsightsExporter the updated exporter sink
+ */
+ public QueryInsightsExporter updateExporter(QueryInsightsExporter exporter, String indexPattern) {
+ if (exporter.getClass() == LocalIndexExporter.class) {
+ ((LocalIndexExporter) exporter).setIndexPattern(DateTimeFormat.forPattern(indexPattern));
+ }
+ return exporter;
+ }
+
+ /**
+ * Close an exporter
+ *
+ * @param exporter the exporter to close
+ */
+ public void closeExporter(QueryInsightsExporter exporter) throws IOException {
+ if (exporter != null) {
+ exporter.close();
+ this.exporters.remove(exporter);
+ }
+ }
+
+ /**
+ * Close all exporters
+ *
+ */
+ public void closeAllExporters() {
+ for (QueryInsightsExporter exporter : exporters) {
+ try {
+ closeExporter(exporter);
+ } catch (IOException e) {
+ logger.error("Fail to close query insights exporter, error: ", e);
+ }
+ }
+ }
+}
diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/SinkType.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/SinkType.java
new file mode 100644
index 0000000000000..c90c9c76b6706
--- /dev/null
+++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/SinkType.java
@@ -0,0 +1,66 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.plugin.insights.core.exporter;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Type of supported sinks
+ */
+public enum SinkType {
+ /** debug exporter */
+ DEBUG("debug"),
+ /** local index exporter */
+ LOCAL_INDEX("local_index");
+
+ private final String type;
+
+ SinkType(String type) {
+ this.type = type;
+ }
+
+ @Override
+ public String toString() {
+ return type;
+ }
+
+ /**
+ * Parse SinkType from String
+ * @param type the String representation of the SinkType
+ * @return SinkType
+ */
+ public static SinkType parse(final String type) {
+ return valueOf(type.toUpperCase(Locale.ROOT));
+ }
+
+ /**
+ * Get all valid SinkTypes
+ *
+ * @return A set contains all valid SinkTypes
+ */
+ public static Set allSinkTypes() {
+ return Arrays.stream(values()).collect(Collectors.toSet());
+ }
+
+ /**
+ * Get Sink type from exporter
+ *
+ * @param exporter the {@link QueryInsightsExporter}
+ * @return SinkType associated with this exporter
+ */
+ public static SinkType getSinkTypeFromExporter(QueryInsightsExporter exporter) {
+ if (exporter.getClass().equals(LocalIndexExporter.class)) {
+ return SinkType.LOCAL_INDEX;
+ }
+ return SinkType.DEBUG;
+ }
+}
diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/package-info.java
new file mode 100644
index 0000000000000..7164411194f85
--- /dev/null
+++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/package-info.java
@@ -0,0 +1,12 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/**
+ * Query Insights exporter
+ */
+package org.opensearch.plugin.insights.core.exporter;
diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java
index 9ec8673147c38..cad2fe374f1b6 100644
--- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java
+++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java
@@ -21,6 +21,7 @@
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
+import org.opensearch.tasks.Task;
import java.util.Collections;
import java.util.HashMap;
@@ -138,6 +139,15 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
+
+ Map labels = new HashMap<>();
+ // Retrieve user provided label if exists
+ String userProvidedLabel = context.getTask().getHeader(Task.X_OPAQUE_ID);
+ if (userProvidedLabel != null) {
+ labels.put(Task.X_OPAQUE_ID, userProvidedLabel);
+ }
+ attributes.put(Attribute.LABELS, labels);
+ // construct SearchQueryRecord from attributes and measurements
SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes);
queryInsightsService.addRecord(record);
} catch (Exception e) {
diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java
index 525ca0d4a3d33..a83bb2094f165 100644
--- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java
+++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java
@@ -8,14 +8,18 @@
package org.opensearch.plugin.insights.core.service;
+import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
@@ -23,6 +27,8 @@
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
+import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS;
+
/**
* Service responsible for gathering, analyzing, storing and exporting
* information related to search queries
@@ -56,21 +62,35 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
*/
protected volatile Scheduler.Cancellable scheduledFuture;
+ /**
+ * Query Insights exporter factory
+ */
+ final QueryInsightsExporterFactory queryInsightsExporterFactory;
+
/**
* Constructor of the QueryInsightsService
*
- * @param threadPool The OpenSearch thread pool to run async tasks
+ * @param clusterSettings OpenSearch cluster level settings
+ * @param threadPool The OpenSearch thread pool to run async tasks
+ * @param client OS client
*/
@Inject
- public QueryInsightsService(final ThreadPool threadPool) {
+ public QueryInsightsService(final ClusterSettings clusterSettings, final ThreadPool threadPool, final Client client) {
enableCollect = new HashMap<>();
queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY);
+ this.threadPool = threadPool;
+ this.queryInsightsExporterFactory = new QueryInsightsExporterFactory(client);
+ // initialize top n queries services and configurations consumers
topQueriesServices = new HashMap<>();
for (MetricType metricType : MetricType.allMetricTypes()) {
enableCollect.put(metricType, false);
- topQueriesServices.put(metricType, new TopQueriesService(metricType));
+ topQueriesServices.put(metricType, new TopQueriesService(metricType, threadPool, queryInsightsExporterFactory));
}
- this.threadPool = threadPool;
+ clusterSettings.addSettingsUpdateConsumer(
+ TOP_N_LATENCY_EXPORTER_SETTINGS,
+ (settings -> getTopQueriesService(MetricType.LATENCY).setExporter(settings)),
+ (settings -> getTopQueriesService(MetricType.LATENCY).validateExporterConfig(settings))
+ );
}
/**
@@ -176,5 +196,12 @@ protected void doStop() {
}
@Override
- protected void doClose() {}
+ protected void doClose() throws IOException {
+ // close all top n queries service
+ for (TopQueriesService topQueriesService : topQueriesServices.values()) {
+ topQueriesService.close();
+ }
+ // close any unclosed resources
+ queryInsightsExporterFactory.closeAllExporters();
+ }
}
diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java
index d2c30cbdf98e7..ff90edf1ec33d 100644
--- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java
+++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java
@@ -8,11 +8,19 @@
package org.opensearch.plugin.insights.core.service;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
+import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter;
+import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
+import org.opensearch.plugin.insights.core.exporter.SinkType;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
+import org.opensearch.threadpool.ThreadPool;
+import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
@@ -27,6 +35,12 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN;
+import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE;
+import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE;
+import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX;
+import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR;
+
/**
* Service responsible for gathering and storing top N queries
* with high latency or resource usage
@@ -34,6 +48,10 @@
* @opensearch.internal
*/
public class TopQueriesService {
+ /**
+ * Logger of the local index exporter
+ */
+ private final Logger logger = LogManager.getLogger();
private boolean enabled;
/**
* The metric type to measure top n queries
@@ -63,12 +81,34 @@ public class TopQueriesService {
*/
private final AtomicReference> topQueriesHistorySnapshot;
- TopQueriesService(final MetricType metricType) {
+ /**
+ * Factory for validating and creating exporters
+ */
+ private final QueryInsightsExporterFactory queryInsightsExporterFactory;
+
+ /**
+ * The internal OpenSearch thread pool that execute async processing and exporting tasks
+ */
+ private final ThreadPool threadPool;
+
+ /**
+ * Exporter for exporting top queries data
+ */
+ private QueryInsightsExporter exporter;
+
+ TopQueriesService(
+ final MetricType metricType,
+ final ThreadPool threadPool,
+ final QueryInsightsExporterFactory queryInsightsExporterFactory
+ ) {
this.enabled = false;
this.metricType = metricType;
+ this.threadPool = threadPool;
+ this.queryInsightsExporterFactory = queryInsightsExporterFactory;
this.topNSize = QueryInsightsSettings.DEFAULT_TOP_N_SIZE;
this.windowSize = QueryInsightsSettings.DEFAULT_WINDOW_SIZE;
this.windowStart = -1L;
+ this.exporter = null;
topQueriesStore = new PriorityQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType));
topQueriesCurrentSnapshot = new AtomicReference<>(new ArrayList<>());
topQueriesHistorySnapshot = new AtomicReference<>(new ArrayList<>());
@@ -169,6 +209,50 @@ public void validateWindowSize(final TimeValue windowSize) {
}
}
+ /**
+ * Set up the top queries exporter based on provided settings
+ *
+ * @param settings exporter config {@link Settings}
+ */
+ public void setExporter(final Settings settings) {
+ if (settings.get(EXPORTER_TYPE) != null) {
+ SinkType expectedType = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE));
+ if (exporter != null && expectedType == SinkType.getSinkTypeFromExporter(exporter)) {
+ queryInsightsExporterFactory.updateExporter(
+ exporter,
+ settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN)
+ );
+ } else {
+ try {
+ queryInsightsExporterFactory.closeExporter(this.exporter);
+ } catch (IOException e) {
+ logger.error("Fail to close the current exporter when updating exporter, error: ", e);
+ }
+ this.exporter = queryInsightsExporterFactory.createExporter(
+ SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE)),
+ settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN)
+ );
+ }
+ } else {
+ // Disable exporter if exporter type is set to null
+ try {
+ queryInsightsExporterFactory.closeExporter(this.exporter);
+ this.exporter = null;
+ } catch (IOException e) {
+ logger.error("Fail to close the current exporter when disabling exporter, error: ", e);
+ }
+ }
+ }
+
+ /**
+ * Validate provided settings for top queries exporter
+ *
+ * @param settings settings exporter config {@link Settings}
+ */
+ public void validateExporterConfig(Settings settings) {
+ queryInsightsExporterFactory.validateExporterConfig(settings);
+ }
+
/**
* Get all top queries records that are in the current top n queries store
* Optionally include top N records from the last window.
@@ -254,6 +338,10 @@ private void rotateWindowIfNecessary(final long newWindowStart) {
topQueriesStore.clear();
topQueriesCurrentSnapshot.set(new ArrayList<>());
windowStart = newWindowStart;
+ // export to the configured sink
+ if (exporter != null) {
+ threadPool.executor(QUERY_INSIGHTS_EXECUTOR).execute(() -> exporter.export(history));
+ }
}
}
@@ -279,4 +367,11 @@ private long calculateWindowStart(final long timestamp) {
public List getTopQueriesCurrentSnapshot() {
return topQueriesCurrentSnapshot.get();
}
+
+ /**
+ * Close the top n queries service
+ */
+ public void close() throws IOException {
+ queryInsightsExporterFactory.closeExporter(this.exporter);
+ }
}
diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java
index c1d17edf9ff14..7ee4883c54023 100644
--- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java
+++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java
@@ -43,7 +43,11 @@ public enum Attribute {
/**
* The node id for this request
*/
- NODE_ID;
+ NODE_ID,
+ /**
+ * Custom search request labels
+ */
+ LABELS;
/**
* Read an Attribute from a StreamInput
diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java
index 060711edb5580..fec00a680ae58 100644
--- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java
+++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java
@@ -8,9 +8,11 @@
package org.opensearch.plugin.insights.rules.model;
+import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
+import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
@@ -173,4 +175,9 @@ public boolean equals(final Object o) {
public int hashCode() {
return Objects.hash(timestamp, measurements, attributes);
}
+
+ @Override
+ public String toString() {
+ return Strings.toString(MediaTypeRegistry.JSON, this);
+ }
}
diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java
index 52cc1fbde790f..b2e01062e334c 100644
--- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java
+++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java
@@ -9,7 +9,9 @@
package org.opensearch.plugin.insights.settings;
import org.opensearch.common.settings.Setting;
+import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
+import org.opensearch.plugin.insights.core.exporter.SinkType;
import java.util.Arrays;
import java.util.HashSet;
@@ -109,6 +111,37 @@ public class QueryInsightsSettings {
Setting.Property.Dynamic
);
+ /**
+ * Config key for exporter type
+ */
+ public static final String EXPORTER_TYPE = "type";
+ /**
+ * Config key for export index
+ */
+ public static final String EXPORT_INDEX = "config.index";
+
+ /**
+ * Settings and defaults for top queries exporters
+ */
+ private static final String TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX = TOP_N_LATENCY_QUERIES_PREFIX + ".exporter.";
+ /**
+ * Default index pattern of top n queries by latency
+ */
+ public static final String DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN = "'top_queries_by_latency-'YYYY.MM.dd";
+ /**
+ * Default exporter type of top queries
+ */
+ public static final String DEFAULT_TOP_QUERIES_EXPORTER_TYPE = SinkType.LOCAL_INDEX.toString();
+
+ /**
+ * Settings for the exporter of top latency queries
+ */
+ public static final Setting TOP_N_LATENCY_EXPORTER_SETTINGS = Setting.groupSetting(
+ TOP_N_LATENCY_QUERIES_EXPORTER_PREFIX,
+ Setting.Property.Dynamic,
+ Setting.Property.NodeScope
+ );
+
/**
* Default constructor
*/
diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java
index 2f353f2a53329..8b8856e3e305c 100644
--- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java
+++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java
@@ -50,6 +50,7 @@ public void setup() {
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE);
+ clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS);
clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, threadPool);
}
@@ -59,7 +60,8 @@ public void testGetSettings() {
Arrays.asList(
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE,
- QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE
+ QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
+ QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS
),
queryInsightsPlugin.getSettings()
);
diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/DebugExporterTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/DebugExporterTests.java
new file mode 100644
index 0000000000000..736e406289b2c
--- /dev/null
+++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/DebugExporterTests.java
@@ -0,0 +1,37 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.plugin.insights.core.exporter;
+
+import org.opensearch.plugin.insights.QueryInsightsTestUtils;
+import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
+import org.opensearch.test.OpenSearchTestCase;
+import org.junit.Before;
+
+import java.util.List;
+
+/**
+ * Granular tests for the {@link DebugExporterTests} class.
+ */
+public class DebugExporterTests extends OpenSearchTestCase {
+ private DebugExporter debugExporter;
+
+ @Before
+ public void setup() {
+ debugExporter = DebugExporter.getInstance();
+ }
+
+ public void testExport() {
+ List records = QueryInsightsTestUtils.generateQueryInsightRecords(2);
+ try {
+ debugExporter.export(records);
+ } catch (Exception e) {
+ fail("No exception should be thrown when exporting query insights data");
+ }
+ }
+}
diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java
new file mode 100644
index 0000000000000..9ea864a7083f4
--- /dev/null
+++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporterTests.java
@@ -0,0 +1,99 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.plugin.insights.core.exporter;
+
+import org.opensearch.action.bulk.BulkAction;
+import org.opensearch.action.bulk.BulkRequestBuilder;
+import org.opensearch.action.bulk.BulkResponse;
+import org.opensearch.action.support.PlainActionFuture;
+import org.opensearch.client.Client;
+import org.opensearch.plugin.insights.QueryInsightsTestUtils;
+import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
+import org.opensearch.test.OpenSearchTestCase;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.junit.Before;
+
+import java.util.List;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Granular tests for the {@link LocalIndexExporterTests} class.
+ */
+public class LocalIndexExporterTests extends OpenSearchTestCase {
+ private final DateTimeFormatter format = DateTimeFormat.forPattern("YYYY.MM.dd");
+ private final Client client = mock(Client.class);
+ private LocalIndexExporter localIndexExporter;
+
+ @Before
+ public void setup() {
+ localIndexExporter = new LocalIndexExporter(client, format);
+ }
+
+ public void testExportEmptyRecords() {
+ List records = List.of();
+ try {
+ localIndexExporter.export(records);
+ } catch (Exception e) {
+ fail("No exception should be thrown when exporting empty query insights data");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testExportRecords() {
+ BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(client, BulkAction.INSTANCE));
+ final PlainActionFuture future = mock(PlainActionFuture.class);
+ when(future.actionGet()).thenReturn(null);
+ doAnswer(invocation -> future).when(bulkRequestBuilder).execute();
+ when(client.prepareBulk()).thenReturn(bulkRequestBuilder);
+
+ List records = QueryInsightsTestUtils.generateQueryInsightRecords(2);
+ try {
+ localIndexExporter.export(records);
+ } catch (Exception e) {
+ fail("No exception should be thrown when exporting query insights data");
+ }
+ assertEquals(2, bulkRequestBuilder.numberOfActions());
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testExportRecordsWithError() {
+ BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(client, BulkAction.INSTANCE));
+ final PlainActionFuture future = mock(PlainActionFuture.class);
+ when(future.actionGet()).thenReturn(null);
+ doThrow(new RuntimeException()).when(bulkRequestBuilder).execute();
+ when(client.prepareBulk()).thenReturn(bulkRequestBuilder);
+
+ List records = QueryInsightsTestUtils.generateQueryInsightRecords(2);
+ try {
+ localIndexExporter.export(records);
+ } catch (Exception e) {
+ fail("No exception should be thrown when exporting query insights data");
+ }
+ }
+
+ public void testClose() {
+ try {
+ localIndexExporter.close();
+ } catch (Exception e) {
+ fail("No exception should be thrown when closing local index exporter");
+ }
+ }
+
+ public void testGetAndSetIndexPattern() {
+ DateTimeFormatter newFormatter = mock(DateTimeFormatter.class);
+ localIndexExporter.setIndexPattern(newFormatter);
+ assert (localIndexExporter.getIndexPattern() == newFormatter);
+ }
+}
diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java
new file mode 100644
index 0000000000000..f01dd2c17509c
--- /dev/null
+++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java
@@ -0,0 +1,89 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.plugin.insights.core.exporter;
+
+import org.opensearch.client.Client;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.test.OpenSearchTestCase;
+import org.joda.time.format.DateTimeFormat;
+import org.junit.Before;
+
+import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE;
+import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE;
+import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Granular tests for the {@link QueryInsightsExporterFactoryTests} class.
+ */
+public class QueryInsightsExporterFactoryTests extends OpenSearchTestCase {
+ private final String format = "YYYY.MM.dd";
+
+ private final Client client = mock(Client.class);
+ private QueryInsightsExporterFactory queryInsightsExporterFactory;
+
+ @Before
+ public void setup() {
+ queryInsightsExporterFactory = new QueryInsightsExporterFactory(client);
+ }
+
+ public void testValidateConfigWhenResetExporter() {
+ Settings.Builder settingsBuilder = Settings.builder();
+ // empty settings
+ Settings settings = settingsBuilder.build();
+ try {
+ queryInsightsExporterFactory.validateExporterConfig(settings);
+ } catch (Exception e) {
+ fail("No exception should be thrown when setting is null");
+ }
+ }
+
+ public void testInvalidExporterTypeConfig() {
+ Settings.Builder settingsBuilder = Settings.builder();
+ Settings settings = settingsBuilder.put(EXPORTER_TYPE, "some_invalid_type").build();
+ assertThrows(IllegalArgumentException.class, () -> { queryInsightsExporterFactory.validateExporterConfig(settings); });
+ }
+
+ public void testInvalidLocalIndexConfig() {
+ Settings.Builder settingsBuilder = Settings.builder();
+ assertThrows(IllegalArgumentException.class, () -> {
+ queryInsightsExporterFactory.validateExporterConfig(
+ settingsBuilder.put(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE).put(EXPORT_INDEX, "").build()
+ );
+ });
+ assertThrows(IllegalArgumentException.class, () -> {
+ queryInsightsExporterFactory.validateExporterConfig(
+ settingsBuilder.put(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE).put(EXPORT_INDEX, "some_invalid_pattern").build()
+ );
+ });
+ }
+
+ public void testCreateAndCloseExporter() {
+ QueryInsightsExporter exporter1 = queryInsightsExporterFactory.createExporter(SinkType.LOCAL_INDEX, format);
+ assertTrue(exporter1 instanceof LocalIndexExporter);
+ QueryInsightsExporter exporter2 = queryInsightsExporterFactory.createExporter(SinkType.DEBUG, format);
+ assertTrue(exporter2 instanceof DebugExporter);
+ QueryInsightsExporter exporter3 = queryInsightsExporterFactory.createExporter(SinkType.DEBUG, format);
+ assertTrue(exporter3 instanceof DebugExporter);
+ try {
+ queryInsightsExporterFactory.closeExporter(exporter1);
+ queryInsightsExporterFactory.closeExporter(exporter2);
+ queryInsightsExporterFactory.closeAllExporters();
+ } catch (Exception e) {
+ fail("No exception should be thrown when closing exporter");
+ }
+ }
+
+ public void testUpdateExporter() {
+ LocalIndexExporter exporter = new LocalIndexExporter(client, DateTimeFormat.forPattern("yyyy-MM-dd"));
+ queryInsightsExporterFactory.updateExporter(exporter, "yyyy-MM-dd-HH");
+ assertEquals(DateTimeFormat.forPattern("yyyy-MM-dd-HH"), exporter.getIndexPattern());
+ }
+
+}
diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java
index 328ed0cd2ed15..b794a2e4b8608 100644
--- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java
+++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java
@@ -11,28 +11,39 @@
import org.opensearch.action.search.SearchPhaseContext;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchRequestContext;
+import org.opensearch.action.search.SearchTask;
import org.opensearch.action.search.SearchType;
import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.core.service.TopQueriesService;
+import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.MetricType;
+import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.aggregations.support.ValueType;
import org.opensearch.search.builder.SearchSourceBuilder;
+import org.opensearch.tasks.Task;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.threadpool.ThreadPool;
import org.junit.Before;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
+import org.mockito.ArgumentCaptor;
+
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -48,6 +59,7 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase {
private final SearchRequest searchRequest = mock(SearchRequest.class);
private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class);
private final TopQueriesService topQueriesService = mock(TopQueriesService.class);
+ private final ThreadPool threadPool = mock(ThreadPool.class);
private ClusterService clusterService;
@Before
@@ -61,8 +73,13 @@ public void setup() {
clusterService = ClusterServiceUtils.createClusterService(settings, clusterSettings, null);
when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true);
when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService);
+
+ ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
+ threadContext.setHeaders(new Tuple<>(Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"), new HashMap<>()));
+ when(threadPool.getThreadContext()).thenReturn(threadContext);
}
+ @SuppressWarnings("unchecked")
public void testOnRequestEnd() throws InterruptedException {
Long timestamp = System.currentTimeMillis() - 100L;
SearchType searchType = SearchType.QUERY_THEN_FETCH;
@@ -70,6 +87,7 @@ public void testOnRequestEnd() throws InterruptedException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword"));
searchSourceBuilder.size(0);
+ SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"));
String[] indices = new String[] { "index-1", "index-2" };
@@ -89,10 +107,19 @@ public void testOnRequestEnd() throws InterruptedException {
when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap);
when(searchPhaseContext.getRequest()).thenReturn(searchRequest);
when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards);
+ when(searchPhaseContext.getTask()).thenReturn(task);
+ ArgumentCaptor captor = ArgumentCaptor.forClass(SearchQueryRecord.class);
queryInsightsListener.onRequestEnd(searchPhaseContext, searchRequestContext);
- verify(queryInsightsService, times(1)).addRecord(any());
+ verify(queryInsightsService, times(1)).addRecord(captor.capture());
+ SearchQueryRecord generatedRecord = captor.getValue();
+ assertEquals(timestamp.longValue(), generatedRecord.getTimestamp());
+ assertEquals(numberOfShards, generatedRecord.getAttributes().get(Attribute.TOTAL_SHARDS));
+ assertEquals(searchType.toString().toLowerCase(Locale.ROOT), generatedRecord.getAttributes().get(Attribute.SEARCH_TYPE));
+ assertEquals(searchSourceBuilder.toString(), generatedRecord.getAttributes().get(Attribute.SOURCE));
+ Map labels = (Map) generatedRecord.getAttributes().get(Attribute.LABELS);
+ assertEquals("userLabel", labels.get(Task.X_OPAQUE_ID));
}
public void testConcurrentOnRequestEnd() throws InterruptedException {
@@ -102,6 +129,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword"));
searchSourceBuilder.size(0);
+ SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"));
String[] indices = new String[] { "index-1", "index-2" };
@@ -121,6 +149,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException {
when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap);
when(searchPhaseContext.getRequest()).thenReturn(searchRequest);
when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards);
+ when(searchPhaseContext.getTask()).thenReturn(task);
int numRequests = 50;
Thread[] threads = new Thread[numRequests];
diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java
index c29b48b9690d1..428f615ce2f90 100644
--- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java
+++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java
@@ -8,6 +8,9 @@
package org.opensearch.plugin.insights.core.service;
+import org.opensearch.client.Client;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Settings;
import org.opensearch.plugin.insights.QueryInsightsTestUtils;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
@@ -23,11 +26,16 @@
*/
public class QueryInsightsServiceTests extends OpenSearchTestCase {
private final ThreadPool threadPool = mock(ThreadPool.class);
+ private final Client client = mock(Client.class);
private QueryInsightsService queryInsightsService;
@Before
public void setup() {
- queryInsightsService = new QueryInsightsService(threadPool);
+ Settings.Builder settingsBuilder = Settings.builder();
+ Settings settings = settingsBuilder.build();
+ ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+ clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_EXPORTER_SETTINGS);
+ queryInsightsService = new QueryInsightsService(clusterSettings, threadPool, client);
queryInsightsService.enableCollection(MetricType.LATENCY, true);
queryInsightsService.enableCollection(MetricType.CPU, true);
queryInsightsService.enableCollection(MetricType.JVM, true);
@@ -46,4 +54,12 @@ public void testAddRecordToLimitAndDrain() {
queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(false).size()
);
}
+
+ public void testClose() {
+ try {
+ queryInsightsService.doClose();
+ } catch (Exception e) {
+ fail("No exception expected when closing query insights service");
+ }
+ }
}
diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java
index 060df84a89485..3efd4c86833cc 100644
--- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java
+++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java
@@ -11,24 +11,30 @@
import org.opensearch.cluster.coordination.DeterministicTaskQueue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugin.insights.QueryInsightsTestUtils;
+import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.threadpool.ThreadPool;
import org.junit.Before;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static org.mockito.Mockito.mock;
+
/**
* Unit Tests for {@link QueryInsightsService}.
*/
public class TopQueriesServiceTests extends OpenSearchTestCase {
private TopQueriesService topQueriesService;
+ private final ThreadPool threadPool = mock(ThreadPool.class);
+ private final QueryInsightsExporterFactory queryInsightsExporterFactory = mock(QueryInsightsExporterFactory.class);
@Before
public void setup() {
- topQueriesService = new TopQueriesService(MetricType.LATENCY);
+ topQueriesService = new TopQueriesService(MetricType.LATENCY, threadPool, queryInsightsExporterFactory);
topQueriesService.setTopNSize(Integer.MAX_VALUE);
topQueriesService.setWindowSize(new TimeValue(Long.MAX_VALUE));
topQueriesService.setEnabled(true);
diff --git a/server/src/internalClusterTest/java/org/opensearch/recovery/FullRollingRestartIT.java b/server/src/internalClusterTest/java/org/opensearch/recovery/FullRollingRestartIT.java
index 0752ab7c9d0f1..d9e3cec426edf 100644
--- a/server/src/internalClusterTest/java/org/opensearch/recovery/FullRollingRestartIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/recovery/FullRollingRestartIT.java
@@ -36,6 +36,7 @@
import org.opensearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
@@ -45,6 +46,8 @@
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.xcontent.XContentFactory;
+import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
@@ -253,4 +256,144 @@ public void testNoRebalanceOnRollingRestart() throws Exception {
);
}
}
+
+ public void testFullRollingRestart_withNoRecoveryPayloadAndSource() throws Exception {
+ internalCluster().startNode();
+ XContentBuilder builder = XContentFactory.jsonBuilder()
+ .startObject()
+ .startObject("_source")
+ .field("enabled")
+ .value(false)
+ .field("recovery_source_enabled")
+ .value(false)
+ .endObject()
+ .endObject();
+ CreateIndexResponse response = prepareCreate("test").setMapping(builder).get();
+ logger.info("Create index response is : {}", response);
+
+ final String healthTimeout = "1m";
+
+ for (int i = 0; i < 1000; i++) {
+ client().prepareIndex("test")
+ .setId(Long.toString(i))
+ .setSource(MapBuilder.newMapBuilder().put("test", "value" + i).map())
+ .execute()
+ .actionGet();
+ }
+
+ for (int i = 1000; i < 2000; i++) {
+ client().prepareIndex("test")
+ .setId(Long.toString(i))
+ .setSource(MapBuilder.newMapBuilder().put("test", "value" + i).map())
+ .execute()
+ .actionGet();
+ }
+ // ensuring all docs are committed to file system
+ flush();
+
+ logger.info("--> now start adding nodes");
+ internalCluster().startNode();
+ internalCluster().startNode();
+
+ // make sure the cluster state is green, and all has been recovered
+ assertTimeout(
+ client().admin()
+ .cluster()
+ .prepareHealth()
+ .setWaitForEvents(Priority.LANGUID)
+ .setTimeout(healthTimeout)
+ .setWaitForGreenStatus()
+ .setWaitForNoRelocatingShards(true)
+ .setWaitForNodes("3")
+ );
+
+ logger.info("--> add two more nodes");
+ internalCluster().startNode();
+ internalCluster().startNode();
+
+ // make sure the cluster state is green, and all has been recovered
+ assertTimeout(
+ client().admin()
+ .cluster()
+ .prepareHealth()
+ .setWaitForEvents(Priority.LANGUID)
+ .setTimeout(healthTimeout)
+ .setWaitForGreenStatus()
+ .setWaitForNoRelocatingShards(true)
+ .setWaitForNodes("5")
+ );
+
+ logger.info("--> refreshing and checking data");
+ refreshAndWaitForReplication();
+ for (int i = 0; i < 10; i++) {
+ assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2000L);
+ }
+
+ // now start shutting nodes down
+ internalCluster().stopRandomDataNode();
+ // make sure the cluster state is green, and all has been recovered
+ assertTimeout(
+ client().admin()
+ .cluster()
+ .prepareHealth()
+ .setWaitForEvents(Priority.LANGUID)
+ .setTimeout(healthTimeout)
+ .setWaitForGreenStatus()
+ .setWaitForNoRelocatingShards(true)
+ .setWaitForNodes("4")
+ );
+
+ internalCluster().stopRandomDataNode();
+ // make sure the cluster state is green, and all has been recovered
+ assertTimeout(
+ client().admin()
+ .cluster()
+ .prepareHealth()
+ .setWaitForEvents(Priority.LANGUID)
+ .setTimeout(healthTimeout)
+ .setWaitForGreenStatus()
+ .setWaitForNoRelocatingShards(true)
+ .setWaitForNodes("3")
+ );
+
+ logger.info("--> stopped two nodes, verifying data");
+ refreshAndWaitForReplication();
+ for (int i = 0; i < 10; i++) {
+ assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2000L);
+ }
+
+ // closing the 3rd node
+ internalCluster().stopRandomDataNode();
+ // make sure the cluster state is green, and all has been recovered
+ assertTimeout(
+ client().admin()
+ .cluster()
+ .prepareHealth()
+ .setWaitForEvents(Priority.LANGUID)
+ .setTimeout(healthTimeout)
+ .setWaitForGreenStatus()
+ .setWaitForNoRelocatingShards(true)
+ .setWaitForNodes("2")
+ );
+
+ internalCluster().stopRandomDataNode();
+
+ // make sure the cluster state is yellow, and all has been recovered
+ assertTimeout(
+ client().admin()
+ .cluster()
+ .prepareHealth()
+ .setWaitForEvents(Priority.LANGUID)
+ .setTimeout(healthTimeout)
+ .setWaitForYellowStatus()
+ .setWaitForNoRelocatingShards(true)
+ .setWaitForNodes("1")
+ );
+
+ logger.info("--> one node left, verifying data");
+ refreshAndWaitForReplication();
+ for (int i = 0; i < 10; i++) {
+ assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2000L);
+ }
+ }
}
diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/terms/StringTermsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/terms/StringTermsIT.java
index edf9cd432dda2..f5d018b2ef491 100644
--- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/terms/StringTermsIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/terms/StringTermsIT.java
@@ -42,10 +42,12 @@
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
+import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.AggregationExecutionException;
import org.opensearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.opensearch.search.aggregations.BucketOrder;
import org.opensearch.search.aggregations.bucket.filter.Filter;
+import org.opensearch.search.aggregations.bucket.filter.InternalFilters;
import org.opensearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.opensearch.search.aggregations.metrics.Avg;
import org.opensearch.search.aggregations.metrics.ExtendedStats;
@@ -999,6 +1001,72 @@ public void testOtherDocCount() {
testOtherDocCount(SINGLE_VALUED_FIELD_NAME, MULTI_VALUED_FIELD_NAME);
}
+ public void testDeferredSubAggs() {
+ // Tests subAgg doc count is the same with different collection modes and additional top level aggs
+ SearchResponse r1 = client().prepareSearch("idx")
+ .setSize(0)
+ .addAggregation(
+ terms("terms1").collectMode(SubAggCollectionMode.BREADTH_FIRST)
+ .field("s_value")
+ .size(2)
+ .subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery()))
+ )
+ .addAggregation(AggregationBuilders.min("min").field("constant"))
+ .get();
+
+ SearchResponse r2 = client().prepareSearch("idx")
+ .setSize(0)
+ .addAggregation(
+ terms("terms1").collectMode(SubAggCollectionMode.DEPTH_FIRST)
+ .field("s_value")
+ .size(2)
+ .subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery()))
+ )
+ .addAggregation(AggregationBuilders.min("min").field("constant"))
+ .get();
+
+ SearchResponse r3 = client().prepareSearch("idx")
+ .setSize(0)
+ .addAggregation(
+ terms("terms1").collectMode(SubAggCollectionMode.BREADTH_FIRST)
+ .field("s_value")
+ .size(2)
+ .subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery()))
+ )
+ .get();
+
+ SearchResponse r4 = client().prepareSearch("idx")
+ .setSize(0)
+ .addAggregation(
+ terms("terms1").collectMode(SubAggCollectionMode.DEPTH_FIRST)
+ .field("s_value")
+ .size(2)
+ .subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery()))
+ )
+ .get();
+
+ assertNotNull(r1.getAggregations().get("terms1"));
+ assertNotNull(r2.getAggregations().get("terms1"));
+ assertNotNull(r3.getAggregations().get("terms1"));
+ assertNotNull(r4.getAggregations().get("terms1"));
+
+ Terms terms = r1.getAggregations().get("terms1");
+ Bucket b1 = terms.getBucketByKey("val0");
+ InternalFilters f1 = b1.getAggregations().get("filter");
+ long docCount1 = f1.getBuckets().get(0).getDocCount();
+ Bucket b2 = terms.getBucketByKey("val1");
+ InternalFilters f2 = b2.getAggregations().get("filter");
+ long docCount2 = f1.getBuckets().get(0).getDocCount();
+
+ for (SearchResponse response : new SearchResponse[] { r2, r3, r4 }) {
+ terms = response.getAggregations().get("terms1");
+ f1 = terms.getBucketByKey(b1.getKeyAsString()).getAggregations().get("filter");
+ f2 = terms.getBucketByKey(b2.getKeyAsString()).getAggregations().get("filter");
+ assertEquals(docCount1, f1.getBuckets().get(0).getDocCount());
+ assertEquals(docCount2, f2.getBuckets().get(0).getDocCount());
+ }
+ }
+
/**
* Make sure that a request using a deterministic script or not using a script get cached.
* Ensure requests using nondeterministic scripts do not get cached.
diff --git a/server/src/internalClusterTest/java/org/opensearch/search/query/SearchQueryIT.java b/server/src/internalClusterTest/java/org/opensearch/search/query/SearchQueryIT.java
index 01ad06757640c..a58db51780826 100644
--- a/server/src/internalClusterTest/java/org/opensearch/search/query/SearchQueryIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/search/query/SearchQueryIT.java
@@ -1914,8 +1914,14 @@ public void testRangeQueryWithTimeZone() throws Exception {
* Test range with a custom locale, e.g. "de" in this case. Documents here mention the day of week
* as "Mi" for "Mittwoch (Wednesday" and "Do" for "Donnerstag (Thursday)" and the month in the query
* as "Dez" for "Dezember (December)".
+ * Note: this test currently needs the JVM arg `-Djava.locale.providers=SPI,COMPAT` to be set.
+ * When running with gradle this is done implicitly through the BuildPlugin, but when running from
+ * an IDE this might need to be set manually in the run configuration. See also CONTRIBUTING.md section
+ * on "Configuring IDEs And Running Tests".
*/
public void testRangeQueryWithLocaleMapping() throws Exception {
+ assert ("SPI,COMPAT".equals(System.getProperty("java.locale.providers"))) : "`-Djava.locale.providers=SPI,COMPAT` needs to be set";
+
assertAcked(
prepareCreate("test").setMapping(
jsonBuilder().startObject()
@@ -1932,21 +1938,17 @@ public void testRangeQueryWithLocaleMapping() throws Exception {
indexRandom(
true,
- client().prepareIndex("test").setId("1").setSource("date_field", "Mi., 06 Dez. 2000 02:55:00 -0800"),
- client().prepareIndex("test").setId("2").setSource("date_field", "Do., 07 Dez. 2000 02:55:00 -0800")
+ client().prepareIndex("test").setId("1").setSource("date_field", "Mi, 06 Dez 2000 02:55:00 -0800"),
+ client().prepareIndex("test").setId("2").setSource("date_field", "Do, 07 Dez 2000 02:55:00 -0800")
);
SearchResponse searchResponse = client().prepareSearch("test")
- .setQuery(
- QueryBuilders.rangeQuery("date_field").gte("Di., 05 Dez. 2000 02:55:00 -0800").lte("Do., 07 Dez. 2000 00:00:00 -0800")
- )
+ .setQuery(QueryBuilders.rangeQuery("date_field").gte("Di, 05 Dez 2000 02:55:00 -0800").lte("Do, 07 Dez 2000 00:00:00 -0800"))
.get();
assertHitCount(searchResponse, 1L);
searchResponse = client().prepareSearch("test")
- .setQuery(
- QueryBuilders.rangeQuery("date_field").gte("Di., 05 Dez. 2000 02:55:00 -0800").lte("Fr., 08 Dez. 2000 00:00:00 -0800")
- )
+ .setQuery(QueryBuilders.rangeQuery("date_field").gte("Di, 05 Dez 2000 02:55:00 -0800").lte("Fr, 08 Dez 2000 00:00:00 -0800"))
.get();
assertHitCount(searchResponse, 2L);
}
diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java
index b8bbde65ca6bc..5b133ba0554f4 100644
--- a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java
+++ b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java
@@ -107,6 +107,10 @@ String formattedShardStats() {
);
}
}
+
+ public SearchRequest getRequest() {
+ return searchRequest;
+ }
}
enum ShardStatsFieldNames {
diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java
index 53efade174502..b944572cef122 100644
--- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java
+++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java
@@ -41,11 +41,11 @@ protected SearchRequestOperationsListener(final boolean enabled) {
this.enabled = enabled;
}
- protected abstract void onPhaseStart(SearchPhaseContext context);
+ protected void onPhaseStart(SearchPhaseContext context) {};
- protected abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext);
+ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {};
- protected abstract void onPhaseFailure(SearchPhaseContext context, Throwable cause);
+ protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {};
protected void onRequestStart(SearchRequestContext searchRequestContext) {}
diff --git a/server/src/main/java/org/opensearch/common/cache/Cache.java b/server/src/main/java/org/opensearch/common/cache/Cache.java
index 6d346de25cadf..caae81e4387b4 100644
--- a/server/src/main/java/org/opensearch/common/cache/Cache.java
+++ b/server/src/main/java/org/opensearch/common/cache/Cache.java
@@ -36,9 +36,11 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.util.concurrent.ReleasableLock;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
@@ -396,7 +398,12 @@ private V get(K key, long now, Consumer> onExpiration) {
if (entry == null) {
return null;
} else {
- promote(entry, now);
+ List> removalNotifications = promote(entry, now).v2();
+ if (!removalNotifications.isEmpty()) {
+ for (RemovalNotification removalNotification : removalNotifications) {
+ removalListener.onRemoval(removalNotification);
+ }
+ }
return entry.value;
}
}
@@ -446,8 +453,14 @@ private V compute(K key, CacheLoader loader) throws ExecutionException {
BiFunction super Entry, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
+ List> removalNotifications = new ArrayList<>();
try (ReleasableLock ignored = lruLock.acquire()) {
- promote(ok, now);
+ removalNotifications = promote(ok, now).v2();
+ }
+ if (!removalNotifications.isEmpty()) {
+ for (RemovalNotification removalNotification : removalNotifications) {
+ removalListener.onRemoval(removalNotification);
+ }
}
return ok.value;
} else {
@@ -512,16 +525,22 @@ private void put(K key, V value, long now) {
CacheSegment segment = getCacheSegment(key);
Tuple, Entry> tuple = segment.put(key, value, now);
boolean replaced = false;
+ List> removalNotifications = new ArrayList<>();
try (ReleasableLock ignored = lruLock.acquire()) {
if (tuple.v2() != null && tuple.v2().state == State.EXISTING) {
if (unlink(tuple.v2())) {
replaced = true;
}
}
- promote(tuple.v1(), now);
+ removalNotifications = promote(tuple.v1(), now).v2();
}
if (replaced) {
- removalListener.onRemoval(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED));
+ removalNotifications.add(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED));
+ }
+ if (!removalNotifications.isEmpty()) {
+ for (RemovalNotification removalNotification : removalNotifications) {
+ removalListener.onRemoval(removalNotification);
+ }
}
}
@@ -767,8 +786,17 @@ public long getEvictions() {
}
}
- private boolean promote(Entry entry, long now) {
+ /**
+ * Promotes the desired entry to the head of the lru list and tries to see if it needs to evict any entries in
+ * case the cache size is exceeding or the entry got expired.
+ * @param entry Entry to be promoted
+ * @param now the current time
+ * @return Returns a tuple. v1 signifies whether an entry got promoted, v2 signifies the list of removal
+ * notifications that the callers needs to handle.
+ */
+ private Tuple>> promote(Entry entry, long now) {
boolean promoted = true;
+ List> removalNotifications = new ArrayList<>();
try (ReleasableLock ignored = lruLock.acquire()) {
switch (entry.state) {
case DELETED:
@@ -782,10 +810,21 @@ private boolean promote(Entry entry, long now) {
break;
}
if (promoted) {
- evict(now);
+ while (tail != null && shouldPrune(tail, now)) {
+ Entry entryToBeRemoved = tail;
+ CacheSegment segment = getCacheSegment(entryToBeRemoved.key);
+ if (segment != null) {
+ segment.remove(entryToBeRemoved.key, entryToBeRemoved.value, f -> {});
+ }
+ if (unlink(entryToBeRemoved)) {
+ removalNotifications.add(
+ new RemovalNotification<>(entryToBeRemoved.key, entryToBeRemoved.value, RemovalReason.EVICTED)
+ );
+ }
+ }
}
}
- return promoted;
+ return new Tuple<>(promoted, removalNotifications);
}
private void evict(long now) {
diff --git a/server/src/main/java/org/opensearch/common/cache/RemovalListener.java b/server/src/main/java/org/opensearch/common/cache/RemovalListener.java
index 68e1cdf6139e2..eaaaec2bb07e0 100644
--- a/server/src/main/java/org/opensearch/common/cache/RemovalListener.java
+++ b/server/src/main/java/org/opensearch/common/cache/RemovalListener.java
@@ -42,5 +42,10 @@
@ExperimentalApi
@FunctionalInterface
public interface RemovalListener {
+
+ /**
+ * This may be called from multiple threads at once. So implementation needs to be thread safe.
+ * @param notification removal notification for desired entry.
+ */
void onRemoval(RemovalNotification notification);
}
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index 297fc98764d07..09f32884e0ae1 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -302,6 +302,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING,
+ RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
@@ -743,7 +744,8 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS,
- RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA
+ RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA,
+ SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING
)
)
);
diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
index 980c432774f6e..6fe8dec9c21b1 100644
--- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
@@ -237,7 +237,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
// Settings for concurrent segment search
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING,
-
+ IndexSettings.ALLOW_DERIVED_FIELDS,
// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Map groups = s.getAsGroups();
diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java
index 6c0ab2f6b0153..613e93698d683 100644
--- a/server/src/main/java/org/opensearch/index/IndexSettings.java
+++ b/server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -151,6 +151,14 @@ public static IndexMergePolicy fromString(String text) {
true,
Property.IndexScope
);
+
+ public static final Setting ALLOW_DERIVED_FIELDS = Setting.boolSetting(
+ "index.query.derived_field.enabled",
+ true,
+ Property.Dynamic,
+ Property.IndexScope
+ );
+
public static final Setting INDEX_TRANSLOG_SYNC_INTERVAL_SETTING = Setting.timeSetting(
"index.translog.sync_interval",
TimeValue.timeValueSeconds(5),
@@ -763,6 +771,7 @@ public static IndexMergePolicy fromString(String text) {
private final boolean assignedOnRemoteNode;
private final RemoteStorePathStrategy remoteStorePathStrategy;
private final boolean isTranslogMetadataEnabled;
+ private volatile boolean allowDerivedField;
/**
* The maximum age of a retention lease before it is considered expired.
@@ -856,6 +865,10 @@ private void setDefaultFields(List defaultFields) {
this.defaultFields = defaultFields;
}
+ private void setAllowDerivedField(boolean allowDerivedField) {
+ this.allowDerivedField = allowDerivedField;
+ }
+
/**
* Returns true
if query string parsing should be lenient. The default is false
*/
@@ -884,6 +897,13 @@ public boolean isDefaultAllowUnmappedFields() {
return defaultAllowUnmappedFields;
}
+ /**
+ * Returns true
if queries are allowed to define and use derived fields. The default is true
+ */
+ public boolean isDerivedFieldAllowed() {
+ return allowDerivedField;
+ }
+
/**
* Creates a new {@link IndexSettings} instance. The given node settings will be merged with the settings in the metadata
* while index level settings will overwrite node settings.
@@ -931,6 +951,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
this.queryStringAllowLeadingWildcard = QUERY_STRING_ALLOW_LEADING_WILDCARD.get(nodeSettings);
this.defaultAllowUnmappedFields = scopedSettings.get(ALLOW_UNMAPPED);
+ this.allowDerivedField = scopedSettings.get(ALLOW_DERIVED_FIELDS);
this.durability = scopedSettings.get(INDEX_TRANSLOG_DURABILITY_SETTING);
defaultFields = scopedSettings.get(DEFAULT_FIELD_SETTING);
syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings);
@@ -1105,6 +1126,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING,
this::setDocIdFuzzySetFalsePositiveProbability
);
+ scopedSettings.addSettingsUpdateConsumer(ALLOW_DERIVED_FIELDS, this::setAllowDerivedField);
}
private void setSearchIdleAfter(TimeValue searchIdleAfter) {
diff --git a/server/src/main/java/org/opensearch/index/mapper/DefaultDerivedFieldResolver.java b/server/src/main/java/org/opensearch/index/mapper/DefaultDerivedFieldResolver.java
new file mode 100644
index 0000000000000..c577a4117247b
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/mapper/DefaultDerivedFieldResolver.java
@@ -0,0 +1,229 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.mapper;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.common.regex.Regex;
+import org.opensearch.index.query.QueryShardContext;
+import org.opensearch.script.Script;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.opensearch.index.mapper.FieldMapper.IGNORE_MALFORMED_SETTING;
+
+/**
+ * Accepts definition of DerivedField from search request in both forms: map parsed from SearchRequest and {@link DerivedField} defined using client.
+ * The object is initialized per search request and is responsible to resolve {@link DerivedFieldType} given a field name.
+ * It uses {@link FieldTypeInference} to infer field type for a nested field within DerivedField of {@link DerivedFieldSupportedTypes#OBJECT} type.
+ */
+public class DefaultDerivedFieldResolver implements DerivedFieldResolver {
+ private final QueryShardContext queryShardContext;
+ private final Map derivedFieldTypeMap = new ConcurrentHashMap<>();
+ private final FieldTypeInference typeInference;
+ private static final Logger logger = LogManager.getLogger(DefaultDerivedFieldResolver.class);
+
+ DefaultDerivedFieldResolver(
+ QueryShardContext queryShardContext,
+ Map derivedFieldsObject,
+ List derivedFields
+ ) {
+ this(
+ queryShardContext,
+ derivedFieldsObject,
+ derivedFields,
+ new FieldTypeInference(
+ queryShardContext.index().getName(),
+ queryShardContext.getMapperService(),
+ queryShardContext.getIndexReader()
+ )
+ );
+ }
+
+ DefaultDerivedFieldResolver(
+ QueryShardContext queryShardContext,
+ Map derivedFieldsObject,
+ List derivedFields,
+ FieldTypeInference typeInference
+ ) {
+ this.queryShardContext = queryShardContext;
+ initDerivedFieldTypes(derivedFieldsObject, derivedFields);
+ this.typeInference = typeInference;
+ }
+
+ @Override
+ public Set resolvePattern(String pattern) {
+ Set derivedFields = new HashSet<>();
+ if (queryShardContext != null && queryShardContext.getMapperService() != null) {
+ for (MappedFieldType fieldType : queryShardContext.getMapperService().fieldTypes()) {
+ if (Regex.simpleMatch(pattern, fieldType.name()) && fieldType instanceof DerivedFieldType) {
+ derivedFields.add(fieldType.name());
+ }
+ }
+ }
+ for (String fieldName : derivedFieldTypeMap.keySet()) {
+ if (Regex.simpleMatch(pattern, fieldName)) {
+ derivedFields.add(fieldName);
+ }
+ }
+ return derivedFields;
+ }
+
+ /**
+ * Resolves the fieldName. The search request definitions are given precedence over derived fields definitions in the index mapping.
+ * It caches the response for previously resolved field names
+ * @param fieldName name of the field. It also accepts nested derived field
+ * @return DerivedFieldType if resolved successfully, a null otherwise.
+ */
+ @Override
+ public DerivedFieldType resolve(String fieldName) {
+ return Optional.ofNullable(resolveUsingSearchDefinitions(fieldName)).orElseGet(() -> resolveUsingMappings(fieldName));
+ }
+
+ private DerivedFieldType resolveUsingSearchDefinitions(String fieldName) {
+ return Optional.ofNullable(derivedFieldTypeMap.get(fieldName))
+ .orElseGet(
+ () -> Optional.ofNullable((DerivedFieldType) getParentDerivedField(fieldName))
+ .map(
+ // compute and cache nested derived field
+ parentDerivedField -> derivedFieldTypeMap.computeIfAbsent(
+ fieldName,
+ f -> this.resolveNestedField(f, parentDerivedField)
+ )
+ )
+ .orElse(null)
+ );
+ }
+
+ private DerivedFieldType resolveNestedField(String fieldName, DerivedFieldType parentDerivedField) {
+ Objects.requireNonNull(parentDerivedField);
+ try {
+ Script script = parentDerivedField.derivedField.getScript();
+ String nestedType = explicitTypeFromParent(parentDerivedField.derivedField, fieldName.substring(fieldName.indexOf(".") + 1));
+ if (nestedType == null) {
+ Mapper inferredFieldMapper = typeInference.infer(
+ getValueFetcher(fieldName, script, parentDerivedField.derivedField.getIgnoreMalformed())
+ );
+ if (inferredFieldMapper != null) {
+ nestedType = inferredFieldMapper.typeName();
+ }
+ }
+ if (nestedType != null) {
+ DerivedField derivedField = new DerivedField(fieldName, nestedType, script);
+ if (parentDerivedField.derivedField.getProperties() != null) {
+ derivedField.setProperties(parentDerivedField.derivedField.getProperties());
+ }
+ if (parentDerivedField.derivedField.getPrefilterField() != null) {
+ derivedField.setPrefilterField(parentDerivedField.derivedField.getPrefilterField());
+ }
+ if (parentDerivedField.derivedField.getFormat() != null) {
+ derivedField.setFormat(parentDerivedField.derivedField.getFormat());
+ }
+ if (parentDerivedField.derivedField.getIgnoreMalformed()) {
+ derivedField.setIgnoreMalformed(parentDerivedField.derivedField.getIgnoreMalformed());
+ }
+ return getDerivedFieldType(derivedField);
+ } else {
+ logger.warn(
+ "Field type cannot be inferred. Ensure the field {} is not rare across entire index or provide explicit mapping using [properties] under parent object [{}] ",
+ fieldName,
+ parentDerivedField.derivedField.getName()
+ );
+ }
+ } catch (IOException e) {
+ logger.warn(e.getMessage());
+ }
+ return null;
+ }
+
+ private MappedFieldType getParentDerivedField(String fieldName) {
+ if (fieldName.contains(".")) {
+ return resolve(fieldName.split("\\.")[0]);
+ }
+ return null;
+ }
+
+ private static String explicitTypeFromParent(DerivedField parentDerivedField, String subField) {
+ if (parentDerivedField == null) {
+ return null;
+ }
+ return parentDerivedField.getNestedFieldType(subField);
+ }
+
+ ValueFetcher getValueFetcher(String fieldName, Script script, boolean ignoreMalformed) {
+ String subFieldName = fieldName.substring(fieldName.indexOf(".") + 1);
+ return new ObjectDerivedFieldType.ObjectDerivedFieldValueFetcher(
+ subFieldName,
+ DerivedFieldType.getDerivedFieldLeafFactory(script, queryShardContext, queryShardContext.lookup()),
+ o -> o, // raw object returned will be used to infer the type without modifying it
+ ignoreMalformed
+ );
+ }
+
+ private void initDerivedFieldTypes(Map derivedFieldsObject, List derivedFields) {
+ if (derivedFieldsObject != null && !derivedFieldsObject.isEmpty()) {
+ Map derivedFieldObject = new HashMap<>();
+ derivedFieldObject.put(DerivedFieldMapper.CONTENT_TYPE, derivedFieldsObject);
+ derivedFieldTypeMap.putAll(getAllDerivedFieldTypeFromObject(derivedFieldObject));
+ }
+ if (derivedFields != null) {
+ for (DerivedField derivedField : derivedFields) {
+ derivedFieldTypeMap.put(derivedField.getName(), getDerivedFieldType(derivedField));
+ }
+ }
+ }
+
+ private Map getAllDerivedFieldTypeFromObject(Map derivedFieldObject) {
+ Map derivedFieldTypes = new HashMap<>();
+ DocumentMapper documentMapper = queryShardContext.getMapperService()
+ .documentMapperParser()
+ .parse(DerivedFieldMapper.CONTENT_TYPE, derivedFieldObject);
+ if (documentMapper != null && documentMapper.mappers() != null) {
+ for (Mapper mapper : documentMapper.mappers()) {
+ if (mapper instanceof DerivedFieldMapper) {
+ DerivedFieldType derivedFieldType = ((DerivedFieldMapper) mapper).fieldType();
+ derivedFieldTypes.put(derivedFieldType.name(), derivedFieldType);
+ }
+ }
+ }
+ return derivedFieldTypes;
+ }
+
+ private DerivedFieldType getDerivedFieldType(DerivedField derivedField) {
+ Mapper.BuilderContext builderContext = new Mapper.BuilderContext(
+ queryShardContext.getMapperService().getIndexSettings().getSettings(),
+ new ContentPath(1)
+ );
+ DerivedFieldMapper.Builder builder = new DerivedFieldMapper.Builder(
+ derivedField,
+ queryShardContext.getMapperService().getIndexAnalyzers(),
+ null,
+ IGNORE_MALFORMED_SETTING.getDefault(queryShardContext.getIndexSettings().getSettings())
+ );
+ return builder.build(builderContext).fieldType();
+ }
+
+ private DerivedFieldType resolveUsingMappings(String name) {
+ if (queryShardContext != null && queryShardContext.getMapperService() != null) {
+ MappedFieldType mappedFieldType = queryShardContext.getMapperService().fieldType(name);
+ if (mappedFieldType instanceof DerivedFieldType) {
+ return (DerivedFieldType) mappedFieldType;
+ }
+ }
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/opensearch/index/mapper/DerivedField.java b/server/src/main/java/org/opensearch/index/mapper/DerivedField.java
index b502e41cbb97b..249b60a1c4ec5 100644
--- a/server/src/main/java/org/opensearch/index/mapper/DerivedField.java
+++ b/server/src/main/java/org/opensearch/index/mapper/DerivedField.java
@@ -30,7 +30,7 @@ public class DerivedField implements Writeable, ToXContentFragment {
private final String name;
private final String type;
private final Script script;
- private String sourceIndexedField;
+ private String prefilterField;
private Map properties;
private Boolean ignoreMalformed;
private String format;
@@ -49,7 +49,7 @@ public DerivedField(StreamInput in) throws IOException {
if (in.readBoolean()) {
properties = in.readMap();
}
- sourceIndexedField = in.readOptionalString();
+ prefilterField = in.readOptionalString();
format = in.readOptionalString();
ignoreMalformed = in.readOptionalBoolean();
}
@@ -67,7 +67,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(true);
out.writeMap(properties);
}
- out.writeOptionalString(sourceIndexedField);
+ out.writeOptionalString(prefilterField);
out.writeOptionalString(format);
out.writeOptionalBoolean(ignoreMalformed);
}
@@ -81,8 +81,8 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
if (properties != null) {
builder.field("properties", properties);
}
- if (sourceIndexedField != null) {
- builder.field("source_indexed_field", sourceIndexedField);
+ if (prefilterField != null) {
+ builder.field("prefilter_field", prefilterField);
}
if (format != null) {
builder.field("format", format);
@@ -110,8 +110,15 @@ public Map getProperties() {
return properties;
}
- public String getSourceIndexedField() {
- return sourceIndexedField;
+ public String getNestedFieldType(String fieldName) {
+ if (properties == null || properties.isEmpty() || fieldName == null || fieldName.isEmpty()) {
+ return null;
+ }
+ return (String) properties.get(fieldName);
+ }
+
+ public String getPrefilterField() {
+ return prefilterField;
}
public String getFormat() {
@@ -126,8 +133,8 @@ public void setProperties(Map properties) {
this.properties = properties;
}
- public void setSourceIndexedField(String sourceIndexedField) {
- this.sourceIndexedField = sourceIndexedField;
+ public void setPrefilterField(String prefilterField) {
+ this.prefilterField = prefilterField;
}
public void setFormat(String format) {
@@ -140,7 +147,7 @@ public void setIgnoreMalformed(boolean ignoreMalformed) {
@Override
public int hashCode() {
- return Objects.hash(name, type, script, sourceIndexedField, properties, ignoreMalformed, format);
+ return Objects.hash(name, type, script, prefilterField, properties, ignoreMalformed, format);
}
@Override
@@ -155,7 +162,7 @@ public boolean equals(Object obj) {
return Objects.equals(name, other.name)
&& Objects.equals(type, other.type)
&& Objects.equals(script, other.script)
- && Objects.equals(sourceIndexedField, other.sourceIndexedField)
+ && Objects.equals(prefilterField, other.prefilterField)
&& Objects.equals(properties, other.properties)
&& Objects.equals(ignoreMalformed, other.ignoreMalformed)
&& Objects.equals(format, other.format);
diff --git a/server/src/main/java/org/opensearch/index/mapper/DerivedFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/DerivedFieldMapper.java
index c6ae71320c35c..e08e46e1ea969 100644
--- a/server/src/main/java/org/opensearch/index/mapper/DerivedFieldMapper.java
+++ b/server/src/main/java/org/opensearch/index/mapper/DerivedFieldMapper.java
@@ -9,16 +9,20 @@
package org.opensearch.index.mapper;
import org.apache.lucene.index.IndexableField;
+import org.opensearch.common.time.DateFormatter;
import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.index.analysis.IndexAnalyzers;
import org.opensearch.script.Script;
import java.io.IOException;
import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
+import static org.opensearch.index.mapper.DateFieldMapper.getDefaultDateTimeFormatter;
+
/**
* A field mapper for derived fields
*
@@ -28,6 +32,8 @@ public class DerivedFieldMapper extends ParametrizedFieldMapper {
public static final String CONTENT_TYPE = "derived";
+ protected final IndexAnalyzers indexAnalyzers;
+
private static DerivedFieldMapper toType(FieldMapper in) {
return (DerivedFieldMapper) in;
}
@@ -38,62 +44,180 @@ private static DerivedFieldMapper toType(FieldMapper in) {
* @opensearch.internal
*/
public static class Builder extends ParametrizedFieldMapper.Builder {
- // TODO: The type of parameter may change here if the actual underlying FieldType object is needed
- private final Parameter type = Parameter.stringParam("type", false, m -> toType(m).type, "");
+ private final Parameter type = Parameter.stringParam("type", true, m -> toType(m).type, "");
+ private final IndexAnalyzers indexAnalyzers;
+ private final boolean defaultIgnoreMalformed;
+ private final DateFormatter defaultDateFormatter;
private final Parameter