Skip to content

Commit

Permalink
Fix handling of bulk requests with semantic text fields and delete ops (
Browse files Browse the repository at this point in the history
elastic#116942)

Previously, a delete operation in a bulk request was handled incorrectly when it was followed by operations containing semantic text fields. The delete item was skipped without advancing the item index, so the positions of all subsequent operations in the items array were off by one.

This PR resolves the discrepancy and includes additional tests to ensure proper behavior.
  • Loading branch information
jimczi committed Nov 21, 2024
1 parent 618a3ae commit fa8905b
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 10 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/116942.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 116942
summary: Fix handling of bulk requests with semantic text fields and delete ops
area: Relevance
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
Expand All @@ -30,8 +31,10 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -86,30 +89,38 @@ public void testBulkOperations() throws Exception {

int totalBulkReqs = randomIntBetween(2, 100);
long totalDocs = 0;
Set<String> ids = new HashSet<>();
for (int bulkReqs = 0; bulkReqs < totalBulkReqs; bulkReqs++) {
BulkRequestBuilder bulkReqBuilder = client().prepareBulk();
int totalBulkSize = randomIntBetween(1, 100);
for (int bulkSize = 0; bulkSize < totalBulkSize; bulkSize++) {
String id = Long.toString(totalDocs);
if (ids.size() > 0 && rarely(random())) {
String id = randomFrom(ids);
ids.remove(id);
DeleteRequestBuilder request = new DeleteRequestBuilder(client(), INDEX_NAME).setId(id);
bulkReqBuilder.add(request);
continue;
}
String id = Long.toString(totalDocs++);
boolean isIndexRequest = randomBoolean();
Map<String, Object> source = new HashMap<>();
source.put("sparse_field", isIndexRequest && rarely() ? null : randomAlphaOfLengthBetween(0, 1000));
source.put("dense_field", isIndexRequest && rarely() ? null : randomAlphaOfLengthBetween(0, 1000));
if (isIndexRequest) {
bulkReqBuilder.add(new IndexRequestBuilder(client()).setIndex(INDEX_NAME).setId(id).setSource(source));
totalDocs++;
ids.add(id);
} else {
boolean isUpsert = randomBoolean();
UpdateRequestBuilder request = new UpdateRequestBuilder(client()).setIndex(INDEX_NAME).setDoc(source);
if (isUpsert || totalDocs == 0) {
if (isUpsert || ids.size() == 0) {
request.setDocAsUpsert(true);
totalDocs++;
} else {
// Update already existing document
id = Long.toString(randomLongBetween(0, totalDocs - 1));
id = randomFrom(ids);
}
request.setId(id);
bulkReqBuilder.add(request);
ids.add(id);
}
}
BulkResponse bulkResponse = bulkReqBuilder.get();
Expand All @@ -134,7 +145,7 @@ public void testBulkOperations() throws Exception {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0).trackTotalHits(true);
SearchResponse searchResponse = client().search(new SearchRequest(INDEX_NAME).source(sourceBuilder)).get();
try {
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(totalDocs));
assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) ids.size()));
} finally {
searchResponse.decRef();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public Set<NodeFeature> getFeatures() {

@Override
public Set<NodeFeature> getTestFeatures() {
return Set.of(SemanticTextFieldMapper.SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX, SemanticTextFieldMapper.SEMANTIC_TEXT_ZERO_SIZE_FIX);
return Set.of(
SemanticTextFieldMapper.SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX,
SemanticTextFieldMapper.SEMANTIC_TEXT_ZERO_SIZE_FIX,
SemanticTextFieldMapper.SEMANTIC_TEXT_DELETE_FIX
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,8 @@ private void applyInferenceResponses(BulkItemRequest item, FieldInferenceRespons
*/
private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(BulkShardRequest bulkShardRequest) {
Map<String, List<FieldInferenceRequest>> fieldRequestsMap = new LinkedHashMap<>();
int itemIndex = 0;
for (var item : bulkShardRequest.items()) {
for (int itemIndex = 0; itemIndex < bulkShardRequest.items().length; itemIndex++) {
var item = bulkShardRequest.items()[itemIndex];
if (item.getPrimaryResponse() != null) {
// item was already aborted/processed by a filter in the chain upstream (e.g. security)
continue;
Expand All @@ -440,6 +440,7 @@ private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(Bu
// ignore delete request
continue;
}

final Map<String, Object> docMap = indexRequest.sourceAsMap();
for (var entry : fieldInferenceMap.values()) {
String field = entry.getName();
Expand Down Expand Up @@ -481,7 +482,6 @@ private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(Bu
}
}
}
itemIndex++;
}
return fieldRequestsMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
public class SemanticTextFieldMapper extends FieldMapper implements InferenceFieldMapper {
public static final NodeFeature SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX = new NodeFeature("semantic_text.in_object_field_fix");
public static final NodeFeature SEMANTIC_TEXT_ZERO_SIZE_FIX = new NodeFeature("semantic_text.zero_size_fix");
public static final NodeFeature SEMANTIC_TEXT_DELETE_FIX = new NodeFeature("semantic_text.delete_fix");

public static final String CONTENT_TYPE = "semantic_text";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,3 +502,55 @@ setup:
- match: { _source.level_1.dense_field.text: "another inference test" }
- exists: _source.level_1.dense_field.inference.chunks.0.embeddings
- match: { _source.level_1.dense_field.inference.chunks.0.text: "another inference test" }

---
# Regression test: a delete placed before an update in the same bulk request
# must not disturb how the following dense_field operation is applied.
"Deletes on bulk operation":
- requires:
cluster_features: semantic_text.delete_fix
reason: Delete operations are properly applied when subsequent operations include a semantic text field.

# Step 1: index documents 1 and 2, each with a two-value dense_field, then
# run a semantic query on dense_field and expect both documents back.
- do:
bulk:
index: test-index
refresh: true
body: |
{"index":{"_id": "1"}}
{"dense_field": ["you know, for testing", "now with chunks"]}
{"index":{"_id": "2"}}
{"dense_field": ["some more tests", "that include chunks"]}
- do:
search:
index: test-index
body:
query:
semantic:
field: dense_field
query: "you know, for testing"

- match: { hits.total.value: 2 }
- match: { hits.total.relation: eq }
- match: { hits.hits.0._source.dense_field.text: ["you know, for testing", "now with chunks"] }
- match: { hits.hits.1._source.dense_field.text: ["some more tests", "that include chunks"] }

# Step 2: in a single bulk request, delete document 2 first and then update
# document 1's dense_field. The follow-up search must return exactly one hit,
# document 1 carrying the updated text.
- do:
bulk:
index: test-index
refresh: true
body: |
{"delete":{ "_id": "2"}}
{"update":{"_id": "1"}}
{"doc":{"dense_field": "updated text"}}
- do:
search:
index: test-index
body:
query:
semantic:
field: dense_field
query: "you know, for testing"

- match: { hits.total.value: 1 }
- match: { hits.total.relation: eq }
- match: { hits.hits.0._source.dense_field.text: "updated text" }

0 comments on commit fa8905b

Please sign in to comment.