forked from opensearch-project/OpenSearch
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Query insights exporters implementation (opensearch-project#12982) (o…
…pensearch-project#14061) --------- Signed-off-by: Chenyang Ji <[email protected]> (cherry picked from commit 0ddf4bd) Signed-off-by: kkewwei <[email protected]>
- Loading branch information
Showing
18 changed files
with
845 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
61 changes: 61 additions & 0 deletions
61
...ry-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.plugin.insights.core.exporter; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* Debug exporter for development purpose | ||
*/ | ||
public final class DebugExporter implements QueryInsightsExporter { | ||
/** | ||
* Logger of the debug exporter | ||
*/ | ||
private final Logger logger = LogManager.getLogger(); | ||
|
||
/** | ||
* Constructor of DebugExporter | ||
*/ | ||
private DebugExporter() {} | ||
|
||
private static class InstanceHolder { | ||
private static final DebugExporter INSTANCE = new DebugExporter(); | ||
} | ||
|
||
/** | ||
Get the singleton instance of DebugExporter | ||
* | ||
@return DebugExporter instance | ||
*/ | ||
public static DebugExporter getInstance() { | ||
return InstanceHolder.INSTANCE; | ||
} | ||
|
||
/** | ||
* Write the list of SearchQueryRecord to debug log | ||
* | ||
* @param records list of {@link SearchQueryRecord} | ||
*/ | ||
@Override | ||
public void export(final List<SearchQueryRecord> records) { | ||
logger.debug("QUERY_INSIGHTS_RECORDS: " + records.toString()); | ||
} | ||
|
||
/** | ||
* Close the debugger exporter sink | ||
*/ | ||
@Override | ||
public void close() { | ||
logger.debug("Closing the DebugExporter.."); | ||
} | ||
} |
113 changes: 113 additions & 0 deletions
113
...sights/src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.plugin.insights.core.exporter; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.action.bulk.BulkRequestBuilder; | ||
import org.opensearch.action.bulk.BulkResponse; | ||
import org.opensearch.action.index.IndexRequest; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.common.unit.TimeValue; | ||
import org.opensearch.common.xcontent.XContentFactory; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.core.xcontent.ToXContent; | ||
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; | ||
import org.joda.time.DateTime; | ||
import org.joda.time.DateTimeZone; | ||
import org.joda.time.format.DateTimeFormatter; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* Local index exporter for exporting query insights data to local OpenSearch indices. | ||
*/ | ||
public final class LocalIndexExporter implements QueryInsightsExporter { | ||
/** | ||
* Logger of the local index exporter | ||
*/ | ||
private final Logger logger = LogManager.getLogger(); | ||
private final Client client; | ||
private DateTimeFormatter indexPattern; | ||
|
||
/** | ||
* Constructor of LocalIndexExporter | ||
* | ||
* @param client OS client | ||
* @param indexPattern the pattern of index to export to | ||
*/ | ||
public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern) { | ||
this.indexPattern = indexPattern; | ||
this.client = client; | ||
} | ||
|
||
/** | ||
* Getter of indexPattern | ||
* | ||
* @return indexPattern | ||
*/ | ||
public DateTimeFormatter getIndexPattern() { | ||
return indexPattern; | ||
} | ||
|
||
/** | ||
* Setter of indexPattern | ||
* | ||
* @param indexPattern index pattern | ||
* @return the current LocalIndexExporter | ||
*/ | ||
public LocalIndexExporter setIndexPattern(DateTimeFormatter indexPattern) { | ||
this.indexPattern = indexPattern; | ||
return this; | ||
} | ||
|
||
/** | ||
* Export a list of SearchQueryRecord to a local index | ||
* | ||
* @param records list of {@link SearchQueryRecord} | ||
*/ | ||
@Override | ||
public void export(final List<SearchQueryRecord> records) { | ||
if (records == null || records.size() == 0) { | ||
return; | ||
} | ||
try { | ||
final String index = getDateTimeFromFormat(); | ||
final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1)); | ||
for (SearchQueryRecord record : records) { | ||
bulkRequestBuilder.add( | ||
new IndexRequest(index).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) | ||
); | ||
} | ||
bulkRequestBuilder.execute(new ActionListener<BulkResponse>() { | ||
@Override | ||
public void onResponse(BulkResponse bulkItemResponses) {} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
logger.error("Failed to execute bulk operation for query insights data: ", e); | ||
} | ||
}); | ||
} catch (final Exception e) { | ||
logger.error("Unable to index query insights data: ", e); | ||
} | ||
} | ||
|
||
/** | ||
* Close the exporter sink | ||
*/ | ||
@Override | ||
public void close() { | ||
logger.debug("Closing the LocalIndexExporter.."); | ||
} | ||
|
||
private String getDateTimeFromFormat() { | ||
return indexPattern.print(DateTime.now(DateTimeZone.UTC)); | ||
} | ||
} |
26 changes: 26 additions & 0 deletions
26
...hts/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.plugin.insights.core.exporter; | ||
|
||
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; | ||
|
||
import java.io.Closeable; | ||
import java.util.List; | ||
|
||
/** | ||
* Base interface for Query Insights exporters | ||
*/ | ||
public interface QueryInsightsExporter extends Closeable { | ||
/** | ||
* Export a list of SearchQueryRecord to the exporter sink | ||
* | ||
* @param records list of {@link SearchQueryRecord} | ||
*/ | ||
void export(final List<SearchQueryRecord> records); | ||
} |
143 changes: 143 additions & 0 deletions
143
.../main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.plugin.insights.core.exporter; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.common.settings.Settings; | ||
import org.joda.time.format.DateTimeFormat; | ||
|
||
import java.io.IOException; | ||
import java.util.HashSet; | ||
import java.util.Locale; | ||
import java.util.Set; | ||
|
||
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN; | ||
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE; | ||
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE; | ||
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX; | ||
|
||
/** | ||
* Factory class for validating and creating exporters based on provided settings | ||
*/ | ||
public class QueryInsightsExporterFactory { | ||
/** | ||
* Logger of the query insights exporter factory | ||
*/ | ||
private final Logger logger = LogManager.getLogger(); | ||
final private Client client; | ||
final private Set<QueryInsightsExporter> exporters; | ||
|
||
/** | ||
* Constructor of QueryInsightsExporterFactory | ||
* | ||
* @param client OS client | ||
*/ | ||
public QueryInsightsExporterFactory(final Client client) { | ||
this.client = client; | ||
this.exporters = new HashSet<>(); | ||
} | ||
|
||
/** | ||
* Validate exporter sink config | ||
* | ||
* @param settings exporter sink config {@link Settings} | ||
* @throws IllegalArgumentException if provided exporter sink config settings are invalid | ||
*/ | ||
public void validateExporterConfig(final Settings settings) throws IllegalArgumentException { | ||
// Disable exporter if the EXPORTER_TYPE setting is null | ||
if (settings.get(EXPORTER_TYPE) == null) { | ||
return; | ||
} | ||
SinkType type; | ||
try { | ||
type = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE)); | ||
} catch (IllegalArgumentException e) { | ||
throw new IllegalArgumentException( | ||
String.format( | ||
Locale.ROOT, | ||
"Invalid exporter type [%s], type should be one of %s", | ||
settings.get(EXPORTER_TYPE), | ||
SinkType.allSinkTypes() | ||
) | ||
); | ||
} | ||
switch (type) { | ||
case LOCAL_INDEX: | ||
final String indexPattern = settings.get(EXPORT_INDEX, DEFAULT_TOP_N_LATENCY_QUERIES_INDEX_PATTERN); | ||
if (indexPattern.length() == 0) { | ||
throw new IllegalArgumentException("Empty index pattern configured for the exporter"); | ||
} | ||
try { | ||
DateTimeFormat.forPattern(indexPattern); | ||
} catch (Exception e) { | ||
throw new IllegalArgumentException( | ||
String.format(Locale.ROOT, "Invalid index pattern [%s] configured for the exporter", indexPattern) | ||
); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Create an exporter based on provided parameters | ||
* | ||
* @param type The type of exporter to create | ||
* @param indexPattern the index pattern if creating a index exporter | ||
* @return QueryInsightsExporter the created exporter sink | ||
*/ | ||
public QueryInsightsExporter createExporter(SinkType type, String indexPattern) { | ||
if (SinkType.LOCAL_INDEX.equals(type)) { | ||
QueryInsightsExporter exporter = new LocalIndexExporter(client, DateTimeFormat.forPattern(indexPattern)); | ||
this.exporters.add(exporter); | ||
return exporter; | ||
} | ||
return DebugExporter.getInstance(); | ||
} | ||
|
||
/** | ||
* Update an exporter based on provided parameters | ||
* | ||
* @param exporter The exporter to update | ||
* @param indexPattern the index pattern if creating a index exporter | ||
* @return QueryInsightsExporter the updated exporter sink | ||
*/ | ||
public QueryInsightsExporter updateExporter(QueryInsightsExporter exporter, String indexPattern) { | ||
if (exporter.getClass() == LocalIndexExporter.class) { | ||
((LocalIndexExporter) exporter).setIndexPattern(DateTimeFormat.forPattern(indexPattern)); | ||
} | ||
return exporter; | ||
} | ||
|
||
/** | ||
* Close an exporter | ||
* | ||
* @param exporter the exporter to close | ||
*/ | ||
public void closeExporter(QueryInsightsExporter exporter) throws IOException { | ||
if (exporter != null) { | ||
exporter.close(); | ||
this.exporters.remove(exporter); | ||
} | ||
} | ||
|
||
/** | ||
* Close all exporters | ||
* | ||
*/ | ||
public void closeAllExporters() { | ||
for (QueryInsightsExporter exporter : exporters) { | ||
try { | ||
closeExporter(exporter); | ||
} catch (IOException e) { | ||
logger.error("Fail to close query insights exporter, error: ", e); | ||
} | ||
} | ||
} | ||
} |
Oops, something went wrong.