Default incremental bulk functionality to false (elastic#113416)
This commit flips the incremental bulk setting to false. Additionally,
it removes some test code which intermittently causes issues with
security test cases.
Tim-Brooks authored Sep 23, 2024
1 parent f7ba89b commit d146b27
Showing 3 changed files with 13 additions and 46 deletions.
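
With this change, `rest.incremental_bulk` defaults to off, so anything that still wants the incremental REST bulk path has to opt in explicitly. A minimal sketch of such an opt-in, mirroring the test override added below (the variable names and builder usage here are illustrative, not part of this commit):

    import org.elasticsearch.action.bulk.IncrementalBulkService;
    import org.elasticsearch.common.settings.Settings;

    // Hypothetical opt-in: build node settings with the flag turned back on.
    Settings incrementalBulkEnabled = Settings.builder()
        .put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), true) // default is now false
        .build();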
@@ -29,6 +29,14 @@
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
 public class IncrementalBulkRestIT extends HttpSmokeTestCase {
 
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal, otherSettings))
+            .put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), true)
+            .build();
+    }
+
     public void testBulkUriMatchingDoesNotMatchBulkCapabilitiesApi() throws IOException {
         Request request = new Request("GET", "/_capabilities?method=GET&path=%2F_bulk&capabilities=failure_store_status&pretty");
         Response response = getRestClient().performRequest(request);
@@ -36,7 +36,7 @@ public class IncrementalBulkService {
 
     public static final Setting<Boolean> INCREMENTAL_BULK = boolSetting(
         "rest.incremental_bulk",
-        true,
+        false,
         Setting.Property.NodeScope,
         Setting.Property.Dynamic
     );
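For context, here is roughly how a consumer observes the new default; the `Setting#get(Settings)` call and the surrounding names are illustrative, not part of this diff:

    import org.elasticsearch.action.bulk.IncrementalBulkService;
    import org.elasticsearch.common.settings.Settings;

    // On a node with no explicit configuration this now resolves to false;
    // setting rest.incremental_bulk=true restores the previous behaviour.
    Settings nodeSettings = Settings.EMPTY;
    boolean incrementalBulkEnabled = IncrementalBulkService.INCREMENTAL_BULK.get(nodeSettings);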
@@ -26,7 +26,6 @@
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
-import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest;
 import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
@@ -49,8 +48,6 @@
 import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.bulk.IncrementalBulkService;
-import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.ingest.DeletePipelineRequest;
 import org.elasticsearch.action.ingest.DeletePipelineTransportAction;
@@ -196,7 +193,6 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
@@ -1782,48 +1778,11 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma
             logger.info("Index [{}] docs async: [{}] bulk: [{}] partitions [{}]", builders.size(), false, true, partition.size());
             for (List<IndexRequestBuilder> segmented : partition) {
                 BulkResponse actionGet;
-                if (randomBoolean()) {
-                    BulkRequestBuilder bulkBuilder = client().prepareBulk();
-                    for (IndexRequestBuilder indexRequestBuilder : segmented) {
-                        bulkBuilder.add(indexRequestBuilder);
-                    }
-                    actionGet = bulkBuilder.get();
-                } else {
-                    IncrementalBulkService bulkService = internalCluster().getInstance(IncrementalBulkService.class);
-                    IncrementalBulkService.Handler handler = bulkService.newBulkRequest();
-
-                    ConcurrentLinkedQueue<IndexRequest> queue = new ConcurrentLinkedQueue<>();
-                    segmented.forEach(b -> queue.add(b.request()));
-
-                    PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
-                    AtomicInteger runs = new AtomicInteger(0);
-                    Runnable r = new Runnable() {
-
-                        @Override
-                        public void run() {
-                            int toRemove = Math.min(randomIntBetween(5, 10), queue.size());
-                            ArrayList<DocWriteRequest<?>> docs = new ArrayList<>();
-                            for (int i = 0; i < toRemove; i++) {
-                                docs.add(queue.poll());
-                            }
-
-                            if (queue.isEmpty()) {
-                                handler.lastItems(docs, () -> {}, future);
-                            } else {
-                                handler.addItems(docs, () -> {}, () -> {
-                                    // Every 10 runs dispatch to new thread to prevent stackoverflow
-                                    if (runs.incrementAndGet() % 10 == 0) {
-                                        new Thread(this).start();
-                                    } else {
-                                        this.run();
-                                    }
-                                });
-                            }
-                        }
-                    };
-                    r.run();
-                    actionGet = future.actionGet();
-                }
+                BulkRequestBuilder bulkBuilder = client().prepareBulk();
+                for (IndexRequestBuilder indexRequestBuilder : segmented) {
+                    bulkBuilder.add(indexRequestBuilder);
+                }
+                actionGet = bulkBuilder.get();
                 assertThat(actionGet.hasFailures() ? actionGet.buildFailureMessage() : "", actionGet.hasFailures(), equalTo(false));
             }
         }
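
The deleted branch above was the randomized caller of the incremental handler in this test helper. Condensed, it drove the handler roughly like this (a sketch reconstructed from the removed lines, with the chunking simplified; names such as `segmented` and test helpers like `internalCluster()` are assumed from the surrounding ESIntegTestCase context, and the IncrementalBulkService, IndexRequest, DocWriteRequest and ConcurrentLinkedQueue imports this sketch relies on are exactly the ones removed by this commit):

    IncrementalBulkService bulkService = internalCluster().getInstance(IncrementalBulkService.class);
    IncrementalBulkService.Handler handler = bulkService.newBulkRequest();
    PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();

    List<DocWriteRequest<?>> requests = new ArrayList<>();
    for (IndexRequestBuilder builder : segmented) {
        requests.add(builder.request());
    }
    int mid = requests.size() / 2;

    // Intermediate chunks are fed through addItems(...); the final chunk goes through
    // lastItems(...), which completes the listener with the combined BulkResponse.
    handler.addItems(requests.subList(0, mid), () -> {}, () -> handler.lastItems(requests.subList(mid, requests.size()), () -> {}, future));
    BulkResponse actionGet = future.actionGet();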
