Skip to content

Commit

Permalink
Use new runInParallel and startInParallel test util in more spots (elastic#110853)
Browse files Browse the repository at this point in the history

It's in the title. Use the utility in a couple more spots to save some code and thread creation.
  • Loading branch information
original-brownbear authored Jul 15, 2024
1 parent e6713a5 commit 2dc28ac
Show file tree
Hide file tree
Showing 12 changed files with 445 additions and 811 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
Expand Down Expand Up @@ -585,85 +584,70 @@ public void testRandomIDsAndVersions() throws Exception {
}

final AtomicInteger upto = new AtomicInteger();
final CountDownLatch startingGun = new CountDownLatch(1);
Thread[] threads = new Thread[TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 20 : 5)];
final long startTime = System.nanoTime();
for (int i = 0; i < threads.length; i++) {
final int threadID = i;
threads[i] = new Thread() {
@Override
public void run() {
try {
// final Random threadRandom = RandomizedContext.current().getRandom();
final Random threadRandom = random();
startingGun.await();
while (true) {

// TODO: sometimes use bulk:

int index = upto.getAndIncrement();
if (index >= idVersions.length) {
break;
}
if (index % 100 == 0) {
logger.trace("{}: index={}", Thread.currentThread().getName(), index);
}
IDAndVersion idVersion = idVersions[index];

String id = idVersion.id;
idVersion.threadID = threadID;
idVersion.indexStartTime = System.nanoTime() - startTime;
long version = idVersion.version;
if (idVersion.delete) {
try {
idVersion.response = client().prepareDelete("test", id)
.setVersion(version)
.setVersionType(VersionType.EXTERNAL)
.get();
} catch (VersionConflictEngineException vcee) {
// OK: our version is too old
assertThat(version, lessThanOrEqualTo(truth.get(id).version));
idVersion.versionConflict = true;
}
} else {
try {
idVersion.response = prepareIndex("test").setId(id)
.setSource("foo", "bar")
.setVersion(version)
.setVersionType(VersionType.EXTERNAL)
.get();

} catch (VersionConflictEngineException vcee) {
// OK: our version is too old
assertThat(version, lessThanOrEqualTo(truth.get(id).version));
idVersion.versionConflict = true;
}
}
idVersion.indexFinishTime = System.nanoTime() - startTime;

if (threadRandom.nextInt(100) == 7) {
logger.trace("--> {}: TEST: now refresh at {}", threadID, System.nanoTime() - startTime);
refresh();
logger.trace("--> {}: TEST: refresh done at {}", threadID, System.nanoTime() - startTime);
}
if (threadRandom.nextInt(100) == 7) {
logger.trace("--> {}: TEST: now flush at {}", threadID, System.nanoTime() - startTime);
flush();
logger.trace("--> {}: TEST: flush done at {}", threadID, System.nanoTime() - startTime);
}
startInParallel(TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 20 : 5), threadID -> {
try {
// final Random threadRandom = RandomizedContext.current().getRandom();
final Random threadRandom = random();
while (true) {

// TODO: sometimes use bulk:

int index = upto.getAndIncrement();
if (index >= idVersions.length) {
break;
}
if (index % 100 == 0) {
logger.trace("{}: index={}", Thread.currentThread().getName(), index);
}
IDAndVersion idVersion = idVersions[index];

String id = idVersion.id;
idVersion.threadID = threadID;
idVersion.indexStartTime = System.nanoTime() - startTime;
long v = idVersion.version;
if (idVersion.delete) {
try {
idVersion.response = client().prepareDelete("test", id)
.setVersion(v)
.setVersionType(VersionType.EXTERNAL)
.get();
} catch (VersionConflictEngineException vcee) {
// OK: our version is too old
assertThat(v, lessThanOrEqualTo(truth.get(id).version));
idVersion.versionConflict = true;
}
} else {
try {
idVersion.response = prepareIndex("test").setId(id)
.setSource("foo", "bar")
.setVersion(v)
.setVersionType(VersionType.EXTERNAL)
.get();

} catch (VersionConflictEngineException vcee) {
// OK: our version is too old
assertThat(v, lessThanOrEqualTo(truth.get(id).version));
idVersion.versionConflict = true;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
threads[i].start();
}
idVersion.indexFinishTime = System.nanoTime() - startTime;

startingGun.countDown();
for (Thread thread : threads) {
thread.join();
}
if (threadRandom.nextInt(100) == 7) {
logger.trace("--> {}: TEST: now refresh at {}", threadID, System.nanoTime() - startTime);
refresh();
logger.trace("--> {}: TEST: refresh done at {}", threadID, System.nanoTime() - startTime);
}
if (threadRandom.nextInt(100) == 7) {
logger.trace("--> {}: TEST: now flush at {}", threadID, System.nanoTime() - startTime);
flush();
logger.trace("--> {}: TEST: flush done at {}", threadID, System.nanoTime() - startTime);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});

// Verify against truth:
boolean failed = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -748,43 +748,34 @@ public void testConsumerConcurrently() throws Exception {
)
) {
AtomicInteger max = new AtomicInteger();
Thread[] threads = new Thread[expectedNumResults];
CountDownLatch latch = new CountDownLatch(expectedNumResults);
for (int i = 0; i < expectedNumResults; i++) {
int id = i;
threads[i] = new Thread(() -> {
int number = randomIntBetween(1, 1000);
max.updateAndGet(prev -> Math.max(prev, number));
QuerySearchResult result = new QuerySearchResult(
new ShardSearchContextId("", id),
new SearchShardTarget("node", new ShardId("a", "b", id), null),
null
runInParallel(expectedNumResults, id -> {
int number = randomIntBetween(1, 1000);
max.updateAndGet(prev -> Math.max(prev, number));
QuerySearchResult result = new QuerySearchResult(
new ShardSearchContextId("", id),
new SearchShardTarget("node", new ShardId("a", "b", id), null),
null
);
try {
result.topDocs(
new TopDocsAndMaxScore(
new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(0, number) }),
number
),
new DocValueFormat[0]
);
try {
result.topDocs(
new TopDocsAndMaxScore(
new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(0, number) }),
number
),
new DocValueFormat[0]
);
InternalAggregations aggs = InternalAggregations.from(
Collections.singletonList(new Max("test", (double) number, DocValueFormat.RAW, Collections.emptyMap()))
);
result.aggregations(aggs);
result.setShardIndex(id);
result.size(1);
consumer.consumeResult(result, latch::countDown);
} finally {
result.decRef();
}

});
threads[i].start();
}
for (int i = 0; i < expectedNumResults; i++) {
threads[i].join();
}
InternalAggregations aggs = InternalAggregations.from(
Collections.singletonList(new Max("test", (double) number, DocValueFormat.RAW, Collections.emptyMap()))
);
result.aggregations(aggs);
result.setShardIndex(id);
result.size(1);
consumer.consumeResult(result, latch::countDown);
} finally {
result.decRef();
}
});
latch.await();

SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
Expand Down Expand Up @@ -1264,42 +1255,34 @@ public void onFinalReduce(List<SearchShard> shards, TotalHits totalHits, Interna
)
) {
AtomicInteger max = new AtomicInteger();
Thread[] threads = new Thread[expectedNumResults];
CountDownLatch latch = new CountDownLatch(expectedNumResults);
for (int i = 0; i < expectedNumResults; i++) {
int id = i;
threads[i] = new Thread(() -> {
int number = randomIntBetween(1, 1000);
max.updateAndGet(prev -> Math.max(prev, number));
QuerySearchResult result = new QuerySearchResult(
new ShardSearchContextId("", id),
new SearchShardTarget("node", new ShardId("a", "b", id), null),
null
runInParallel(expectedNumResults, id -> {
int number = randomIntBetween(1, 1000);
max.updateAndGet(prev -> Math.max(prev, number));
QuerySearchResult result = new QuerySearchResult(
new ShardSearchContextId("", id),
new SearchShardTarget("node", new ShardId("a", "b", id), null),
null
);
try {
result.topDocs(
new TopDocsAndMaxScore(
new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(0, number) }),
number
),
new DocValueFormat[0]
);
try {
result.topDocs(
new TopDocsAndMaxScore(
new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(0, number) }),
number
),
new DocValueFormat[0]
);
InternalAggregations aggs = InternalAggregations.from(
Collections.singletonList(new Max("test", (double) number, DocValueFormat.RAW, Collections.emptyMap()))
);
result.aggregations(aggs);
result.setShardIndex(id);
result.size(1);
consumer.consumeResult(result, latch::countDown);
} finally {
result.decRef();
}
});
threads[i].start();
}
for (int i = 0; i < expectedNumResults; i++) {
threads[i].join();
}
InternalAggregations aggs = InternalAggregations.from(
Collections.singletonList(new Max("test", (double) number, DocValueFormat.RAW, Collections.emptyMap()))
);
result.aggregations(aggs);
result.setShardIndex(id);
result.size(1);
consumer.consumeResult(result, latch::countDown);
} finally {
result.decRef();
}
});
latch.await();
SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
assertAggReduction(request);
Expand Down Expand Up @@ -1354,39 +1337,31 @@ private void testReduceCase(int numShards, int bufferSize, boolean shouldFail) t
)
) {
CountDownLatch latch = new CountDownLatch(numShards);
Thread[] threads = new Thread[numShards];
for (int i = 0; i < numShards; i++) {
final int index = i;
threads[index] = new Thread(() -> {
QuerySearchResult result = new QuerySearchResult(
new ShardSearchContextId(UUIDs.randomBase64UUID(), index),
new SearchShardTarget("node", new ShardId("a", "b", index), null),
null
runInParallel(numShards, index -> {
QuerySearchResult result = new QuerySearchResult(
new ShardSearchContextId(UUIDs.randomBase64UUID(), index),
new SearchShardTarget("node", new ShardId("a", "b", index), null),
null
);
try {
result.topDocs(
new TopDocsAndMaxScore(
new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS),
Float.NaN
),
new DocValueFormat[0]
);
try {
result.topDocs(
new TopDocsAndMaxScore(
new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS),
Float.NaN
),
new DocValueFormat[0]
);
InternalAggregations aggs = InternalAggregations.from(
Collections.singletonList(new Max("test", 0d, DocValueFormat.RAW, Collections.emptyMap()))
);
result.aggregations(aggs);
result.setShardIndex(index);
result.size(1);
consumer.consumeResult(result, latch::countDown);
} finally {
result.decRef();
}
});
threads[index].start();
}
for (int i = 0; i < numShards; i++) {
threads[i].join();
}
InternalAggregations aggs = InternalAggregations.from(
Collections.singletonList(new Max("test", 0d, DocValueFormat.RAW, Collections.emptyMap()))
);
result.aggregations(aggs);
result.setShardIndex(index);
result.size(1);
consumer.consumeResult(result, latch::countDown);
} finally {
result.decRef();
}
});
latch.await();
if (shouldFail) {
if (shouldFailPartial == false) {
Expand Down
Loading

0 comments on commit 2dc28ac

Please sign in to comment.