Skip to content

Commit

Permalink
Add missing index check
Browse files Browse the repository at this point in the history
Signed-off-by: Tyler Ohlsen <[email protected]>
  • Loading branch information
ohltyler committed Jan 17, 2024
1 parent ffb35aa commit 8481b44
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 46 deletions.
79 changes: 52 additions & 27 deletions src/main/java/org/opensearch/agent/tools/SearchMonitorsTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
package org.opensearch.agent.tools;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.lucene.search.join.ScoreMode;
Expand Down Expand Up @@ -105,22 +108,17 @@ public <T> void run(Map<String, String> parameters, ActionListener<T> listener)
if (monitorId != null) {
GetMonitorRequest getMonitorRequest = new GetMonitorRequest(monitorId, 1L, RestRequest.Method.GET, null);
ActionListener<GetMonitorResponse> getMonitorListener = ActionListener.<GetMonitorResponse>wrap(response -> {
StringBuilder sb = new StringBuilder();
Monitor monitor = response.getMonitor();
if (monitor != null) {
sb.append("Monitors=[");
sb.append("{");
sb.append("id=").append(monitor.getId()).append(",");
sb.append("name=").append(monitor.getName());
sb.append("}]");
sb.append("TotalMonitors=1");
processGetMonitorHit(monitor, listener);
}, e -> {
// System index isn't initialized by default, so ignore such errors. Alerting plugin does not return the
// standard IndexNotFoundException so we parse the message instead
if (e.getMessage().contains("Configured indices are not found")) {
processGetMonitorHit(null, listener);
} else {
sb.append("Monitors=[]TotalMonitors=0");
log.error("Failed to get monitor.", e);
listener.onFailure(e);
}
listener.onResponse((T) sb.toString());
}, e -> {
log.error("Failed to search monitors.", e);
listener.onFailure(e);
});
AlertingPluginInterface.INSTANCE.getMonitor((NodeClient) client, getMonitorRequest, getMonitorListener);
} else {
Expand Down Expand Up @@ -172,21 +170,19 @@ public <T> void run(Map<String, String> parameters, ActionListener<T> listener)
SearchMonitorRequest searchMonitorRequest = new SearchMonitorRequest(searchRequest);

ActionListener<SearchResponse> searchMonitorListener = ActionListener.<SearchResponse>wrap(response -> {
StringBuilder sb = new StringBuilder();
SearchHit[] hits = response.getHits().getHits();
sb.append("Monitors=[");
for (SearchHit hit : hits) {
sb.append("{");
sb.append("id=").append(hit.getId()).append(",");
sb.append("name=").append(hit.getSourceAsMap().get("name"));
sb.append("}");
}
sb.append("]");
sb.append("TotalMonitors=").append(response.getHits().getTotalHits().value);
listener.onResponse((T) sb.toString());
List<SearchHit> hits = Arrays.asList(response.getHits().getHits());
Map<String, SearchHit> hitsAsMap = hits.stream().collect(Collectors.toMap(SearchHit::getId, hit -> hit));
processHits(hitsAsMap, listener);

}, e -> {
log.error("Failed to search monitors.", e);
listener.onFailure(e);
// System index isn't initialized by default, so ignore such errors. Alerting plugin does not return the
// standard IndexNotFoundException so we parse the message instead
if (e.getMessage().contains("Configured indices are not found")) {
processHits(Collections.emptyMap(), listener);
} else {
log.error("Failed to search monitors.", e);
listener.onFailure(e);
}
});
AlertingPluginInterface.INSTANCE.searchMonitors((NodeClient) client, searchMonitorRequest, searchMonitorListener);
}
Expand All @@ -202,6 +198,35 @@ public String getType() {
return TYPE;
}

private <T> void processHits(Map<String, SearchHit> hitsAsMap, ActionListener<T> listener) {
StringBuilder sb = new StringBuilder();
sb.append("Monitors=[");
for (SearchHit hit : hitsAsMap.values()) {
sb.append("{");
sb.append("id=").append(hit.getId()).append(",");
sb.append("name=").append(hit.getSourceAsMap().get("name"));
sb.append("}");
}
sb.append("]");
sb.append("TotalMonitors=").append(hitsAsMap.size());
listener.onResponse((T) sb.toString());
}

private <T> void processGetMonitorHit(Monitor monitor, ActionListener<T> listener) {
StringBuilder sb = new StringBuilder();
if (monitor != null) {
sb.append("Monitors=[");
sb.append("{");
sb.append("id=").append(monitor.getId()).append(",");
sb.append("name=").append(monitor.getName());
sb.append("}]");
sb.append("TotalMonitors=1");
} else {
sb.append("Monitors=[]TotalMonitors=0");
}
listener.onResponse((T) sb.toString());
}

/**
* Factory for the {@link SearchMonitorsTool}
*/
Expand Down
19 changes: 0 additions & 19 deletions src/test/java/org/opensearch/integTest/SearchMonitorsToolIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,6 @@ public void testSearchMonitorsToolInFlowAgent_withNoSystemIndex() {
assertEquals("Monitors=[]TotalMonitors=0", result);
}

// @SneakyThrows
// public void testSearchAnomalyDetectorsToolInFlowAgent_noMatching() {
// String agentId = createAgent(registerAgentRequestBody);
// String agentInput = "{\"parameters\":{\"detectorName\": \"" + detectorName + "foo" + "\"}}";
// String result = executeAgent(agentId, agentInput);
// assertEquals("AnomalyDetectors=[]TotalAnomalyDetectors=0", result);
// }

// @SneakyThrows
// public void testSearchAnomalyDetectorsToolInFlowAgent_matching() {
// String agentId = createAgent(registerAgentRequestBody);
// String agentInput = "{\"parameters\":{\"detectorName\": \"" + detectorName + "\"}}";
// String result = executeAgent(agentId, agentInput);
// assertEquals(
// String.format(Locale.ROOT, "AnomalyDetectors=[{id=%s,name=%s}]TotalAnomalyDetectors=%d", detectorId, detectorName, 1),
// result
// );
// }

@SneakyThrows
private void createMonitorsSystemIndex(String monitorId, String monitorName) {
createIndexWithConfiguration(
Expand Down

0 comments on commit 8481b44

Please sign in to comment.