
Merge branch 'main' into upgrade_bc
jakelandis authored Oct 16, 2023
2 parents dab1392 + e26ad8f commit 928b45d
Showing 44 changed files with 1,121 additions and 712 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/100862.yaml
@@ -0,0 +1,5 @@
pr: 100862
summary: Sending an index name to `DocumentParsingObserver` that is not ever null
area: Ingest Node
type: bug
issues: []
@@ -580,20 +580,10 @@ private void runWaitForShardsTest(
final CountDownLatch actionLatch = new CountDownLatch(1);
final AtomicBoolean success = new AtomicBoolean();

primaryConsumer.accept(primary, new ActionListener<ReplicationResponse>() {

@Override
public void onResponse(final ReplicationResponse replicationResponse) {
success.set(true);
actionLatch.countDown();
}

@Override
public void onFailure(final Exception e) {
failWithException(e);
}

});
primaryConsumer.accept(primary, ActionTestUtils.assertNoFailureListener(ignored -> {
success.set(true);
actionLatch.countDown();
}));
actionLatch.await();
assertTrue(success.get());
afterSync.accept(primary);
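The change above is the pattern this commit applies throughout the test files that follow: hand-rolled anonymous ActionListener instances whose onFailure fails the test are replaced with ActionTestUtils.assertNoFailureListener. The sketch below is not the actual ActionTestUtils code, only an illustration of what such a helper is expected to do, inferred from the anonymous listeners it replaces in this diff; the real signature and implementation may differ.

import java.util.function.Consumer;

import org.elasticsearch.action.ActionListener;

public final class AssertNoFailureListenerSketch {

    // Hedged sketch: wrap a response consumer in a listener whose onFailure
    // immediately surfaces the exception as a test failure.
    public static <T> ActionListener<T> assertNoFailureListener(Consumer<T> onResponse) {
        return new ActionListener<>() {
            @Override
            public void onResponse(T response) {
                onResponse.accept(response);
            }

            @Override
            public void onFailure(Exception e) {
                throw new AssertionError(e); // any failure fails the test right away
            }
        };
    }
}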
@@ -169,8 +169,8 @@ public class IndexRecoveryIT extends AbstractIndexRecoveryIntegTestCase {

private static final int MIN_DOC_COUNT = 500;
private static final int MAX_DOC_COUNT = 1000;
private static final int SHARD_COUNT = 1;
private static final int REPLICA_COUNT = 0;
private static final int SHARD_COUNT_1 = 1;
private static final int REPLICA_COUNT_0 = 0;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
@@ -249,18 +249,20 @@ public Settings.Builder createRecoverySettingsChunkPerSecond(long chunkSizeBytes
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), chunkSizeBytes, ByteSizeUnit.BYTES);
}

private void slowDownRecovery(ByteSizeValue shardSize) {
long chunkSize = Math.max(1, shardSize.getBytes() / 10);
updateClusterSettings(
Settings.builder()
// one chunk per sec..
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), chunkSize, ByteSizeUnit.BYTES)
// small chunks
.put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES))
);
/**
* Updates the cluster state settings to throttle recovery data transmission to 'dataSize' every 10 seconds.
*
* @param dataSize size in bytes to recover in 10 seconds
*/
private void throttleRecovery10Seconds(ByteSizeValue dataSize) {
long chunkSize = Math.max(1, dataSize.getBytes() / 10);
updateClusterSettings(createRecoverySettingsChunkPerSecond(chunkSize));
}

private void restoreRecoverySpeed() {
/**
* Sets high MB per second throttling for recovery on all nodes in the cluster.
*/
private void unthrottleRecovery() {
updateClusterSettings(
Settings.builder()
// 200mb is an arbitrary number intended to be large enough to avoid more throttling.
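The renamed helpers above make the throttling arithmetic explicit: throttleRecovery10Seconds caps the recovery rate at one tenth of the given size per second, so transferring the whole amount takes on the order of ten seconds. A purely illustrative calculation follows; the 5,000-byte shard size is invented for the example and does not come from the test.

// Illustrative numbers only: a hypothetical 5,000-byte shard throttled by
// throttleRecovery10Seconds recovers in roughly ten seconds.
long shardSizeBytes = 5_000L;
long maxBytesPerSec = Math.max(1, shardSizeBytes / 10); // 500 bytes/sec, same formula as the helper
long approxRecoverySeconds = shardSizeBytes / maxBytesPerSec; // ~10 seconds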
@@ -343,15 +345,15 @@ public void testGatewayRecovery() throws Exception {
logger.info("--> start nodes");
String node = internalCluster().startNode();

createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT);
createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT_1, REPLICA_COUNT_0);

logger.info("--> restarting cluster");
internalCluster().fullRestart();
ensureGreen();

logger.info("--> request recoveries");
RecoveryResponse response = indicesAdmin().prepareRecoveries(INDEX_NAME).execute().actionGet();
assertThat(response.shardRecoveryStates().size(), equalTo(SHARD_COUNT));
assertThat(response.shardRecoveryStates().size(), equalTo(SHARD_COUNT_1));
assertThat(response.shardRecoveryStates().get(INDEX_NAME).size(), equalTo(1));

List<RecoveryState> recoveryStates = response.shardRecoveryStates().get(INDEX_NAME);
@@ -368,7 +370,7 @@ public void testGatewayRecoveryTestActiveOnly() throws Exception {
logger.info("--> start nodes");
internalCluster().startNode();

createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT);
createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT_1, REPLICA_COUNT_0);

logger.info("--> restarting cluster");
internalCluster().fullRestart();
@@ -383,7 +385,7 @@ public void testGatewayRecoveryTestActiveOnly() throws Exception {

public void testReplicaRecovery() throws Exception {
final String nodeA = internalCluster().startNode();
createIndex(INDEX_NAME, SHARD_COUNT, REPLICA_COUNT);
createIndex(INDEX_NAME, SHARD_COUNT_1, REPLICA_COUNT_0);
ensureGreen(INDEX_NAME);

final int numOfDocs = scaledRandomIntBetween(0, 200);
@@ -535,7 +537,7 @@ public void testRerouteRecovery() throws Exception {
final String nodeA = internalCluster().startNode();

logger.info("--> create index on node: {}", nodeA);
ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0].getStats()
ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT_1, REPLICA_COUNT_0).getShards()[0].getStats()
.getStore()
.size();

@@ -545,7 +547,7 @@ public void testRerouteRecovery() throws Exception {
ensureGreen();

logger.info("--> slowing down recoveries");
slowDownRecovery(shardSize);
throttleRecovery10Seconds(shardSize);

logger.info("--> move shard from: {} to: {}", nodeA, nodeB);
clusterAdmin().prepareReroute().add(new MoveAllocationCommand(INDEX_NAME, 0, nodeA, nodeB)).execute().actionGet().getState();
@@ -592,7 +594,7 @@ public void testRerouteRecovery() throws Exception {
}

logger.info("--> speeding up recoveries");
restoreRecoverySpeed();
unthrottleRecovery();

// wait for it to be finished
ensureGreen();
@@ -631,7 +633,7 @@ public void testRerouteRecovery() throws Exception {
assertFalse(clusterAdmin().prepareHealth().setWaitForNodes("3").get().isTimedOut());

logger.info("--> slowing down recoveries");
slowDownRecovery(shardSize);
throttleRecovery10Seconds(shardSize);

logger.info("--> move replica shard from: {} to: {}", nodeA, nodeC);
clusterAdmin().prepareReroute().add(new MoveAllocationCommand(INDEX_NAME, 0, nodeA, nodeC)).execute().actionGet().getState();
@@ -679,7 +681,7 @@ public void testRerouteRecovery() throws Exception {
}

logger.info("--> speeding up recoveries");
restoreRecoverySpeed();
unthrottleRecovery();
ensureGreen();

response = indicesAdmin().prepareRecoveries(INDEX_NAME).execute().actionGet();
@@ -711,7 +713,7 @@ public void testSourceThrottling() throws Exception {
final String nodeA = internalCluster().startNode();

logger.info("--> creating index on node A");
ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0].getStats()
ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT_1, REPLICA_COUNT_0).getShards()[0].getStats()
.getStore()
.size();

@@ -759,7 +761,7 @@ public Settings onNodeStopped(String nodeName) {
});

logger.info("--> increasing the recovery throttle limit so that the shard recovery completes quickly");
restoreRecoverySpeed();
unthrottleRecovery();

logger.info("--> waiting for the shard recovery to complete");
ensureGreen();
@@ -782,7 +784,7 @@ public void testTargetThrottling() throws Exception {
final String nodeA = internalCluster().startNode();

logger.info("--> creating index on node A");
ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0].getStats()
ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT_1, REPLICA_COUNT_0).getShards()[0].getStats()
.getStore()
.size();

@@ -821,7 +823,7 @@ public void testTargetThrottling() throws Exception {
});

logger.info("--> increasing the recovery throttle limit so that the shard recovery completes quickly");
restoreRecoverySpeed();
unthrottleRecovery();

logger.info("--> waiting for the shard recovery to complete");
ensureGreen();
@@ -845,7 +847,7 @@ public void testSnapshotRecovery() throws Exception {
ensureGreen();

logger.info("--> create index on node: {}", nodeA);
createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT);
createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT_1, REPLICA_COUNT_0);

logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = createSnapshot(INDEX_NAME);
@@ -10,10 +10,11 @@

import org.elasticsearch.action.ActionListener;

import java.io.Closeable;
import java.util.Map;
import java.util.Set;

public interface InferenceService {
public interface InferenceService extends Closeable {

String name();

@@ -722,7 +722,8 @@ public void onFailure(Exception e) {
executePipelines(pipelines, indexRequest, ingestDocument, documentListener);
indexRequest.setPipelinesHaveRun();

documentParsingObserver.setIndexName(indexRequest.index());
assert actionRequest.index() != null;
documentParsingObserver.setIndexName(actionRequest.index());
documentParsingObserver.close();

i++;
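This is the fix described by the changelog entry for PR 100862: the index name handed to DocumentParsingObserver is now read from actionRequest rather than indexRequest, with an assertion that it is non-null before the observer is closed. The class below is a hypothetical consumer of that contract, written only to show why a guaranteed non-null name simplifies the receiving side; it is not the real DocumentParsingObserver implementation.

// Hypothetical observer sketch: because the caller guarantees a non-null index name
// before close(), the metrics sink needs no null special case.
class RecordingDocumentParsingObserver implements AutoCloseable {

    private String indexName;

    void setIndexName(String indexName) {
        this.indexName = indexName;
    }

    @Override
    public void close() {
        assert indexName != null : "index name must be set before close()";
        // record parsed-document metrics keyed by indexName here
    }
}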
@@ -14,6 +14,7 @@
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -156,14 +157,6 @@ void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResp
}
}
};
action.doExecute(null, bulkRequest, new ActionListener<>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {}

@Override
public void onFailure(Exception e) {
throw new AssertionError(e);
}
});
action.doExecute(null, bulkRequest, ActionTestUtils.assertNoFailureListener(bulkItemResponse -> {}));
}
}
@@ -9,6 +9,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -40,18 +41,13 @@ public void testClearAll() throws InterruptedException {
DiscoveryNode node3 = DiscoveryNodeUtils.create("node_3");
DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build();
CountDownLatch latch = new CountDownLatch(1);
ActionListener<ClearScrollResponse> listener = new LatchedActionListener<>(new ActionListener<ClearScrollResponse>() {
@Override
public void onResponse(ClearScrollResponse clearScrollResponse) {
ActionListener<ClearScrollResponse> listener = new LatchedActionListener<>(
ActionTestUtils.assertNoFailureListener(clearScrollResponse -> {
assertEquals(3, clearScrollResponse.getNumFreed());
assertTrue(clearScrollResponse.isSucceeded());
}

@Override
public void onFailure(Exception e) {
throw new AssertionError(e);
}
}, latch);
}),
latch
);
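These tests wrap the simplified listener in a LatchedActionListener so the test thread can block on the latch until the asynchronous clear-scroll callback has run. The standalone sketch below uses invented values and assumes LatchedActionListener simply delegates the callback and then counts the latch down, which is the behavior the tests rely on here.

import java.util.concurrent.CountDownLatch;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.support.ActionTestUtils;

public final class LatchedListenerSketch {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        ActionListener<String> listener = new LatchedActionListener<>(
            ActionTestUtils.assertNoFailureListener(value -> System.out.println("got " + value)),
            latch
        );
        listener.onResponse("done"); // delegate runs first, then the latch is released
        latch.await();               // a test thread blocks here until the callback has run
    }
}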
List<DiscoveryNode> nodesInvoked = new CopyOnWriteArrayList<>();
SearchTransportService searchTransportService = new SearchTransportService(null, null, null) {
@Override
@@ -103,18 +99,13 @@ public void testClearScrollIds() throws IOException, InterruptedException {
String scrollId = TransportSearchHelper.buildScrollId(array);
DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build();
CountDownLatch latch = new CountDownLatch(1);
ActionListener<ClearScrollResponse> listener = new LatchedActionListener<>(new ActionListener<ClearScrollResponse>() {
@Override
public void onResponse(ClearScrollResponse clearScrollResponse) {
ActionListener<ClearScrollResponse> listener = new LatchedActionListener<>(
ActionTestUtils.assertNoFailureListener(clearScrollResponse -> {
assertEquals(numFreed.get(), clearScrollResponse.getNumFreed());
assertTrue(clearScrollResponse.isSucceeded());
}

@Override
public void onFailure(Exception e) {
throw new AssertionError(e);
}
}, latch);
}),
latch
);
List<DiscoveryNode> nodesInvoked = new CopyOnWriteArrayList<>();
SearchTransportService searchTransportService = new SearchTransportService(null, null, null) {

@@ -178,22 +169,17 @@ public void testClearScrollIdsWithFailure() throws IOException, InterruptedExcep
DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build();
CountDownLatch latch = new CountDownLatch(1);

ActionListener<ClearScrollResponse> listener = new LatchedActionListener<>(new ActionListener<ClearScrollResponse>() {
@Override
public void onResponse(ClearScrollResponse clearScrollResponse) {
ActionListener<ClearScrollResponse> listener = new LatchedActionListener<>(
ActionTestUtils.assertNoFailureListener(clearScrollResponse -> {
assertEquals(numFreed.get(), clearScrollResponse.getNumFreed());
if (numFailures.get() > 0) {
assertFalse(clearScrollResponse.isSucceeded());
} else {
assertTrue(clearScrollResponse.isSucceeded());
}
}

@Override
public void onFailure(Exception e) {
throw new AssertionError(e);
}
}, latch);
}),
latch
);
List<DiscoveryNode> nodesInvoked = new CopyOnWriteArrayList<>();
SearchTransportService searchTransportService = new SearchTransportService(null, null, null) {

@@ -8,6 +8,7 @@
package org.elasticsearch.action.search;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -461,16 +462,6 @@ private static ParsedScrollId getParsedScrollId(SearchContextIdForNode... idsFor
}

private ActionListener<SearchResponse> dummyListener() {
return new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse response) {
fail("dummy");
}

@Override
public void onFailure(Exception e) {
throw new AssertionError(e);
}
};
return ActionTestUtils.assertNoFailureListener(response -> fail("dummy"));
}
}
@@ -28,17 +28,7 @@ public class CountDownActionListenerTests extends ESTestCase {

public void testNotifications() throws InterruptedException {
AtomicBoolean called = new AtomicBoolean(false);
ActionListener<Void> result = new ActionListener<>() {
@Override
public void onResponse(Void ignored) {
called.set(true);
}

@Override
public void onFailure(Exception e) {
throw new AssertionError(e);
}
};
ActionListener<Void> result = ActionTestUtils.assertNoFailureListener(ignored -> called.set(true));
final int groupSize = randomIntBetween(10, 1000);
AtomicInteger count = new AtomicInteger();
CountDownActionListener listener = new CountDownActionListener(groupSize, result);
@@ -30,17 +30,7 @@ public class GroupedActionListenerTests extends ESTestCase {

public void testNotifications() throws InterruptedException {
AtomicReference<Collection<Integer>> resRef = new AtomicReference<>();
ActionListener<Collection<Integer>> result = new ActionListener<Collection<Integer>>() {
@Override
public void onResponse(Collection<Integer> integers) {
resRef.set(integers);
}

@Override
public void onFailure(Exception e) {
throw new AssertionError(e);
}
};
ActionListener<Collection<Integer>> result = ActionTestUtils.assertNoFailureListener(resRef::set);
final int groupSize = randomIntBetween(10, 1000);
AtomicInteger count = new AtomicInteger();
GroupedActionListener<Integer> listener = new GroupedActionListener<>(groupSize, result);
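The test drives a GroupedActionListener, which collects one result per expected response and only then invokes the wrapped listener with the whole collection. The usage sketch below is invented (group size of 3, the integer values, and the package names are assumed from the imports visible elsewhere in this commit), but it follows the same constructor shape as the line above.

import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.GroupedActionListener;

// Invented usage sketch: three onResponse calls complete the group, and only then
// is the wrapped listener handed the collected results.
public final class GroupedListenerSketch {
    public static void main(String[] args) {
        GroupedActionListener<Integer> grouped = new GroupedActionListener<>(
            3, // expected number of responses
            ActionTestUtils.assertNoFailureListener(results -> System.out.println(results))
        );
        grouped.onResponse(1);
        grouped.onResponse(2);
        grouped.onResponse(3); // completes the group; results now holds all three values
    }
}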
