Skip to content

Commit

Permalink
Increase JavaDoc coverage and update PR based comments
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Feb 6, 2024
1 parent 7bfd76f commit 7c6863c
Show file tree
Hide file tree
Showing 11 changed files with 270 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@
* @opensearch.internal
*/
public abstract class QueryInsightsExporter<T extends SearchQueryRecord<?>> {
private QueryInsightsExporterType type;
private String identifier;
private final String identifier;

QueryInsightsExporter(QueryInsightsExporterType type, String identifier) {
this.type = type;
QueryInsightsExporter(String identifier) {
this.identifier = identifier;
}

Expand All @@ -35,18 +33,10 @@ public abstract class QueryInsightsExporter<T extends SearchQueryRecord<?>> {
*/
public abstract void export(List<T> records) throws Exception;

public void setType(QueryInsightsExporterType type) {
this.type = type;
}

public QueryInsightsExporterType getType() {
return type;
}

public void setIdentifier(String identifier) {
this.identifier = identifier;
}

/**
* Get the identifier of this exporter
* @return identifier of this exporter
*/
public String getIdentifier() {
return identifier;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@
* @opensearch.internal
*/
public enum QueryInsightsExporterType {
/* local index exporter */
LOCAL_INDEX("local_index");
/** local index exporter */
LOCAL_INDEX;

private final String type;

QueryInsightsExporterType(String type) {
this.type = type;
@Override
public String toString() {
return super.toString().toLowerCase(Locale.ROOT);
}

/**
* Parse QueryInsightsExporterType from String
* @param type the String representation of the QueryInsightsExporterType
* @return QueryInsightsExporterType
*/
public static QueryInsightsExporterType parse(String type) {
return valueOf(type.toUpperCase(Locale.ROOT));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,20 @@ public class QueryInsightsLocalIndexExporter<T extends SearchQueryRecord<?>> ext
/** The mapping for the local index that holds the data */
private final InputStream localIndexMapping;

/**
* Create a QueryInsightsLocalIndexExporter Object
* @param clusterService The clusterService of the node
* @param client The OpenSearch Client to support index operations
* @param localIndexName The local index name to export the data to
* @param localIndexMapping The mapping for the local index
*/
public QueryInsightsLocalIndexExporter(
ClusterService clusterService,
Client client,
String localIndexName,
InputStream localIndexMapping
) {
super(QueryInsightsExporterType.LOCAL_INDEX, localIndexName);
super(localIndexName);
this.clusterService = clusterService;
this.client = client;
this.localIndexMapping = localIndexMapping;
Expand All @@ -70,42 +77,35 @@ public QueryInsightsLocalIndexExporter(
* @throws IOException if an error occurs
*/
@Override
public synchronized void export(List<T> records) throws IOException {
public void export(List<T> records) throws IOException {
if (records.size() == 0) {
return;
}
if (checkIfIndexExists()) {
bulkRecord(records);
} else {
// local index not exist
initLocalIndex(new ActionListener<>() {
@Override
public void onResponse(CreateIndexResponse response) {
if (response.isAcknowledged()) {
log.debug(
String.format(Locale.ROOT, "successfully initialized local index %s for query insight.", getIdentifier())
);
try {
bulkRecord(records);
} catch (IOException e) {
log.error(String.format(Locale.ROOT, "fail to ingest query insight data to local index, error: %s", e));
}
} else {
log.error(
String.format(
Locale.ROOT,
"request to created local index %s for query insight not acknowledged.",
getIdentifier()
)
);
boolean indexExists = checkAndInitLocalIndex(new ActionListener<>() {
@Override
public void onResponse(CreateIndexResponse response) {
if (response.isAcknowledged()) {
log.debug(String.format(Locale.ROOT, "successfully initialized local index %s for query insight.", getIdentifier()));
try {
bulkRecord(records);
} catch (IOException e) {
log.error(String.format(Locale.ROOT, "fail to ingest query insight data to local index, error: %s", e));
}
} else {
log.error(
String.format(Locale.ROOT, "request to created local index %s for query insight not acknowledged.", getIdentifier())
);
}
}

@Override
public void onFailure(Exception e) {
log.error(String.format(Locale.ROOT, "error creating local index for query insight: %s", e));
}
});
@Override
public void onFailure(Exception e) {
log.error(String.format(Locale.ROOT, "error creating local index for query insight: %s", e));
}
});

if (indexExists) {
bulkRecord(records);
}
}

Expand All @@ -120,15 +120,21 @@ private boolean checkIfIndexExists() {
}

/**
* Initialize the local OpenSearch Index for the exporter
* Check and initialize the local OpenSearch Index for the exporter
*
* @param listener the listener to be notified upon completion
* @return boolean to represent if the index has already been created before calling this function
* @throws IOException if an error occurs
*/
private synchronized void initLocalIndex(ActionListener<CreateIndexResponse> listener) throws IOException {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(this.getIdentifier()).mapping(getIndexMappings())
.settings(Settings.builder().put("index.hidden", false).build());
client.admin().indices().create(createIndexRequest, listener);
private synchronized boolean checkAndInitLocalIndex(ActionListener<CreateIndexResponse> listener) throws IOException {
if (!checkIfIndexExists()) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(this.getIdentifier()).mapping(getIndexMappings())
.settings(Settings.builder().put("index.hidden", false).build());
client.admin().indices().create(createIndexRequest, listener);
return false;
} else {
return true;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public abstract class QueryInsightsService<R extends SearchQueryRecord<?>, S ext
/** enable insight data export */
private boolean enableExport;

/** The internal store that holds the query insight data */
/** The internal thread-safe store that holds the query insight data */
@Nullable
protected S store;

Expand All @@ -59,8 +59,19 @@ public abstract class QueryInsightsService<R extends SearchQueryRecord<?>, S ext

/** The internal OpenSearch thread pool that execute async processing and exporting tasks*/
protected final ThreadPool threadPool;

/**
* Holds a reference to delayed operation {@link Scheduler.Cancellable} so it can be cancelled when
* the service closed concurrently.
*/
protected volatile Scheduler.Cancellable scheduledFuture;

/**
* Create the Query Insights Service object
* @param threadPool The OpenSearch thread pool to run async tasks
* @param store The in memory store to keep the Query Insights data
* @param exporter The optional {@link QueryInsightsExporter} to export the Query Insights data
*/
@Inject
public QueryInsightsService(ThreadPool threadPool, @Nullable S store, @Nullable E exporter) {
this.threadPool = threadPool;
Expand Down Expand Up @@ -102,7 +113,13 @@ public List<R> getQueryData() throws IllegalArgumentException {
public abstract void clearOutdatedData();

/**
* Restart the exporter with new config
* Reset the exporter with new config
*
* This function can be used to enable/disable an exporter, change the type of the exporter,
* or change the identifier of the exporter.
* @param enabled the enable flag to set on the exporter
* @param type The QueryInsightsExporterType to set on the exporter
* @param identifier the Identifier to set on the exporter
*/
public abstract void resetExporter(boolean enabled, QueryInsightsExporterType type, String identifier);

Expand All @@ -113,18 +130,34 @@ public void clearAllData() {
store.clear();
}

/**
* Set flag to enable or disable Query Insights data collection
* @param enableCollect Flag to enable or disable Query Insights data collection
*/
public void setEnableCollect(boolean enableCollect) {
this.enableCollect = enableCollect;
}

/**
* Get if the Query Insights data collection is enabled
* @return if the Query Insights data collection is enabled
*/
public boolean getEnableCollect() {
return this.enableCollect;
}

/**
* Set flag to enable or disable Query Insights data export
* @param enableExport
*/
public void setEnableExport(boolean enableExport) {
this.enableExport = enableExport;
}

/**
* Get if the Query Insights data export is enabled
* @return if the Query Insights data export is enabled
*/
public boolean getEnableExport() {
return this.enableExport;
}
Expand Down Expand Up @@ -156,7 +189,6 @@ private void doExport() {
List<R> storedData = getQueryData();
try {
exporter.export(storedData);
log.debug(String.format(Locale.ROOT, "finish exporting query insight data to sink %s", storedData));
} catch (Exception e) {
throw new RuntimeException(String.format(Locale.ROOT, "failed to export query insight data to sink, error: %s", e));
}
Expand All @@ -165,6 +197,10 @@ private void doExport() {
@Override
protected void doClose() {}

/**
* Get the export interval set for the {@link QueryInsightsExporter}
* @return export interval
*/
public TimeValue getExportInterval() {
return exportInterval;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,36 @@ public final class SearchQueryLatencyRecord extends SearchQueryRecord<Long> {
// latency info for each search phase
private final Map<String, Long> phaseLatencyMap;

/**
* Constructor for SearchQueryLatencyRecord
*
* @param in A {@link StreamInput} object.
* @throws IOException if the stream cannot be deserialized.
*/
public SearchQueryLatencyRecord(final StreamInput in) throws IOException {
super(in);
this.phaseLatencyMap = in.readMap(StreamInput::readString, StreamInput::readLong);
this.setValue(in.readLong());
}

@Override
protected void addCustomXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(PHASE_LATENCY_MAP, this.getPhaseLatencyMap());
builder.field(TOOK, this.getValue());
}

/**
* Constructor of the SearchQueryLatencyRecord
*
* @param timestamp The timestamp of the query.
* @param searchType The manner at which the search operation is executed. see {@link SearchType}
* @param source The search source that was executed by the query.
* @param totalShards Total number of shards as part of the search query across all indices
* @param indices The indices involved in the search query
* @param propertyMap Extra attributes and information about a search query
* @param phaseLatencyMap A map contains per-phase latency data
* @param tookInNanos Total time took to finish this request
*/
public SearchQueryLatencyRecord(
final Long timestamp,
final SearchType searchType,
Expand All @@ -49,26 +73,27 @@ public SearchQueryLatencyRecord(
this.phaseLatencyMap = phaseLatencyMap;
}

/**
* Get the phase level latency map of this request record
*
* @return Map contains per-phase latency of this request record
*/
public Map<String, Long> getPhaseLatencyMap() {
return phaseLatencyMap;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
super.toXContent(builder, params);
builder.field(PHASE_LATENCY_MAP, this.getPhaseLatencyMap());
builder.field(TOOK, this.getValue());
return builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(phaseLatencyMap, StreamOutput::writeString, StreamOutput::writeLong);
out.writeLong(getValue());
}

/**
* Compare if two SearchQueryLatencyRecord are equal
* @param other The Other SearchQueryLatencyRecord to compare to
* @return boolean
*/
public boolean equals(SearchQueryLatencyRecord other) {
if (!super.equals(other)) {
return false;
Expand Down
Loading

0 comments on commit 7c6863c

Please sign in to comment.