Revert "Merge remote-tracking branch 'upstream/inference_metadata_fie…
Browse files Browse the repository at this point in the history
…lds' into rescorer_retriever"

This reverts commit 3f421ef, reversing changes made to b1ddf79.
jimczi committed Dec 18, 2024
1 parent 582d88a commit 8d21423
Showing 55 changed files with 922 additions and 6,758 deletions.
@@ -279,7 +279,6 @@
"compatibilityChangeArea": {
"type": "string",
"enum": [
"Aggregations",
"Analysis",
"Authorization",
"Cluster and node setting",
14 changes: 0 additions & 14 deletions docs/changelog/118484.yaml

This file was deleted.

@@ -138,8 +138,8 @@ normal priority deployments.
Controls how many inference requests are allowed in the queue at a time.
Every machine learning node in the cluster where the model can be allocated
has a queue of this size; when the number of requests exceeds the total value,
- new requests are rejected with a 429 error. Defaults to 10000. Max allowed value
- is 100000.
+ new requests are rejected with a 429 error. Defaults to 1024. Max allowed value
+ is 1000000.

`threads_per_allocation`::
(Optional, integer)
@@ -173,7 +173,7 @@ The API returns the following results:
"model_bytes": 265632637,
"threads_per_allocation" : 1,
"number_of_allocations" : 1,
"queue_capacity" : 10000,
"queue_capacity" : 1024,
"priority": "normal"
},
"routing_table": {
@@ -229,4 +229,4 @@ POST _ml/trained_models/my_model/deployment/_start?deployment_id=my_model_for_se
}
}
--------------------------------------------------
- // TEST[skip:TBD]
+ // TEST[skip:TBD]
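As context for the restored defaults above, `queue_capacity` can also be overridden per deployment when it is started; a minimal request sketch (the model id and the explicit value are illustrative, not part of this commit):

[source,console]
--------------------------------------------------
POST _ml/trained_models/my_model/deployment/_start?queue_capacity=100000
--------------------------------------------------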
3 changes: 0 additions & 3 deletions muted-tests.yml
@@ -302,9 +302,6 @@ tests:
- class: org.elasticsearch.index.engine.RecoverySourcePruneMergePolicyTests
method: testPruneSome
issue: https://github.com/elastic/elasticsearch/issues/118728
- - class: org.elasticsearch.smoketest.DocsClientYamlTestSuiteIT
-   method: test {yaml=reference/indices/shard-stores/line_150}
-   issue: https://github.com/elastic/elasticsearch/issues/118896

# Examples:
#
@@ -48,11 +48,9 @@
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.DocumentMapper;
- import org.elasticsearch.index.mapper.InferenceMetadataFieldsMapper;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MappingLookup;
- import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
@@ -328,8 +326,7 @@ static boolean executeBulkItemRequest(
if (opType == DocWriteRequest.OpType.UPDATE) {
final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
try {
- var gFields = getStoredFieldsSpec(context.getPrimary());
- updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier, gFields);
+ updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier);
} catch (Exception failure) {
// we may fail translating an update to an index or delete operation
// we use index result to communicate failure while translating update request
@@ -404,16 +401,6 @@ static boolean executeBulkItemRequest(
return true;
}

- private static String[] getStoredFieldsSpec(IndexShard indexShard) {
- if (InferenceMetadataFieldsMapper.isEnabled(indexShard.indexSettings().getIndexVersionCreated())) {
- if (indexShard.mapperService().mappingLookup().inferenceFields().size() > 0) {
- // Retrieves the inference metadata field containing the inference results for all semantic fields defined in the mapping.
- return new String[] { RoutingFieldMapper.NAME, InferenceMetadataFieldsMapper.NAME };
- }
- }
- return new String[] { RoutingFieldMapper.NAME };
- }
-
private static boolean handleMappingUpdateRequired(
BulkPrimaryExecutionContext context,
MappingUpdatePerformer mappingUpdater,
@@ -44,7 +44,6 @@
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.InferenceFieldMapper;
- import org.elasticsearch.index.mapper.InferenceMetadataFieldsMapper;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MappingLookup;
import org.elasticsearch.index.shard.IndexShard;
@@ -375,8 +374,7 @@ private static UpdateHelper.Result deleteInferenceResults(
IndexMetadata indexMetadata,
MappingLookup mappingLookup
) {
- if (result.getResponseResult() != DocWriteResponse.Result.UPDATED
-     || InferenceMetadataFieldsMapper.isEnabled(indexMetadata.getCreationVersion())) {
+ if (result.getResponseResult() != DocWriteResponse.Result.UPDATED) {
return result;
}

@@ -405,7 +403,7 @@ private static UpdateHelper.Result deleteInferenceResults(
String inferenceFieldName = entry.getKey();
Mapper mapper = mappingLookup.getMapper(inferenceFieldName);

- if (mapper instanceof InferenceFieldMapper) {
+ if (mapper instanceof InferenceFieldMapper inferenceFieldMapper) {
String[] sourceFields = entry.getValue().getSourceFields();
for (String sourceField : sourceFields) {
if (sourceField.equals(inferenceFieldName) == false
@@ -414,7 +412,7 @@
// This has two important side effects:
// - The inference field value will remain parsable by its mapper
// - The inference results will be removed, forcing them to be re-generated downstream
- updatedSource.put(inferenceFieldName, getOriginalValueLegacy(inferenceFieldName, updatedSource));
+ updatedSource.put(inferenceFieldName, inferenceFieldMapper.getOriginalValue(updatedSource));
updatedSourceModified = true;
break;
}
@@ -437,24 +435,4 @@ private static UpdateHelper.Result deleteInferenceResults(

return returnedResult;
}
-
- /**
- * Get the field's original value (i.e. the value the user specified) from the provided source.
- *
- * @param sourceAsMap The source as a map
- * @return The field's original value, or {@code null} if none was provided
- */
- private static Object getOriginalValueLegacy(String fullPath, Map<String, Object> sourceAsMap) {
- // TODO: Fix bug here when semantic text field is in an object
- Object fieldValue = sourceAsMap.get(fullPath);
- if (fieldValue == null) {
- return null;
- } else if (fieldValue instanceof Map<?, ?> == false) {
- // Don't try to further validate the non-map value, that will be handled when the source is fully parsed
- return fieldValue;
- }
-
- Map<String, Object> fieldValueMap = XContentMapValues.nodeMapValue(fieldValue, "Field [" + fullPath + "]");
- return XContentMapValues.extractValue("text", fieldValueMap);
- }
}
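The removed `getOriginalValueLegacy` helper above unwraps the legacy `semantic_text` source layout, in which the user-supplied value sits under a `text` key beside the stored inference results. A self-contained sketch of that unwrapping (the field name and values are illustrative; the real code goes through `XContentMapValues`):

[source,java]
--------------------------------------------------
import java.util.Map;

// Illustrative sketch of the "extract the text key" behavior shared by the removed
// getOriginalValueLegacy helper and the restored InferenceFieldMapper#getOriginalValue.
public class OriginalValueSketch {
    static Object originalValue(String fullPath, Map<String, Object> sourceAsMap) {
        Object fieldValue = sourceAsMap.get(fullPath);
        if (fieldValue instanceof Map<?, ?> map) {
            // Legacy semantic_text layout: { "text": <user value>, "inference": { ... } }
            return map.get("text");
        }
        // Non-map values pass through untouched; full validation happens when
        // the source is parsed by the field mapper.
        return fieldValue;
    }

    public static void main(String[] args) {
        Map<String, Object> source = Map.of(
            "my_semantic_field", Map.of("text", "the user-provided value", "inference", Map.of())
        );
        System.out.println(originalValue("my_semantic_field", source)); // the user-provided value
    }
}
--------------------------------------------------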
@@ -60,15 +60,7 @@ public UpdateHelper(ScriptService scriptService) {
* Prepares an update request by converting it into an index or delete request or an update response (no action).
*/
public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) throws IOException {
- // TODO: Don't hard-code gFields
- return prepare(request, indexShard, nowInMillis, new String[] { RoutingFieldMapper.NAME });
- }
-
- /**
- * Prepares an update request by converting it into an index or delete request or an update response (no action).
- */
- public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis, String[] gFields) throws IOException {
- final GetResult getResult = indexShard.getService().getForUpdate(request.id(), request.ifSeqNo(), request.ifPrimaryTerm(), gFields);
+ final GetResult getResult = indexShard.getService().getForUpdate(request.id(), request.ifSeqNo(), request.ifPrimaryTerm());
return prepare(indexShard, request, getResult, nowInMillis);
}

@@ -139,8 +139,6 @@ private static Version parseUnchecked(String version) {
public static final IndexVersion TIME_BASED_K_ORDERED_DOC_ID = def(9_002_00_0, Version.LUCENE_10_0_0);
public static final IndexVersion DEPRECATE_SOURCE_MODE_MAPPER = def(9_003_00_0, Version.LUCENE_10_0_0);
public static final IndexVersion USE_SYNTHETIC_SOURCE_FOR_RECOVERY = def(9_004_00_0, Version.LUCENE_10_0_0);
- public static final IndexVersion INFERENCE_METADATA_FIELDS = def(9_005_00_0, Version.LUCENE_10_0_0);
-
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
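The removed `INFERENCE_METADATA_FIELDS` constant is what the `InferenceMetadataFieldsMapper.isEnabled` checks elsewhere in this diff gated on. A sketch of that gating pattern, assuming the Elasticsearch server classes on the classpath (the real check may also consult feature flags):

[source,java]
--------------------------------------------------
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;

final class VersionGateSketch {
    // Enable the new metadata-field behavior only for indices created on or after
    // the version that introduced it; older indices keep the legacy source format.
    static boolean isEnabled(IndexVersion indexVersionCreated) {
        return indexVersionCreated.onOrAfter(IndexVersions.INFERENCE_METADATA_FIELDS);
    }
}
--------------------------------------------------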
@@ -440,7 +440,7 @@ private void readStoredFieldsDirectly(StoredFieldVisitor visitor) throws IOExcep
SourceFieldMapper mapper = mappingLookup.getMapping().getMetadataMapperByClass(SourceFieldMapper.class);
if (mapper != null) {
try {
- sourceBytes = mapper.applyFilters(null, sourceBytes, null);
+ sourceBytes = mapper.applyFilters(sourceBytes, null);
} catch (IOException e) {
throw new IOException("Failed to reapply filters after reading from translog", e);
}
@@ -9,9 +9,7 @@

package org.elasticsearch.index.get;

- import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
- import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.document.DocumentField;
@@ -28,7 +26,6 @@
import org.elasticsearch.index.fieldvisitor.LeafStoredFieldLoader;
import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
import org.elasticsearch.index.mapper.IgnoredFieldMapper;
- import org.elasticsearch.index.mapper.InferenceMetadataFieldsMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperMetrics;
@@ -42,7 +39,6 @@
import org.elasticsearch.index.shard.MultiEngineGet;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.lookup.Source;
- import org.elasticsearch.search.lookup.SourceFilter;

import java.io.IOException;
import java.util.ArrayList;
@@ -194,13 +190,9 @@ public GetResult getFromTranslog(
}

public GetResult getForUpdate(String id, long ifSeqNo, long ifPrimaryTerm) throws IOException {
- return getForUpdate(id, ifSeqNo, ifPrimaryTerm, new String[] { RoutingFieldMapper.NAME });
- }
-
- public GetResult getForUpdate(String id, long ifSeqNo, long ifPrimaryTerm, String[] gFields) throws IOException {
return get(
id,
- gFields,
+ new String[] { RoutingFieldMapper.NAME },
true,
Versions.MATCH_ANY,
VersionType.INTERNAL,
@@ -296,25 +288,18 @@ private GetResult innerGetFetch(
boolean forceSyntheticSource
) throws IOException {
assert get.exists() : "method should only be called if document could be retrieved";

+ // check first if stored fields to be loaded don't contain an object field
MappingLookup mappingLookup = mapperService.mappingLookup();
- final IndexVersion indexVersion = indexSettings.getIndexVersionCreated();
- final Set<String> storedFieldSet = new HashSet<>();
- boolean hasInferenceMetadataFields = false;
if (storedFields != null) {
for (String field : storedFields) {
- if (field.equals(InferenceMetadataFieldsMapper.NAME) && InferenceMetadataFieldsMapper.isEnabled(indexVersion)) {
- hasInferenceMetadataFields = true;
- continue;
- }
Mapper fieldMapper = mappingLookup.getMapper(field);
if (fieldMapper == null) {
if (mappingLookup.objectMappers().get(field) != null) {
// Only fail if we know it is an object field, missing paths / fields shouldn't fail.
throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
}
}
- storedFieldSet.add(field);
}
}

@@ -328,8 +313,8 @@ private GetResult innerGetFetch(
() -> mappingLookup.getMapping().syntheticFieldLoader(sourceFilter),
mapperMetrics.sourceFieldMetrics()
)
- : mappingLookup.newSourceLoader(sourceFilter, mapperMetrics.sourceFieldMetrics());
- StoredFieldLoader storedFieldLoader = buildStoredFieldLoader(storedFieldSet, fetchSourceContext, loader);
+ : mappingLookup.newSourceLoader(fetchSourceContext.filter(), mapperMetrics.sourceFieldMetrics());
+ StoredFieldLoader storedFieldLoader = buildStoredFieldLoader(storedFields, fetchSourceContext, loader);
LeafStoredFieldLoader leafStoredFieldLoader = storedFieldLoader.getLoader(docIdAndVersion.reader.getContext(), null);
try {
leafStoredFieldLoader.advanceTo(docIdAndVersion.docId);
@@ -338,6 +323,7 @@
}

// put stored fields into result objects
+ final IndexVersion indexVersion = indexSettings.getIndexVersionCreated();
if (leafStoredFieldLoader.storedFields().isEmpty() == false) {
Set<String> needed = new HashSet<>();
if (storedFields != null) {
@@ -386,19 +372,6 @@ private GetResult innerGetFetch(
if (mapperService.mappingLookup().isSourceEnabled() && fetchSourceContext.fetchSource()) {
Source source = loader.leaf(docIdAndVersion.reader, new int[] { docIdAndVersion.docId })
.source(leafStoredFieldLoader, docIdAndVersion.docId);
-
- SourceFilter filter = fetchSourceContext.filter();
- if (filter != null) {
- source = source.filter(filter);
- }
-
- if (hasInferenceMetadataFields) {
- /**
- * Adds the {@link InferenceMetadataFieldsMapper#NAME} field from the document fields
- * to the original _source if it has been requested.
- */
- source = addInferenceMetadataFields(mapperService, docIdAndVersion.reader.getContext(), docIdAndVersion.docId, source);
- }
sourceBytes = source.internalSourceRef();
}

@@ -431,38 +404,18 @@ private static DocumentField loadIgnoredMetadataField(final DocIdAndVersion docI
return new DocumentField(IgnoredFieldMapper.NAME, ignoredValues);
}

- private static Source addInferenceMetadataFields(MapperService mapperService, LeafReaderContext readerContext, int docId, Source source)
- throws IOException {
- var mappingLookup = mapperService.mappingLookup();
- var inferenceMetadata = (InferenceMetadataFieldsMapper) mappingLookup.getMapping()
- .getMetadataMapperByName(InferenceMetadataFieldsMapper.NAME);
- if (inferenceMetadata == null || mapperService.mappingLookup().inferenceFields().isEmpty()) {
- return source;
- }
- var inferenceLoader = inferenceMetadata.fieldType()
- .valueFetcher(mappingLookup, mapperService.getBitSetProducer(), new IndexSearcher(readerContext.reader()));
- inferenceLoader.setNextReader(readerContext);
- List<Object> values = inferenceLoader.fetchValues(source, docId, List.of());
- if (values.size() == 1) {
- var newSource = source.source();
- newSource.put(InferenceMetadataFieldsMapper.NAME, values.get(0));
- return Source.fromMap(newSource, source.sourceContentType());
- }
- return source;
- }
-
- private static StoredFieldLoader buildStoredFieldLoader(
- Set<String> fields,
- FetchSourceContext fetchSourceContext,
- SourceLoader loader
- ) {
+ private static StoredFieldLoader buildStoredFieldLoader(String[] fields, FetchSourceContext fetchSourceContext, SourceLoader loader) {
+ Set<String> fieldsToLoad = new HashSet<>();
+ if (fields != null && fields.length > 0) {
+ Collections.addAll(fieldsToLoad, fields);
+ }
if (fetchSourceContext.fetchSource()) {
- fields.addAll(loader.requiredStoredFields());
+ fieldsToLoad.addAll(loader.requiredStoredFields());
} else {
- if (fields.isEmpty()) {
+ if (fieldsToLoad.isEmpty()) {
return StoredFieldLoader.empty();
}
}
- return StoredFieldLoader.create(fetchSourceContext.fetchSource(), fields);
+ return StoredFieldLoader.create(fetchSourceContext.fetchSource(), fieldsToLoad);
}
}
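The restored `buildStoredFieldLoader` above chooses between loading nothing, loading only the requested stored fields, or additionally loading whatever the source loader requires. A standalone sketch of that decision with plain JDK types standing in for the ES loader classes (all names in this sketch are illustrative):

[source,java]
--------------------------------------------------
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class StoredFieldChoiceSketch {
    enum Choice { EMPTY, CREATE }

    static Choice choose(String[] fields, boolean fetchSource, Set<String> requiredBySource, Set<String> toLoad) {
        if (fields != null && fields.length > 0) {
            Collections.addAll(toLoad, fields);  // caller-requested stored fields
        }
        if (fetchSource) {
            toLoad.addAll(requiredBySource);     // fetching _source needs its backing fields
        } else if (toLoad.isEmpty()) {
            return Choice.EMPTY;                 // nothing to load at all
        }
        return Choice.CREATE;
    }

    public static void main(String[] args) {
        Set<String> toLoad = new HashSet<>();
        // e.g. an update path that loads _routing and fetches _source:
        System.out.println(choose(new String[] { "_routing" }, true, Set.of("_recovery_source"), toLoad) + " " + toLoad);
    }
}
--------------------------------------------------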
@@ -12,6 +12,7 @@
import org.elasticsearch.cluster.metadata.InferenceFieldMetadata;
import org.elasticsearch.inference.InferenceService;

+ import java.util.Map;
import java.util.Set;

/**
@@ -25,4 +26,12 @@ public interface InferenceFieldMapper {
* @param sourcePaths The source path that populates the input for the field (before inference)
*/
InferenceFieldMetadata getMetadata(Set<String> sourcePaths);
+
+ /**
+ * Get the field's original value (i.e. the value the user specified) from the provided source.
+ *
+ * @param sourceAsMap The source as a map
+ * @return The field's original value, or {@code null} if none was provided
+ */
+ Object getOriginalValue(Map<String, Object> sourceAsMap);
}
