Skip to content

Commit

Permalink
SAI-5163 : ConcurrentModificationException from metrics callers (#227)
Browse files Browse the repository at this point in the history
* Fixed ConcurrentModificationException triggered by concurrent write from AggregateMetricsApiCaller and read from CoresMetricsApiCaller on the missingCoreMetrics field

Instead we will pass a req specific ResultContext, which will not be shared among threads

* Added extra javadoc

* Tweak unit test case testConcurrentCallers

* ./gradlew tidy

* Fixed weird indentation

* Fixed javadoc
  • Loading branch information
patsonluk committed Oct 11, 2024
1 parent ad302ee commit d6a38cf
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,14 @@ public final class PrometheusMetricsServlet extends BaseSolrServlet {
private final List<MetricsApiCaller> callers = getCallers();

private List<MetricsApiCaller> getCallers() {
AggregateMetricsApiCaller aggregateMetricsApiCaller = new AggregateMetricsApiCaller();
return List.of(
new GarbageCollectorMetricsApiCaller(),
new MemoryMetricsApiCaller(),
new OsMetricsApiCaller(),
new ThreadMetricsApiCaller(),
new StatusCodeMetricsApiCaller(),
aggregateMetricsApiCaller,
new CoresMetricsApiCaller(
Collections.unmodifiableList(aggregateMetricsApiCaller.missingCoreMetrics)));
new AggregateMetricsApiCaller(),
new CoresMetricsApiCaller());
}

private final Map<String, PrometheusMetricType> cacheMetricTypes =
Expand All @@ -96,9 +94,13 @@ private List<MetricsApiCaller> getCallers() {
public void doGet(HttpServletRequest request, HttpServletResponse response)
throws IOException, UnavailableException {
List<PrometheusMetric> metrics = new ArrayList<>();
ResultContext resultContext = new ResultContext(metrics);

AtomicInteger qTime = new AtomicInteger();
// callers should be invoked sequentially in the same thread there could be dependencies among
// them
for (MetricsApiCaller caller : callers) {
caller.call(qTime, metrics, request);
caller.call(qTime, resultContext, request);
}
getCompressingDirectoryPoolMetrics(metrics);
getCircuitBreakerMetrics(metrics);
Expand Down Expand Up @@ -289,7 +291,8 @@ static class GarbageCollectorMetricsApiCaller extends MetricsByPrefixApiCaller {
"memory.pools.G1-Survivor-Space.used-after-gc":20971520}}}
*/
@Override
protected void handle(List<PrometheusMetric> results, JsonNode metrics) throws IOException {
protected void handle(ResultContext resultContext, JsonNode metrics) throws IOException {
List<PrometheusMetric> results = resultContext.resultMetrics;
JsonNode parent = metrics.path("solr.jvm");
results.add(
new PrometheusMetric(
Expand Down Expand Up @@ -375,7 +378,8 @@ static class MemoryMetricsApiCaller extends MetricsByPrefixApiCaller {
"memory.non-heap.used":93135560}}}
*/
@Override
protected void handle(List<PrometheusMetric> results, JsonNode metrics) throws IOException {
protected void handle(ResultContext resultContext, JsonNode metrics) throws IOException {
List<PrometheusMetric> results = resultContext.resultMetrics;
JsonNode parent = metrics.path("solr.jvm");
results.add(
new PrometheusMetric(
Expand Down Expand Up @@ -430,7 +434,8 @@ static class OsMetricsApiCaller extends MetricsByPrefixApiCaller {
"os.version":"10.15.7"}}}
*/
@Override
protected void handle(List<PrometheusMetric> results, JsonNode metrics) throws IOException {
protected void handle(ResultContext resultContext, JsonNode metrics) throws IOException {
List<PrometheusMetric> results = resultContext.resultMetrics;
JsonNode parent = metrics.path("solr.jvm");
results.add(
new PrometheusMetric(
Expand Down Expand Up @@ -468,7 +473,8 @@ static class ThreadMetricsApiCaller extends MetricsByPrefixApiCaller {
"threads.waiting.count":1756}}}
*/
@Override
protected void handle(List<PrometheusMetric> results, JsonNode metrics) throws IOException {
protected void handle(ResultContext resultContext, JsonNode metrics) throws IOException {
List<PrometheusMetric> results = resultContext.resultMetrics;
JsonNode parent = metrics.path("solr.jvm");
results.add(
new PrometheusMetric(
Expand Down Expand Up @@ -534,7 +540,8 @@ static class StatusCodeMetricsApiCaller extends MetricsByPrefixApiCaller {
...
*/
@Override
protected void handle(List<PrometheusMetric> results, JsonNode metrics) throws IOException {
protected void handle(ResultContext resultContext, JsonNode metrics) throws IOException {
List<PrometheusMetric> results = resultContext.resultMetrics;
JsonNode parent = metrics.path("solr.jetty");
results.add(
new PrometheusMetric(
Expand Down Expand Up @@ -826,7 +833,6 @@ static class AggregateMetricsApiCaller extends MetricsByPrefixApiCaller {
},
...
*/
List<CoreMetric> missingCoreMetrics = new ArrayList<>();

AggregateMetricsApiCaller() {
super("solr.node", buildPrefix(), buildProperty());
Expand All @@ -847,11 +853,12 @@ private static String buildProperty() {
}

@Override
protected void handle(List<PrometheusMetric> results, JsonNode metricsNode) throws IOException {
missingCoreMetrics.clear();
protected void handle(ResultContext resultContext, JsonNode metricsNode) throws IOException {
List<PrometheusMetric> results = resultContext.resultMetrics;
JsonNode nodeMetricNode = metricsNode.get("solr.node");

if (nodeMetricNode != null) {
resultContext.missingCoreMetrics = new ArrayList<>(); // explicitly set missing core metrics
for (CoreMetric metric : CoreMetric.values()) {
Number value =
metric.property != null
Expand All @@ -860,13 +867,12 @@ protected void handle(List<PrometheusMetric> results, JsonNode metricsNode) thro
if (!INVALID_NUMBER.equals(value)) {
results.add(metric.createPrometheusMetric(value, "[node aggregated]"));
} else {
missingCoreMetrics.add(metric);
resultContext.missingCoreMetrics.add(metric);
}
}
} else {
log.warn(
"Cannot find the solr.node metrics, going to fall back to getting metrics from all cores");
missingCoreMetrics.addAll(Arrays.asList(CoreMetric.values()));
}
}
}
Expand All @@ -881,20 +887,15 @@ protected void handle(List<PrometheusMetric> results, JsonNode metricsNode) thro
* concurrently with it.
*/
static class CoresMetricsApiCaller extends MetricsApiCaller {
private final List<CoreMetric> missingCoreMetricsView;

CoresMetricsApiCaller(List<CoreMetric> missingCoreMetricsView) {
this.missingCoreMetricsView = missingCoreMetricsView;
}

@Override
protected String buildQueryString() {
protected String buildQueryString(ResultContext resultContext) {
List<String> prefixes = new ArrayList<>();
List<String> properties = new ArrayList<>();
for (CoreMetric missingMetric : missingCoreMetricsView) {
prefixes.add(missingMetric.key);
if (missingMetric.property != null) {
properties.add(missingMetric.property);

for (CoreMetric targetMetric : getTargetCoreMetrics(resultContext)) {
prefixes.add(targetMetric.key);
if (targetMetric.property != null) {
properties.add(targetMetric.property);
}
}

Expand All @@ -912,6 +913,15 @@ protected String buildQueryString() {
propertyClause);
}

private List<CoreMetric> getTargetCoreMetrics(ResultContext resultContext) {
List<CoreMetric> targetCoreMetrics = resultContext.missingCoreMetrics;
// if not explicitly defined by other callers, then just get everything
if (targetCoreMetrics == null) {
targetCoreMetrics = Arrays.asList(CoreMetric.values());
}
return targetCoreMetrics;
}

/*
"metrics":{
"solr.core.loadtest.shard1_1.replica_n8":{
Expand Down Expand Up @@ -939,9 +949,10 @@ protected String buildQueryString() {
*/

@Override
protected void handle(List<PrometheusMetric> results, JsonNode metrics) throws IOException {
protected void handle(ResultContext resultContext, JsonNode metrics) throws IOException {
List<PrometheusMetric> results = resultContext.resultMetrics;
Map<CoreMetric, Long> accumulative = new LinkedHashMap<>();
for (CoreMetric missingCoreMetric : missingCoreMetricsView) {
for (CoreMetric missingCoreMetric : getTargetCoreMetrics(resultContext)) {
for (JsonNode coreMetricNode : metrics) {
Number val =
missingCoreMetric.property != null
Expand Down Expand Up @@ -1047,12 +1058,12 @@ static SolrDispatchFilter getSolrDispatchFilter(HttpServletRequest request) thro
abstract static class MetricsApiCaller {

// use HttpSolrCall to simulate a call to the metrics api.
void call(
AtomicInteger qTime, List<PrometheusMetric> results, HttpServletRequest originalRequest)
void call(AtomicInteger qTime, ResultContext resultContext, HttpServletRequest originalRequest)
throws IOException, UnavailableException {
SolrDispatchFilter filter = getSolrDispatchFilter(originalRequest);
CoreContainer cores = filter.getCores();
HttpServletRequest request = new MetricsApiRequest(originalRequest, buildQueryString());
HttpServletRequest request =
new MetricsApiRequest(originalRequest, buildQueryString(resultContext));
MetricsApiResponse response = new MetricsApiResponse();
SolrDispatchFilter.Action action =
new HttpSolrCall(filter, cores, request, response, false).call();
Expand All @@ -1064,10 +1075,10 @@ void call(
action,
SolrDispatchFilter.Action.RETURN));
}
handleResponse(qTime, results, response.getJsonNode());
handleResponse(qTime, resultContext, response.getJsonNode());
}

void handleResponse(AtomicInteger qTime, List<PrometheusMetric> results, JsonNode response)
void handleResponse(AtomicInteger qTime, ResultContext resultContext, JsonNode response)
throws IOException {
JsonNode header = response.path("responseHeader");
int status = getNumber(header, "status").intValue();
Expand All @@ -1076,13 +1087,12 @@ void handleResponse(AtomicInteger qTime, List<PrometheusMetric> results, JsonNod
String.format(Locale.ROOT, "metrics api response status is %d; expected 0.", status));
}
qTime.addAndGet(getNumber(header, "QTime").intValue());
handle(results, response.path("metrics"));
handle(resultContext, response.path("metrics"));
}

protected abstract void handle(List<PrometheusMetric> results, JsonNode metrics)
throws IOException;
abstract void handle(ResultContext resultContext, JsonNode metrics) throws IOException;

protected abstract String buildQueryString();
abstract String buildQueryString(ResultContext resultContext);
}

private abstract static class MetricsByPrefixApiCaller extends MetricsApiCaller {
Expand All @@ -1099,7 +1109,7 @@ private abstract static class MetricsByPrefixApiCaller extends MetricsApiCaller
}

@Override
protected String buildQueryString() {
protected String buildQueryString(ResultContext resultContext) {
String propertyClause =
String.join(
"&property=",
Expand Down Expand Up @@ -1359,4 +1369,17 @@ public Locale getLocale() {
return Locale.ROOT;
}
}

/**
* Context that carries the metrics results as well as information that needs to be propagated in
* the MetricsApiCaller call chain
*/
static class ResultContext {
final List<PrometheusMetric> resultMetrics;
List<CoreMetric> missingCoreMetrics;

ResultContext(List<PrometheusMetric> resultMetrics) {
this.resultMetrics = resultMetrics;
}
}
}
Loading

0 comments on commit d6a38cf

Please sign in to comment.