<Result extends SearchPhaseResult> void transformSearchPhaseResults(
final SearchPhaseResults<Result> searchPhaseResult,
final SearchPhaseContext searchPhaseContext,
final String currentPhase,
final String nextPhase
) {
- pipeline.runSearchPhaseResultsTransformer(searchPhaseResult, searchPhaseContext, currentPhase, nextPhase);
+ pipeline.runSearchPhaseResultsTransformer(searchPhaseResult, searchPhaseContext, currentPhase, nextPhase, requestContext);
}
// Visible for testing
diff --git a/server/src/main/java/org/opensearch/search/pipeline/Processor.java b/server/src/main/java/org/opensearch/search/pipeline/Processor.java
index 0120d68ceb5aa..a06383fbe9cef 100644
--- a/server/src/main/java/org/opensearch/search/pipeline/Processor.java
+++ b/server/src/main/java/org/opensearch/search/pipeline/Processor.java
@@ -21,13 +21,6 @@
* @opensearch.internal
*/
public interface Processor {
- /**
- * Processor configuration key to let the factory know the context for pipeline creation.
- *
- * See {@link PipelineSource}.
- */
- String PIPELINE_SOURCE = "pipeline_source";
-
/**
* Gets the type of processor
*/
diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPhaseResultsProcessor.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPhaseResultsProcessor.java
index 772dc8758bace..a64266cfb2a2b 100644
--- a/server/src/main/java/org/opensearch/search/pipeline/SearchPhaseResultsProcessor.java
+++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPhaseResultsProcessor.java
@@ -32,6 +32,22 @@ void process(
final SearchPhaseContext searchPhaseContext
);
+ /**
+ * Processes the {@link SearchPhaseResults} obtained from a SearchPhase, which will be returned to the next
+ * SearchPhase. Receives the {@link PipelineProcessingContext} passed to other processors.
+ * @param searchPhaseResult {@link SearchPhaseResults}
+ * @param searchPhaseContext {@link SearchPhaseContext}
+ * @param requestContext {@link PipelineProcessingContext}
+ * @param <Result> {@link SearchPhaseResult}
+ */
+ default <Result extends SearchPhaseResult> void process(
+ final SearchPhaseResults<Result> searchPhaseResult,
+ final SearchPhaseContext searchPhaseContext,
+ final PipelineProcessingContext requestContext
+ ) {
+ process(searchPhaseResult, searchPhaseContext);
+ }
+
/**
* The phase which should have run before this processor can start executing.
* @return {@link SearchPhaseName}
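For illustration, a minimal sketch of a processor overriding the new context-aware overload. This is not part of the patch: the attribute key is invented, the setAttribute accessor on PipelineProcessingContext is an assumption based on the request-scoped state API introduced here, and the Processor accessors (getTag, getDescription, isIgnoreFailure) are assumed from the surrounding interface.

import org.opensearch.action.search.SearchPhaseContext;
import org.opensearch.action.search.SearchPhaseName;
import org.opensearch.action.search.SearchPhaseResults;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.pipeline.PipelineProcessingContext;
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor;

public class MarkerPhaseResultsProcessor implements SearchPhaseResultsProcessor {
    // Context-free variant: nothing to record without shared state.
    @Override
    public <Result extends SearchPhaseResult> void process(
        final SearchPhaseResults<Result> searchPhaseResult,
        final SearchPhaseContext searchPhaseContext
    ) {}

    // Context-aware variant: leave a marker that a later processor in the same request can read.
    @Override
    public <Result extends SearchPhaseResult> void process(
        final SearchPhaseResults<Result> searchPhaseResult,
        final SearchPhaseContext searchPhaseContext,
        final PipelineProcessingContext requestContext
    ) {
        requestContext.setAttribute("marker.phase_results_seen", true);
    }

    @Override
    public SearchPhaseName getBeforePhase() {
        return SearchPhaseName.QUERY;
    }

    @Override
    public SearchPhaseName getAfterPhase() {
        return SearchPhaseName.FETCH;
    }

    @Override public String getType() { return "marker"; }
    @Override public String getTag() { return null; }
    @Override public String getDescription() { return "records that phase results were processed"; }
    @Override public boolean isIgnoreFailure() { return false; }
}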
diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java
index 580fe1b7c4216..2175b5d135394 100644
--- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java
+++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java
@@ -408,7 +408,8 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) {
pipeline = pipelineHolder.pipeline;
}
}
- return new PipelinedRequest(pipeline, searchRequest);
+ PipelineProcessingContext requestContext = new PipelineProcessingContext();
+ return new PipelinedRequest(pipeline, searchRequest, requestContext);
}
Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessorFactories() {
diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchRequestProcessor.java b/server/src/main/java/org/opensearch/search/pipeline/SearchRequestProcessor.java
index 427c9e4ab694c..30adc9b0afbe8 100644
--- a/server/src/main/java/org/opensearch/search/pipeline/SearchRequestProcessor.java
+++ b/server/src/main/java/org/opensearch/search/pipeline/SearchRequestProcessor.java
@@ -15,18 +15,27 @@
* Interface for a search pipeline processor that modifies a search request.
*/
public interface SearchRequestProcessor extends Processor {
-
/**
- * Transform a {@link SearchRequest}. Executed on the coordinator node before any {@link org.opensearch.action.search.SearchPhase}
- * executes.
- *
+ * Process a SearchRequest without receiving request-scoped state.
* Implement this method if the processor makes no asynchronous calls.
- * @param request the executed {@link SearchRequest}
- * @return a new {@link SearchRequest} (or the input {@link SearchRequest} if no changes)
- * @throws Exception if an error occurs during processing
+ * @param request the search request (which may have been modified by an earlier processor)
+ * @return the modified search request
+ * @throws Exception implementation-specific processing exception
*/
SearchRequest processRequest(SearchRequest request) throws Exception;
+ /**
+ * Process a SearchRequest, with request-scoped state shared across processors in the pipeline.
+ * Implement this method if the processor makes no asynchronous calls.
+ * @param request the search request (which may have been modified by an earlier processor)
+ * @param requestContext request-scoped state shared across processors in the pipeline
+ * @return the modified search request
+ * @throws Exception implementation-specific processing exception
+ */
+ default SearchRequest processRequest(SearchRequest request, PipelineProcessingContext requestContext) throws Exception {
+ return processRequest(request);
+ }
+
/**
* Transform a {@link SearchRequest}. Executed on the coordinator node before any {@link org.opensearch.action.search.SearchPhase}
* executes.
@@ -35,9 +44,13 @@ public interface SearchRequestProcessor extends Processor {
* @param request the executed {@link SearchRequest}
+ * @param requestContext request-scoped state shared across processors in the pipeline
* @param requestListener callback to be invoked on successful processing or on failure
*/
- default void processRequestAsync(SearchRequest request, ActionListener<SearchRequest> requestListener) {
+ default void processRequestAsync(
+ SearchRequest request,
+ PipelineProcessingContext requestContext,
+ ActionListener<SearchRequest> requestListener
+ ) {
try {
- requestListener.onResponse(processRequest(request));
+ requestListener.onResponse(processRequest(request, requestContext));
} catch (Exception e) {
requestListener.onFailure(e);
}
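As a sketch of the asynchronous path (again not part of the patch; the raw thread and the attribute key are illustrative assumptions, and the Processor accessors are assumed as in the earlier sketch), a processor that overrides the context-aware processRequestAsync and completes the listener off-thread:

import org.opensearch.action.search.SearchRequest;
import org.opensearch.core.action.ActionListener;
import org.opensearch.search.pipeline.PipelineProcessingContext;
import org.opensearch.search.pipeline.SearchRequestProcessor;

public class EnrichingRequestProcessor implements SearchRequestProcessor {
    // Synchronous fallback; unused once the async variant is overridden.
    @Override
    public SearchRequest processRequest(SearchRequest request) {
        return request;
    }

    // Async variant: record request-scoped state, then complete the listener off-thread.
    @Override
    public void processRequestAsync(
        SearchRequest request,
        PipelineProcessingContext requestContext,
        ActionListener<SearchRequest> requestListener
    ) {
        new Thread(() -> {
            try {
                requestContext.setAttribute("enriched", true);
                requestListener.onResponse(request);
            } catch (Exception e) {
                requestListener.onFailure(e);
            }
        }).start();
    }

    @Override public String getType() { return "enriching"; }
    @Override public String getTag() { return null; }
    @Override public String getDescription() { return "records request-scoped state asynchronously"; }
    @Override public boolean isIgnoreFailure() { return false; }
}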
diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchResponseProcessor.java b/server/src/main/java/org/opensearch/search/pipeline/SearchResponseProcessor.java
index 21136ce208fee..98591ab9d0def 100644
--- a/server/src/main/java/org/opensearch/search/pipeline/SearchResponseProcessor.java
+++ b/server/src/main/java/org/opensearch/search/pipeline/SearchResponseProcessor.java
@@ -21,24 +21,47 @@ public interface SearchResponseProcessor extends Processor {
* Transform a {@link SearchResponse}, possibly based on the executed {@link SearchRequest}.
*
* Implement this method if the processor makes no asynchronous calls.
- * @param request the executed {@link SearchRequest}
+ *
+ * @param request the executed {@link SearchRequest}
* @param response the current {@link SearchResponse}, possibly modified by earlier processors
* @return a modified {@link SearchResponse} (or the input {@link SearchResponse} if no changes)
* @throws Exception if an error occurs during processing
*/
SearchResponse processResponse(SearchRequest request, SearchResponse response) throws Exception;
+ /**
+ * Process a SearchResponse, with request-scoped state shared across processors in the pipeline.
+ *
+ * Implement this method if the processor makes no asynchronous calls.
+ *
+ * @param request the (maybe transformed) search request
+ * @param response the search response (which may have been modified by an earlier processor)
+ * @param requestContext request-scoped state shared across processors in the pipeline
+ * @return the modified search response
+ * @throws Exception implementation-specific processing exception
+ */
+ default SearchResponse processResponse(SearchRequest request, SearchResponse response, PipelineProcessingContext requestContext)
+ throws Exception {
+ return processResponse(request, response);
+ }
+
/**
* Transform a {@link SearchResponse}, possibly based on the executed {@link SearchRequest}.
*
* Expert method: Implement this if the processor needs to make asynchronous calls. Otherwise, implement processResponse.
- * @param request the executed {@link SearchRequest}
- * @param response the current {@link SearchResponse}, possibly modified by earlier processors
+ *
+ * @param request the executed {@link SearchRequest}
+ * @param response the current {@link SearchResponse}, possibly modified by earlier processors
+ * @param requestContext request-scoped state shared across processors in the pipeline
* @param responseListener callback to be invoked on successful processing or on failure
*/
- default void processResponseAsync(SearchRequest request, SearchResponse response, ActionListener<SearchResponse> responseListener) {
+ default void processResponseAsync(
+ SearchRequest request,
+ SearchResponse response,
+ PipelineProcessingContext requestContext,
+ ActionListener<SearchResponse> responseListener
+ ) {
try {
- responseListener.onResponse(processResponse(request, response));
+ responseListener.onResponse(processResponse(request, response, requestContext));
} catch (Exception e) {
responseListener.onFailure(e);
}
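A matching response-side sketch (not part of the patch) that reads the state recorded by the request processor above; getAttribute is assumed to be the read accessor on PipelineProcessingContext:

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.search.pipeline.PipelineProcessingContext;
import org.opensearch.search.pipeline.SearchResponseProcessor;

public class EnrichmentAwareResponseProcessor implements SearchResponseProcessor {
    // Context-free variant: nothing to check without shared state.
    @Override
    public SearchResponse processResponse(SearchRequest request, SearchResponse response) {
        return response;
    }

    // Context-aware variant: react to state recorded by the request-side processor.
    @Override
    public SearchResponse processResponse(SearchRequest request, SearchResponse response, PipelineProcessingContext requestContext)
        throws Exception {
        if (Boolean.TRUE.equals(requestContext.getAttribute("enriched"))) {
            // A real processor might transform the response here; this sketch returns it unchanged.
        }
        return response;
    }

    @Override public String getType() { return "enrichment_aware"; }
    @Override public String getTag() { return null; }
    @Override public String getDescription() { return "reads request-scoped state"; }
    @Override public boolean isIgnoreFailure() { return false; }
}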
diff --git a/server/src/main/java/org/opensearch/search/pipeline/StatefulSearchRequestProcessor.java b/server/src/main/java/org/opensearch/search/pipeline/StatefulSearchRequestProcessor.java
new file mode 100644
index 0000000000000..67e1c1147cb87
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/pipeline/StatefulSearchRequestProcessor.java
@@ -0,0 +1,25 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.pipeline;
+
+import org.opensearch.action.search.SearchRequest;
+
+/**
+ * A specialization of {@link SearchRequestProcessor} that makes use of the request-scoped processor state.
+ * Implementors must implement the processRequest method that accepts request-scoped processor state.
+ */
+public interface StatefulSearchRequestProcessor extends SearchRequestProcessor {
+ @Override
+ default SearchRequest processRequest(SearchRequest request) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ SearchRequest processRequest(SearchRequest request, PipelineProcessingContext requestContext) throws Exception;
+}
diff --git a/server/src/main/java/org/opensearch/search/pipeline/StatefulSearchResponseProcessor.java b/server/src/main/java/org/opensearch/search/pipeline/StatefulSearchResponseProcessor.java
new file mode 100644
index 0000000000000..f0842d24e1b56
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/pipeline/StatefulSearchResponseProcessor.java
@@ -0,0 +1,27 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.pipeline;
+
+import org.opensearch.action.search.SearchRequest;
+import org.opensearch.action.search.SearchResponse;
+
+/**
+ * A specialization of {@link SearchResponseProcessor} that makes use of the request-scoped processor state.
+ * Implementors must implement the processResponse method that accepts request-scoped processor state.
+ */
+public interface StatefulSearchResponseProcessor extends SearchResponseProcessor {
+ @Override
+ default SearchResponse processResponse(SearchRequest request, SearchResponse response) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ SearchResponse processResponse(SearchRequest request, SearchResponse response, PipelineProcessingContext requestContext)
+ throws Exception;
+}
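To show how the two stateful interfaces pair up, a sketch under the same assumptions about the PipelineProcessingContext accessors (the timing attribute is invented): a request processor records when the request entered the pipeline, and a response processor derives the elapsed time. Note that for these interfaces the context-free variants throw, so implementations must use the context-aware methods.

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.search.pipeline.PipelineProcessingContext;
import org.opensearch.search.pipeline.StatefulSearchRequestProcessor;
import org.opensearch.search.pipeline.StatefulSearchResponseProcessor;

// Request side: records the start time into the request-scoped context.
class StartTimeRequestProcessor implements StatefulSearchRequestProcessor {
    @Override
    public SearchRequest processRequest(SearchRequest request, PipelineProcessingContext requestContext) {
        requestContext.setAttribute("pipeline.start_nanos", System.nanoTime());
        return request;
    }

    @Override public String getType() { return "start_time"; }
    @Override public String getTag() { return null; }
    @Override public String getDescription() { return "records when the request entered the pipeline"; }
    @Override public boolean isIgnoreFailure() { return false; }
}

// Response side: consumes the value recorded by the request-side processor.
class TookTimeResponseProcessor implements StatefulSearchResponseProcessor {
    @Override
    public SearchResponse processResponse(SearchRequest request, SearchResponse response, PipelineProcessingContext requestContext)
        throws Exception {
        long startNanos = (long) requestContext.getAttribute("pipeline.start_nanos");
        long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000;
        // A real processor might rebuild the response to report elapsedMillis; this sketch returns it unchanged.
        return response;
    }

    @Override public String getType() { return "took_time"; }
    @Override public String getTag() { return null; }
    @Override public String getDescription() { return "derives elapsed pipeline time"; }
    @Override public boolean isIgnoreFailure() { return false; }
}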
diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java
index c825ecc8abe9f..12052598d3671 100644
--- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java
+++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java
@@ -231,6 +231,7 @@ public ThreadPool(
final Map<String, ExecutorBuilder> builders = new HashMap<>();
final int allocatedProcessors = OpenSearchExecutors.allocatedProcessors(settings);
+ final int halfProc = halfAllocatedProcessors(allocatedProcessors);
final int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors);
final int halfProcMaxAt10 = halfAllocatedProcessorsMaxTen(allocatedProcessors);
final int genericThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512);
@@ -264,13 +265,13 @@ public ThreadPool(
builders.put(Names.SYSTEM_WRITE, new FixedExecutorBuilder(settings, Names.SYSTEM_WRITE, halfProcMaxAt5, 1000, false));
builders.put(
Names.TRANSLOG_TRANSFER,
- new ScalingExecutorBuilder(Names.TRANSLOG_TRANSFER, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))
+ new ScalingExecutorBuilder(Names.TRANSLOG_TRANSFER, 1, halfProc, TimeValue.timeValueMinutes(5))
);
builders.put(Names.TRANSLOG_SYNC, new FixedExecutorBuilder(settings, Names.TRANSLOG_SYNC, allocatedProcessors * 4, 10000));
- builders.put(Names.REMOTE_PURGE, new ScalingExecutorBuilder(Names.REMOTE_PURGE, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
+ builders.put(Names.REMOTE_PURGE, new ScalingExecutorBuilder(Names.REMOTE_PURGE, 1, halfProc, TimeValue.timeValueMinutes(5)));
builders.put(
Names.REMOTE_REFRESH_RETRY,
- new ScalingExecutorBuilder(Names.REMOTE_REFRESH_RETRY, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))
+ new ScalingExecutorBuilder(Names.REMOTE_REFRESH_RETRY, 1, halfProc, TimeValue.timeValueMinutes(5))
);
builders.put(
Names.REMOTE_RECOVERY,
@@ -555,6 +556,10 @@ static int boundedBy(int value, int min, int max) {
return Math.min(max, Math.max(min, value));
}
+ static int halfAllocatedProcessors(int allocatedProcessors) {
+ return (allocatedProcessors + 1) / 2;
+ }
+
static int halfAllocatedProcessorsMaxFive(final int allocatedProcessors) {
return boundedBy((allocatedProcessors + 1) / 2, 1, 5);
}
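For intuition, a worked example of the new uncapped helper against the existing capped variants, for a node with 24 allocated processors:

    halfAllocatedProcessors(24)        == (24 + 1) / 2 == 12   // uncapped
    halfAllocatedProcessorsMaxFive(24) == min(12, 5)   == 5
    halfAllocatedProcessorsMaxTen(24)  == min(12, 10)  == 10

With this change, the TRANSLOG_TRANSFER, REMOTE_PURGE, and REMOTE_REFRESH_RETRY pools can scale up to half the allocated processors (12 threads here) instead of being capped at 10 or 5.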
diff --git a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java
index 8d3cdc070c695..a1e3a2b03caf7 100644
--- a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java
+++ b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java
@@ -21,6 +21,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
+import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.index.query.IdsQueryBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
@@ -33,6 +34,7 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteClusterConnectionTests;
import org.opensearch.transport.Transport;
+import org.opensearch.transport.TransportService;
import org.junit.Before;
import java.util.ArrayList;
@@ -262,6 +264,46 @@ public void getAllPits(ActionListener<GetAllPitNodesResponse> getAllPitsListener
}
}
+ public void testDeleteAllPITSuccessWhenNoPITsExist() throws InterruptedException, ExecutionException {
+ ActionFilters actionFilters = mock(ActionFilters.class);
+ when(actionFilters.filters()).thenReturn(new ActionFilter[0]);
+ List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
+ try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT)) {
+ knownNodes.add(cluster1Transport.getLocalDiscoNode());
+ TransportService mockTransportService = mock(TransportService.class);
+ PitService pitService = new PitService(clusterServiceMock, mock(SearchTransportService.class), mockTransportService, client) {
+ @Override
+ public void getAllPits(ActionListener<GetAllPitNodesResponse> getAllPitsListener) {
+ List<ListPitInfo> list = new ArrayList<>();
+ GetAllPitNodeResponse getAllPitNodeResponse = new GetAllPitNodeResponse(cluster1Transport.getLocalDiscoNode(), list);
+ List<GetAllPitNodeResponse> nodeList = new ArrayList<>();
+ nodeList.add(getAllPitNodeResponse);
+ getAllPitsListener.onResponse(new GetAllPitNodesResponse(new ClusterName("cn"), nodeList, new ArrayList<>()));
+ }
+ };
+ TransportDeletePitAction action = new TransportDeletePitAction(
+ mockTransportService,
+ actionFilters,
+ namedWriteableRegistry,
+ pitService
+ );
+ DeletePitRequest deletePITRequest = new DeletePitRequest("_all");
+ ActionListener<DeletePitResponse> listener = new ActionListener<DeletePitResponse>() {
+ @Override
+ public void onResponse(DeletePitResponse deletePitResponse) {
+ assertEquals(RestStatus.OK, deletePitResponse.status());
+ assertEquals(0, deletePitResponse.getDeletePitResults().size());
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ fail("Should not receive Exception");
+ }
+ };
+ action.execute(task, deletePITRequest, listener);
+ }
+ }
+
public void testDeletePitWhenNodeIsDown() throws InterruptedException, ExecutionException {
List deleteNodesInvoked = new CopyOnWriteArrayList<>();
ActionFilters actionFilters = mock(ActionFilters.class);
diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java
index e40826915c848..cea151748bfb6 100644
--- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java
+++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java
@@ -71,6 +71,7 @@
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.translog.Translog;
+import org.opensearch.indices.IndexCreationException;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.InvalidAliasNameException;
import org.opensearch.indices.InvalidIndexNameException;
@@ -117,6 +118,7 @@
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_BLOCK;
+import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY;
@@ -136,6 +138,7 @@
import static org.opensearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
import static org.opensearch.index.IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
+import static org.opensearch.indices.IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_MINIMUM_INDEX_REFRESH_INTERVAL_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
@@ -165,6 +168,9 @@ public class MetadataCreateIndexServiceTests extends OpenSearchTestCase {
private static final String translogRepositoryNameAttributeKey = NODE_ATTRIBUTES.getKey()
+ REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
+ final String REPLICATION_MISMATCH_VALIDATION_ERROR =
+ "Validation Failed: 1: index setting [index.replication.type] is not allowed to be set as [cluster.index.restrict.replication.type=true];";
+
@Before
public void setup() throws Exception {
super.setUp();
@@ -1217,6 +1223,126 @@ public void testvalidateIndexSettings() {
threadPool.shutdown();
}
+ public void testIndexTemplateReplicationType() {
+ Settings templateSettings = Settings.builder().put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build();
+
+ request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
+ final Settings.Builder requestSettings = Settings.builder();
+ request.settings(requestSettings.build());
+ Settings indexSettings = aggregateIndexSettings(
+ ClusterState.EMPTY_STATE,
+ request,
+ templateSettings,
+ null,
+ Settings.EMPTY,
+ IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
+ randomShardLimitService(),
+ Collections.emptySet(),
+ clusterSettings
+ );
+ assertNotEquals(ReplicationType.SEGMENT, clusterSettings.get(CLUSTER_REPLICATION_TYPE_SETTING));
+ assertEquals(ReplicationType.SEGMENT.toString(), indexSettings.get(INDEX_REPLICATION_TYPE_SETTING.getKey()));
+ }
+
+ public void testClusterForceReplicationTypeInAggregateSettings() {
+ Settings settings = Settings.builder()
+ .put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT)
+ .put(CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING.getKey(), true)
+ .build();
+ ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+ Settings nonMatchingReplicationIndexSettings = Settings.builder()
+ .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT)
+ .build();
+ request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
+ request.settings(nonMatchingReplicationIndexSettings);
+ IndexCreationException exception = expectThrows(
+ IndexCreationException.class,
+ () -> aggregateIndexSettings(
+ ClusterState.EMPTY_STATE,
+ request,
+ Settings.EMPTY,
+ null,
+ Settings.EMPTY,
+ IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
+ randomShardLimitService(),
+ Collections.emptySet(),
+ clusterSettings
+ )
+ );
+ assertEquals(REPLICATION_MISMATCH_VALIDATION_ERROR, exception.getCause().getMessage());
+
+ Settings matchingReplicationIndexSettings = Settings.builder()
+ .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT)
+ .build();
+ request.settings(matchingReplicationIndexSettings);
+ Settings aggregateIndexSettings = aggregateIndexSettings(
+ ClusterState.EMPTY_STATE,
+ request,
+ Settings.EMPTY,
+ null,
+ Settings.EMPTY,
+ IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
+ randomShardLimitService(),
+ Collections.emptySet(),
+ clusterSettings
+ );
+ assertEquals(ReplicationType.SEGMENT.toString(), aggregateIndexSettings.get(INDEX_REPLICATION_TYPE_SETTING.getKey()));
+ }
+
+ public void testClusterForceReplicationTypeInValidateIndexSettings() {
+ ClusterService clusterService = mock(ClusterService.class);
+ Metadata metadata = Metadata.builder()
+ .transientSettings(Settings.builder().put(Metadata.DEFAULT_REPLICA_COUNT_SETTING.getKey(), 1).build())
+ .build();
+ ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+ .metadata(metadata)
+ .build();
+ ThreadPool threadPool = new TestThreadPool(getTestName());
+ // Enforce cluster level replication type setting
+ final Settings forceClusterSettingEnabled = Settings.builder()
+ .put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT)
+ .put(CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING.getKey(), true)
+ .build();
+ ClusterSettings clusterSettings = new ClusterSettings(forceClusterSettingEnabled, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+ when(clusterService.getSettings()).thenReturn(forceClusterSettingEnabled);
+ when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
+ when(clusterService.state()).thenReturn(clusterState);
+
+ final MetadataCreateIndexService checkerService = new MetadataCreateIndexService(
+ forceClusterSettingEnabled,
+ clusterService,
+ null,
+ null,
+ null,
+ createTestShardLimitService(randomIntBetween(1, 1000), false, clusterService),
+ new Environment(Settings.builder().put("path.home", "dummy").build(), null),
+ IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
+ threadPool,
+ null,
+ new SystemIndices(Collections.emptyMap()),
+ true,
+ new AwarenessReplicaBalance(forceClusterSettingEnabled, clusterService.getClusterSettings())
+ );
+ // Use DOCUMENT replication type setting for index creation
+ final Settings indexSettings = Settings.builder().put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT).build();
+
+ IndexCreationException exception = expectThrows(
+ IndexCreationException.class,
+ () -> checkerService.validateIndexSettings("test", indexSettings, false)
+ );
+ assertEquals(REPLICATION_MISMATCH_VALIDATION_ERROR, exception.getCause().getMessage());
+
+ // Cluster level replication type setting not enforced
+ final Settings forceClusterSettingDisabled = Settings.builder()
+ .put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT)
+ .put(CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING.getKey(), false)
+ .build();
+ clusterSettings = new ClusterSettings(forceClusterSettingDisabled, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+ when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
+ checkerService.validateIndexSettings("test", indexSettings, false);
+ threadPool.shutdown();
+ }
+
public void testRemoteStoreNoUserOverrideExceptReplicationTypeSegmentIndexSettings() {
Settings settings = Settings.builder()
.put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT)
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java
index 8f2db5db969d2..052c7877404a8 100644
--- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java
@@ -200,4 +200,88 @@ public void testTargetPoolDedicatedSearchNodeAllocationDecisions() {
assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(localIdx, localOnlyNode.node(), globalAllocation).type());
assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(remoteIdx, remoteCapableNode.node(), globalAllocation).type());
}
+
+ public void testDebugMessage() {
+ ClusterState clusterState = createInitialCluster(3, 3, true, 2, 2);
+ AllocationService service = this.createRemoteCapableAllocationService();
+ clusterState = allocateShardsAndBalance(clusterState, service);
+
+ // Add an unassigned primary shard for force allocation checks
+ Metadata metadata = Metadata.builder(clusterState.metadata())
+ .put(IndexMetadata.builder("test_local_unassigned").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
+ .build();
+ RoutingTable routingTable = RoutingTable.builder(clusterState.routingTable())
+ .addAsNew(metadata.index("test_local_unassigned"))
+ .build();
+ clusterState = ClusterState.builder(clusterState).metadata(metadata).routingTable(routingTable).build();
+
+ // Add remote index unassigned primary
+ clusterState = createRemoteIndex(clusterState, "test_remote_unassigned");
+
+ RoutingNodes defaultRoutingNodes = clusterState.getRoutingNodes();
+ RoutingAllocation globalAllocation = getRoutingAllocation(clusterState, defaultRoutingNodes);
+ globalAllocation.setDebugMode(RoutingAllocation.DebugMode.ON);
+
+ ShardRouting localShard = clusterState.routingTable()
+ .allShards(getIndexName(0, false))
+ .stream()
+ .filter(ShardRouting::primary)
+ .collect(Collectors.toList())
+ .get(0);
+ ShardRouting remoteShard = clusterState.routingTable()
+ .allShards(getIndexName(0, true))
+ .stream()
+ .filter(ShardRouting::primary)
+ .collect(Collectors.toList())
+ .get(0);
+ ShardRouting unassignedLocalShard = clusterState.routingTable()
+ .allShards("test_local_unassigned")
+ .stream()
+ .filter(ShardRouting::primary)
+ .collect(Collectors.toList())
+ .get(0);
+ ShardRouting unassignedRemoteShard = clusterState.routingTable()
+ .allShards("test_remote_unassigned")
+ .stream()
+ .filter(ShardRouting::primary)
+ .collect(Collectors.toList())
+ .get(0);
+ IndexMetadata localIdx = globalAllocation.metadata().getIndexSafe(localShard.index());
+ IndexMetadata remoteIdx = globalAllocation.metadata().getIndexSafe(remoteShard.index());
+ String localNodeId = LOCAL_NODE_PREFIX;
+ for (RoutingNode routingNode : globalAllocation.routingNodes()) {
+ if (routingNode.nodeId().startsWith(LOCAL_NODE_PREFIX)) {
+ localNodeId = routingNode.nodeId();
+ break;
+ }
+ }
+ String remoteNodeId = remoteShard.currentNodeId();
+ RoutingNode localOnlyNode = defaultRoutingNodes.node(localNodeId);
+ RoutingNode remoteCapableNode = defaultRoutingNodes.node(remoteNodeId);
+
+ TargetPoolAllocationDecider targetPoolAllocationDecider = new TargetPoolAllocationDecider();
+ Decision decision = targetPoolAllocationDecider.canAllocate(localShard, remoteCapableNode, globalAllocation);
+ assertEquals(
+ "Routing pools are incompatible. Shard pool: [LOCAL_ONLY], node pool: [REMOTE_CAPABLE] without [data] role",
+ decision.getExplanation()
+ );
+
+ decision = targetPoolAllocationDecider.canAllocate(remoteShard, localOnlyNode, globalAllocation);
+ assertEquals("Routing pools are incompatible. Shard pool: [REMOTE_CAPABLE], node pool: [LOCAL_ONLY]", decision.getExplanation());
+
+ decision = targetPoolAllocationDecider.canAllocate(remoteShard, remoteCapableNode, globalAllocation);
+ assertEquals("Routing pools are compatible. Shard pool: [REMOTE_CAPABLE], node pool: [REMOTE_CAPABLE]", decision.getExplanation());
+
+ decision = targetPoolAllocationDecider.canAllocate(localIdx, remoteCapableNode, globalAllocation);
+ assertEquals(
+ "Routing pools are incompatible. Index pool: [LOCAL_ONLY], node pool: [REMOTE_CAPABLE] without [data] role",
+ decision.getExplanation()
+ );
+
+ decision = targetPoolAllocationDecider.canAllocate(remoteIdx, localOnlyNode, globalAllocation);
+ assertEquals("Routing pools are incompatible. Index pool: [REMOTE_CAPABLE], node pool: [LOCAL_ONLY]", decision.getExplanation());
+
+ decision = targetPoolAllocationDecider.canAllocate(remoteIdx, remoteCapableNode, globalAllocation);
+ assertEquals("Routing pools are compatible. Index pool: [REMOTE_CAPABLE], node pool: [REMOTE_CAPABLE]", decision.getExplanation());
+ }
}
diff --git a/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java
index a33e5f453d1e1..074f659850c7b 100644
--- a/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java
+++ b/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java
@@ -8,21 +8,36 @@
package org.opensearch.common.blobstore.transfer;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RateLimiter;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeFileInputStream;
+import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream;
+import org.opensearch.common.blobstore.transfer.stream.RateLimitingOffsetRangeInputStream;
import org.opensearch.common.blobstore.transfer.stream.ResettableCheckedInputStream;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
public class RemoteTransferContainerTests extends OpenSearchTestCase {
@@ -92,25 +107,37 @@ private void testSupplyStreamContext(
int partCount = streamContext.getNumberOfParts();
assertEquals(expectedPartCount, partCount);
Thread[] threads = new Thread[partCount];
+ InputStream[] streams = new InputStream[partCount];
long totalContentLength = remoteTransferContainer.getContentLength();
assert partSize * (partCount - 1) + lastPartSize == totalContentLength
: "part sizes and last part size don't add up to total content length";
logger.info("partSize: {}, lastPartSize: {}, partCount: {}", partSize, lastPartSize, streamContext.getNumberOfParts());
- for (int partIdx = 0; partIdx < partCount; partIdx++) {
- int finalPartIdx = partIdx;
- long expectedPartSize = (partIdx == partCount - 1) ? lastPartSize : partSize;
- threads[partIdx] = new Thread(() -> {
+ try {
+ for (int partIdx = 0; partIdx < partCount; partIdx++) {
+ int finalPartIdx = partIdx;
+ long expectedPartSize = (partIdx == partCount - 1) ? lastPartSize : partSize;
+ threads[partIdx] = new Thread(() -> {
+ try {
+ InputStreamContainer inputStreamContainer = streamContext.provideStream(finalPartIdx);
+ streams[finalPartIdx] = inputStreamContainer.getInputStream();
+ assertEquals(expectedPartSize, inputStreamContainer.getContentLength());
+ } catch (IOException e) {
+ fail("IOException during stream creation");
+ }
+ });
+ threads[partIdx].start();
+ }
+ for (int i = 0; i < partCount; i++) {
+ threads[i].join();
+ }
+ } finally {
+ Arrays.stream(streams).forEach(stream -> {
try {
- InputStreamContainer inputStreamContainer = streamContext.provideStream(finalPartIdx);
- assertEquals(expectedPartSize, inputStreamContainer.getContentLength());
+ stream.close();
} catch (IOException e) {
- fail("IOException during stream creation");
+ throw new RuntimeException(e);
}
});
- threads[partIdx].start();
- }
- for (int i = 0; i < partCount; i++) {
- threads[i].join();
}
}
@@ -182,6 +209,7 @@ public OffsetRangeInputStream get(long size, long position) throws IOException {
}
private void testTypeOfProvidedStreams(boolean isRemoteDataIntegritySupported) throws IOException {
+ InputStream inputStream = null;
try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
testFile.getFileName().toString(),
@@ -201,12 +229,132 @@ public OffsetRangeInputStream get(long size, long position) throws IOException {
) {
StreamContext streamContext = remoteTransferContainer.supplyStreamContext(16);
InputStreamContainer inputStreamContainer = streamContext.provideStream(0);
+ inputStream = inputStreamContainer.getInputStream();
if (shouldOffsetInputStreamsBeChecked(isRemoteDataIntegritySupported)) {
assertTrue(inputStreamContainer.getInputStream() instanceof ResettableCheckedInputStream);
} else {
assertTrue(inputStreamContainer.getInputStream() instanceof OffsetRangeInputStream);
}
assertThrows(RuntimeException.class, () -> remoteTransferContainer.supplyStreamContext(16));
+ } finally {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ }
+ }
+
+ public void testCloseDuringOngoingReadOnStream() throws IOException, InterruptedException {
+ Supplier<RateLimiter> rateLimiterSupplier = Mockito.mock(Supplier.class);
+ Mockito.when(rateLimiterSupplier.get()).thenReturn(null);
+ CountDownLatch readInvokedLatch = new CountDownLatch(1);
+ AtomicBoolean readAfterClose = new AtomicBoolean();
+ CountDownLatch streamClosed = new CountDownLatch(1);
+ AtomicBoolean indexInputClosed = new AtomicBoolean();
+ AtomicInteger closedCount = new AtomicInteger();
+ try (
+ RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
+ testFile.getFileName().toString(),
+ testFile.getFileName().toString(),
+ TEST_FILE_SIZE_BYTES,
+ true,
+ WritePriority.NORMAL,
+ new RemoteTransferContainer.OffsetRangeInputStreamSupplier() {
+ @Override
+ public OffsetRangeInputStream get(long size, long position) throws IOException {
+ IndexInput indexInput = Mockito.mock(IndexInput.class);
+ Mockito.doAnswer(invocation -> {
+ indexInputClosed.set(true);
+ closedCount.incrementAndGet();
+ return null;
+ }).when(indexInput).close();
+ Mockito.when(indexInput.getFilePointer()).thenAnswer((Answer<Long>) invocation -> {
+ if (readAfterClose.get() == false) {
+ return 0L;
+ }
+ readInvokedLatch.countDown();
+ boolean closedSuccess = streamClosed.await(30, TimeUnit.SECONDS);
+ assertTrue(closedSuccess);
+ assertFalse(indexInputClosed.get());
+ return 0L;
+ });
+
+ OffsetRangeIndexInputStream offsetRangeIndexInputStream = new OffsetRangeIndexInputStream(
+ indexInput,
+ size,
+ position
+ );
+ return new RateLimitingOffsetRangeInputStream(offsetRangeIndexInputStream, rateLimiterSupplier, null);
+ }
+ },
+ 0,
+ true
+ )
+ ) {
+ StreamContext streamContext = remoteTransferContainer.supplyStreamContext(16);
+ InputStreamContainer inputStreamContainer = streamContext.provideStream(0);
+ assertTrue(inputStreamContainer.getInputStream() instanceof RateLimitingOffsetRangeInputStream);
+ CountDownLatch latch = new CountDownLatch(1);
+ new Thread(() -> {
+ try {
+ readAfterClose.set(true);
+ inputStreamContainer.getInputStream().readAllBytes();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ latch.countDown();
+ }
+ }).start();
+ boolean successReadWait = readInvokedLatch.await(30, TimeUnit.SECONDS);
+ assertTrue(successReadWait);
+ // Close the stream here, then test multiple invocations of close; none should throw an exception.
+ inputStreamContainer.getInputStream().close();
+ inputStreamContainer.getInputStream().close();
+ inputStreamContainer.getInputStream().close();
+ streamClosed.countDown();
+ boolean processed = latch.await(30, TimeUnit.SECONDS);
+ assertTrue(processed);
+ assertTrue(readAfterClose.get());
+ assertTrue(indexInputClosed.get());
+
+ // Test multiple invocations of close. The close count should always be 1.
+ inputStreamContainer.getInputStream().close();
+ inputStreamContainer.getInputStream().close();
+ inputStreamContainer.getInputStream().close();
+ assertEquals(1, closedCount.get());
+ }
+ }
+
+ public void testReadAccessWhenStreamClosed() throws IOException {
+ Supplier<RateLimiter> rateLimiterSupplier = Mockito.mock(Supplier.class);
+ Mockito.when(rateLimiterSupplier.get()).thenReturn(null);
+ try (
+ RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
+ testFile.getFileName().toString(),
+ testFile.getFileName().toString(),
+ TEST_FILE_SIZE_BYTES,
+ true,
+ WritePriority.NORMAL,
+ new RemoteTransferContainer.OffsetRangeInputStreamSupplier() {
+ @Override
+ public OffsetRangeInputStream get(long size, long position) throws IOException {
+ IndexInput indexInput = Mockito.mock(IndexInput.class);
+ OffsetRangeIndexInputStream offsetRangeIndexInputStream = new OffsetRangeIndexInputStream(
+ indexInput,
+ size,
+ position
+ );
+ return new RateLimitingOffsetRangeInputStream(offsetRangeIndexInputStream, rateLimiterSupplier, null);
+ }
+ },
+ 0,
+ true
+ )
+ ) {
+ StreamContext streamContext = remoteTransferContainer.supplyStreamContext(16);
+ InputStreamContainer inputStreamContainer = streamContext.provideStream(0);
+ inputStreamContainer.getInputStream().close();
+ assertThrows(AlreadyClosedException.class, () -> inputStreamContainer.getInputStream().readAllBytes());
}
}
diff --git a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java
index 31a27503069d7..4e5e9c71e1fe4 100644
--- a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java
+++ b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java
@@ -39,6 +39,7 @@
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.indices.store.ShardAttributes;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
@@ -46,12 +47,13 @@
import org.junit.Before;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
-import static java.util.Collections.emptySet;
+import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.sameInstance;
@@ -84,7 +86,16 @@ public class AsyncShardFetchTests extends OpenSearchTestCase {
public void setUp() throws Exception {
super.setUp();
this.threadPool = new TestThreadPool(getTestName());
- this.test = new TestFetch(threadPool);
+ if (randomBoolean()) {
+ this.test = new TestFetch(threadPool);
+ } else {
+ HashMap<ShardId, ShardAttributes> shardToCustomDataPath = new HashMap<>();
+ ShardId shardId0 = new ShardId("index1", "index_uuid1", 0);
+ ShardId shardId1 = new ShardId("index2", "index_uuid2", 0);
+ shardToCustomDataPath.put(shardId0, new ShardAttributes(shardId0, ""));
+ shardToCustomDataPath.put(shardId1, new ShardAttributes(shardId1, ""));
+ this.test = new TestFetch(threadPool, shardToCustomDataPath);
+ }
}
@After
@@ -97,7 +108,7 @@ public void testClose() throws Exception {
test.addSimulation(node1.getId(), response1);
// first fetch, no data, still on going
- AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
+ AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptyMap());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));
@@ -107,7 +118,7 @@ public void testClose() throws Exception {
assertThat(test.reroute.get(), equalTo(1));
test.close();
try {
- test.fetchData(nodes, emptySet());
+ test.fetchData(nodes, emptyMap());
fail("fetch data should fail when closed");
} catch (IllegalStateException e) {
// all is well
@@ -119,7 +130,7 @@ public void testFullCircleSingleNodeSuccess() throws Exception {
test.addSimulation(node1.getId(), response1);
// first fetch, no data, still on going
- AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
+ AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptyMap());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));
@@ -127,7 +138,7 @@ public void testFullCircleSingleNodeSuccess() throws Exception {
test.fireSimulationAndWait(node1.getId());
// verify we get back the data node
assertThat(test.reroute.get(), equalTo(1));
- fetchData = test.fetchData(nodes, emptySet());
+ fetchData = test.fetchData(nodes, emptyMap());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
@@ -139,7 +150,7 @@ public void testFullCircleSingleNodeFailure() throws Exception {
test.addSimulation(node1.getId(), failure1);
// first fetch, no data, still on going
- AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
+ AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptyMap());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));
@@ -147,19 +158,19 @@ public void testFullCircleSingleNodeFailure() throws Exception {
test.fireSimulationAndWait(node1.getId());
// failure, fetched data exists, but has no data
assertThat(test.reroute.get(), equalTo(1));
- fetchData = test.fetchData(nodes, emptySet());
+ fetchData = test.fetchData(nodes, emptyMap());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(0));
// on failure, we reset the failure on a successive call to fetchData, and try again afterwards
test.addSimulation(node1.getId(), response1);
- fetchData = test.fetchData(nodes, emptySet());
+ fetchData = test.fetchData(nodes, emptyMap());
assertThat(fetchData.hasData(), equalTo(false));
test.fireSimulationAndWait(node1.getId());
// 2 reroutes, cause we have a failure that we clear
assertThat(test.reroute.get(), equalTo(3));
- fetchData = test.fetchData(nodes, emptySet());
+ fetchData = test.fetchData(nodes, emptyMap());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
@@ -170,7 +181,7 @@ public void testIgnoreResponseFromDifferentRound() throws Exception {
test.addSimulation(node1.getId(), response1);
// first fetch, no data, still on going
- AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
+ AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptyMap());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));
@@ -183,7 +194,7 @@ public void testIgnoreResponseFromDifferentRound() throws Exception {
test.fireSimulationAndWait(node1.getId());
// verify we get back the data node
assertThat(test.reroute.get(), equalTo(2));
- fetchData = test.fetchData(nodes, emptySet());
+ fetchData = test.fetchData(nodes, emptyMap());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
@@ -195,7 +206,7 @@ public void testIgnoreFailureFromDifferentRound() throws Exception {
test.addSimulation(node1.getId(), failure1);
// first fetch, no data, still on going
- AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
+ AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptyMap());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));
@@ -212,7 +223,7 @@ public void testIgnoreFailureFromDifferentRound() throws Exception {
test.fireSimulationAndWait(node1.getId());
// failure, fetched data exists, but has no data
assertThat(test.reroute.get(), equalTo(2));
- fetchData = test.fetchData(nodes, emptySet());
+ fetchData = test.fetchData(nodes, emptyMap());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(0));
}
@@ -223,7 +234,7 @@ public void testTwoNodesOnSetup() throws Exception {
test.addSimulation(node2.getId(), response2);
// no fetched data, 2 requests still on going
- AsyncShardFetch.FetchResult