Skip to content

Commit

Permalink
./gradlew tidy
Browse files Browse the repository at this point in the history
  • Loading branch information
patsonluk committed Oct 10, 2024
1 parent 35baa62 commit 9afbd9a
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public void doGet(HttpServletRequest request, HttpServletResponse response)
ResultContext resultContext = new ResultContext(metrics);

AtomicInteger qTime = new AtomicInteger();
// callers might be invoked sequentially in the same thread as some of them might have dependencies on eachother
// callers might be invoked sequentially in the same thread as some of them might have
// dependencies on eachother
for (MetricsApiCaller caller : callers) {
caller.call(qTime, resultContext, request);
}
Expand Down Expand Up @@ -751,7 +752,7 @@ protected void handle(ResultContext resultContext, JsonNode metricsNode) throws
JsonNode nodeMetricNode = metricsNode.get("solr.node");

if (nodeMetricNode != null) {
resultContext.missingCoreMetrics = new ArrayList<>(); //explicitly set missing core metrics
resultContext.missingCoreMetrics = new ArrayList<>(); // explicitly set missing core metrics
for (CoreMetric metric : CoreMetric.values()) {
Number value =
metric.property != null
Expand Down Expand Up @@ -808,7 +809,8 @@ protected String buildQueryString(ResultContext resultContext) {

private List<CoreMetric> getTargetCoreMetrics(ResultContext resultContext) {
List<CoreMetric> targetCoreMetrics = resultContext.missingCoreMetrics;
if (targetCoreMetrics == null) { //if not explicitly defined by other callers, then just get everything
if (targetCoreMetrics
== null) { // if not explicitly defined by other callers, then just get everything
targetCoreMetrics = Arrays.asList(CoreMetric.values());
}
return targetCoreMetrics;
Expand Down Expand Up @@ -844,7 +846,7 @@ private List<CoreMetric> getTargetCoreMetrics(ResultContext resultContext) {
protected void handle(ResultContext resultContext, JsonNode metrics) throws IOException {
List<PrometheusMetric> results = resultContext.resultMetrics;
Map<CoreMetric, Long> accumulative = new LinkedHashMap<>();
for (CoreMetric missingCoreMetric :getTargetCoreMetrics(resultContext)) {
for (CoreMetric missingCoreMetric : getTargetCoreMetrics(resultContext)) {
for (JsonNode coreMetricNode : metrics) {
Number val =
missingCoreMetric.property != null
Expand Down Expand Up @@ -950,12 +952,12 @@ static SolrDispatchFilter getSolrDispatchFilter(HttpServletRequest request) thro
abstract static class MetricsApiCaller {

// use HttpSolrCall to simulate a call to the metrics api.
void call(
AtomicInteger qTime, ResultContext resultContext, HttpServletRequest originalRequest)
void call(AtomicInteger qTime, ResultContext resultContext, HttpServletRequest originalRequest)
throws IOException, UnavailableException {
SolrDispatchFilter filter = getSolrDispatchFilter(originalRequest);
CoreContainer cores = filter.getCores();
HttpServletRequest request = new MetricsApiRequest(originalRequest, buildQueryString(resultContext));
HttpServletRequest request =
new MetricsApiRequest(originalRequest, buildQueryString(resultContext));
MetricsApiResponse response = new MetricsApiResponse();
SolrDispatchFilter.Action action =
new HttpSolrCall(filter, cores, request, response, false).call();
Expand All @@ -982,7 +984,6 @@ void handleResponse(AtomicInteger qTime, ResultContext resultContext, JsonNode r
handle(resultContext, response.path("metrics"));
}


abstract void handle(ResultContext resultContext, JsonNode metrics) throws IOException;

abstract String buildQueryString(ResultContext resultContext);
Expand Down Expand Up @@ -1264,8 +1265,8 @@ public Locale getLocale() {
}

/**
* Context that carries the metrics results as well as information that needs to be propagated in the MetricsApiCaller
* call chain
* Context that carries the metrics results as well as information that needs to be propagated in
* the MetricsApiCaller call chain
*/
static class ResultContext {
final List<PrometheusMetric> resultMetrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,12 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.response.ResultContext;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -47,12 +43,12 @@ static void assertMetricsApiCaller(
}

static void assertMetricsApiCaller(
PrometheusMetricsServlet.MetricsApiCaller caller,
PrometheusMetricsServlet.ResultContext resultContext,
String json,
int expectedQTime,
String expectedOutput)
throws Exception {
PrometheusMetricsServlet.MetricsApiCaller caller,
PrometheusMetricsServlet.ResultContext resultContext,
String json,
int expectedQTime,
String expectedOutput)
throws Exception {
AtomicInteger qTime = new AtomicInteger();

List<PrometheusMetricsServlet.PrometheusMetric> metrics;
Expand Down Expand Up @@ -410,11 +406,7 @@ public void testCoresMetricsApiCaller() throws Exception {
+ "# HELP update_errors cumulative number of errors during updates across cores\n"
+ "# TYPE update_errors counter\n"
+ "update_errors 4\n";
assertMetricsApiCaller(
new PrometheusMetricsServlet.CoresMetricsApiCaller(),
json,
14,
output);
assertMetricsApiCaller(new PrometheusMetricsServlet.CoresMetricsApiCaller(), json, 14, output);
}

@Test
Expand Down Expand Up @@ -514,86 +506,87 @@ public void testCoresMetricsApiCallerMissingIndex() throws Exception {
+ "# HELP update_errors cumulative number of errors during updates across cores\n"
+ "# TYPE update_errors counter\n"
+ "update_errors 0\n";
assertMetricsApiCaller(
new PrometheusMetricsServlet.CoresMetricsApiCaller(),
json,
25,
output);
assertMetricsApiCaller(new PrometheusMetricsServlet.CoresMetricsApiCaller(), json, 25, output);
}


@Test
public void testConcurrentCallers() throws Exception {
String coreJson =
"{\n"
+ " \"responseHeader\":{\n"
+ " \"status\":0,\n"
+ " \"QTime\":25},\n"
+ " \"metrics\":{\n"
+ " \"solr.core.loadtest.shard1_1.replica_n8\":{\n"
+ " \"QUERY./get.requestTimes\":{\"count\":29},\n"
+ " \"QUERY./get[shard].requestTimes\":{\"count\":1},\n"
+ " \"UPDATE./update.requestTimes\":{\"count\":2},\n" +
" \"UPDATE./update[local].requestTimes\":{\"count\":1}}}}";
"{\n"
+ " \"responseHeader\":{\n"
+ " \"status\":0,\n"
+ " \"QTime\":25},\n"
+ " \"metrics\":{\n"
+ " \"solr.core.loadtest.shard1_1.replica_n8\":{\n"
+ " \"QUERY./get.requestTimes\":{\"count\":29},\n"
+ " \"QUERY./get[shard].requestTimes\":{\"count\":1},\n"
+ " \"UPDATE./update.requestTimes\":{\"count\":2},\n"
+ " \"UPDATE./update[local].requestTimes\":{\"count\":1}}}}";
String nodeJson =
"{\n"
+ " \"responseHeader\":{\n"
+ " \"status\":0,\n"
+ " \"QTime\":25},\n"
+ " \"metrics\":{\n"
+ "\"solr.node\":{\n" +
" \"UPDATE./update.requestTimes\":{\"count\":20},\n" +
" \"UPDATE./update[local].requestTimes\":{\"count\":10}}}}";

"{\n"
+ " \"responseHeader\":{\n"
+ " \"status\":0,\n"
+ " \"QTime\":25},\n"
+ " \"metrics\":{\n"
+ "\"solr.node\":{\n"
+ " \"UPDATE./update.requestTimes\":{\"count\":20},\n"
+ " \"UPDATE./update[local].requestTimes\":{\"count\":10}}}}";

String nodeOutput = "# HELP distributed_requests_update cumulative number of distributed updates across cores[node aggregated]\n" +
"# TYPE distributed_requests_update counter\n" +
"distributed_requests_update 20\n" +
"# HELP local_requests_update cumulative number of local updates across cores[node aggregated]\n" +
"# TYPE local_requests_update counter\n" +
"local_requests_update 10\n";
String nodeOutput =
"# HELP distributed_requests_update cumulative number of distributed updates across cores[node aggregated]\n"
+ "# TYPE distributed_requests_update counter\n"
+ "distributed_requests_update 20\n"
+ "# HELP local_requests_update cumulative number of local updates across cores[node aggregated]\n"
+ "# TYPE local_requests_update counter\n"
+ "local_requests_update 10\n";

//core CoresMetricsApiCaller output, it should contain both the nodeOutput and the new metrics added by the core
String coreOutput = "# HELP distributed_requests_update cumulative number of distributed updates across cores[node aggregated]\n" +
"# TYPE distributed_requests_update counter\n" +
"distributed_requests_update 20\n" +
"# HELP local_requests_update cumulative number of local updates across cores[node aggregated]\n" +
"# TYPE local_requests_update counter\n" +
"local_requests_update 10\n" +
"# HELP top_level_requests_get cumulative number of top-level gets across cores\n" +
"# TYPE top_level_requests_get counter\n" +
"top_level_requests_get 29\n" +
"# HELP sub_shard_requests_get cumulative number of sub (spawned by re-distributing a top-level req) gets across cores\n" +
"# TYPE sub_shard_requests_get counter\n" +
"sub_shard_requests_get 1\n";
// core CoresMetricsApiCaller output, it should contain both the nodeOutput and the new metrics
// added by the core
String coreOutput =
"# HELP distributed_requests_update cumulative number of distributed updates across cores[node aggregated]\n"
+ "# TYPE distributed_requests_update counter\n"
+ "distributed_requests_update 20\n"
+ "# HELP local_requests_update cumulative number of local updates across cores[node aggregated]\n"
+ "# TYPE local_requests_update counter\n"
+ "local_requests_update 10\n"
+ "# HELP top_level_requests_get cumulative number of top-level gets across cores\n"
+ "# TYPE top_level_requests_get counter\n"
+ "top_level_requests_get 29\n"
+ "# HELP sub_shard_requests_get cumulative number of sub (spawned by re-distributing a top-level req) gets across cores\n"
+ "# TYPE sub_shard_requests_get counter\n"
+ "sub_shard_requests_get 1\n";

final int THREAD_COUNT = 100;
ExecutorService executorService = ExecutorUtil.newMDCAwareFixedThreadPool(
ExecutorService executorService =
ExecutorUtil.newMDCAwareFixedThreadPool(
THREAD_COUNT, new SolrNamedThreadFactory("test-concurrent-metric-callers"));

List<Future<?>> futures = new ArrayList<>();
for (int i = 0 ; i < THREAD_COUNT; i ++) {
futures.add(executorService.submit(() -> {
PrometheusMetricsServlet.ResultContext resultContext = new PrometheusMetricsServlet.ResultContext(new ArrayList<>());
assertMetricsApiCaller(
new PrometheusMetricsServlet.AggregateMetricsApiCaller(),
resultContext,
nodeJson,
25,
nodeOutput);
for (int i = 0; i < THREAD_COUNT; i++) {
futures.add(
executorService.submit(
() -> {
PrometheusMetricsServlet.ResultContext resultContext =
new PrometheusMetricsServlet.ResultContext(new ArrayList<>());
assertMetricsApiCaller(
new PrometheusMetricsServlet.AggregateMetricsApiCaller(),
resultContext,
nodeJson,
25,
nodeOutput);

assertMetricsApiCaller(
new PrometheusMetricsServlet.CoresMetricsApiCaller(),
resultContext,
coreJson,
25,
coreOutput);
return null;
}));
assertMetricsApiCaller(
new PrometheusMetricsServlet.CoresMetricsApiCaller(),
resultContext,
coreJson,
25,
coreOutput);
return null;
}));
}

for (Future<?> future : futures) {
future.get(); //this should not throw any concurrent exceptions or ComparisonFailure
future.get(); // this should not throw any concurrent exceptions or ComparisonFailure
}
}
}

0 comments on commit 9afbd9a

Please sign in to comment.