Commit
Merge branch '8.x' into backport_118017
jimczi authored Dec 5, 2024
2 parents 196eeb4 + 90b116f commit 0230be8
Showing 30 changed files with 480 additions and 446 deletions.
9 changes: 8 additions & 1 deletion docs/reference/inference/put-inference.asciidoc
@@ -10,7 +10,6 @@ Creates an {infer} endpoint to perform an {infer} task.
* For built-in models and models uploaded through Eland, the {infer} APIs offer an alternative way to use and manage trained models. However, if you do not plan to use the {infer} APIs to use these models or if you want to use non-NLP models, use the <<ml-df-trained-models-apis>>.
====


[discrete]
[[put-inference-api-request]]
==== {api-request-title}
@@ -47,6 +46,14 @@ Refer to the service list in the <<put-inference-api-desc,API description section>>.

The create {infer} API enables you to create an {infer} endpoint and configure a {ml} model to perform a specific {infer} task.

[IMPORTANT]
====
* When creating an inference endpoint, the associated machine learning model is automatically deployed if it is not already running.
* After creating the endpoint, wait for the model deployment to complete before using it. You can verify the deployment status by using the <<get-trained-models-stats, Get trained model statistics>> API. In the response, look for `"state": "fully_allocated"` and ensure the `"allocation_count"` matches the `"target_allocation_count"`.
* Avoid creating multiple endpoints for the same model unless required, as each endpoint consumes significant resources.
====


The following services are available through the {infer} API.
You can find the available task types next to the service name.
Click the links to review the configuration details of the services:
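The create-then-poll flow described in the new IMPORTANT block is small enough to sketch end to end. Below is a minimal illustration using Java's built-in `java.net.http` client against a local cluster. The endpoint name, task type, service, and model id are assumptions chosen for the example, and the string check stands in for real JSON parsing; only the two API routes and the `fully_allocated` / allocation-count test come from the documentation above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreateAndWaitForInferenceEndpoint {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // 1. Create the inference endpoint; this also deploys the model if it is not
        //    already running. Service, model id, and endpoint name are illustrative.
        String config = """
            {
              "service": "elasticsearch",
              "service_settings": { "model_id": ".elser_model_2", "num_allocations": 1, "num_threads": 1 }
            }""";
        HttpRequest create = HttpRequest.newBuilder(URI.create("http://localhost:9200/_inference/sparse_embedding/my-endpoint"))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(config))
            .build();
        System.out.println(client.send(create, HttpResponse.BodyHandlers.ofString()).body());

        // 2. Poll the Get trained model statistics API until the deployment is fully allocated.
        HttpRequest stats = HttpRequest.newBuilder(URI.create("http://localhost:9200/_ml/trained_models/.elser_model_2/_stats")).build();
        while (true) {
            String body = client.send(stats, HttpResponse.BodyHandlers.ofString()).body();
            if (body.contains("\"state\":\"fully_allocated\"")) { // crude check; a real client would
                break;                                            // also compare allocation_count with
            }                                                     // target_allocation_count
            Thread.sleep(2_000);
        }
        System.out.println("Endpoint ready.");
    }
}
```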
BucketOrder.java
@@ -12,6 +12,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.xcontent.ToXContentObject;

@@ -20,13 +21,12 @@
import java.util.Comparator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.ToLongFunction;

/**
* {@link Bucket} ordering strategy. Buckets can be order either as
* "complete" buckets using {@link #comparator()} or against a combination
* of the buckets internals with its ordinal with
* {@link #partiallyBuiltBucketComparator(ToLongFunction, Aggregator)}.
* {@link #partiallyBuiltBucketComparator(Aggregator)}.
*/
public abstract class BucketOrder implements ToXContentObject, Writeable {
/**
@@ -102,7 +102,7 @@ public final void validate(Aggregator aggregator) throws AggregationExecutionException {
* to validate this order because doing so checks all of the appropriate
* paths.
*/
partiallyBuiltBucketComparator(null, aggregator);
partiallyBuiltBucketComparator(aggregator);
}

/**
@@ -121,7 +121,7 @@ public final void validate(Aggregator aggregator) throws AggregationExecutionException {
* with it all the time.
* </p>
*/
public abstract <T extends Bucket> Comparator<T> partiallyBuiltBucketComparator(ToLongFunction<T> ordinalReader, Aggregator aggregator);
public abstract <T extends Bucket> Comparator<BucketAndOrd<T>> partiallyBuiltBucketComparator(Aggregator aggregator);

/**
* Build a comparator for fully built buckets.
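Every hunk in this commit funnels through the new `BucketAndOrd` holder, which is not itself shown on this page. From its usage in the diffs below (a single-argument constructor, a `bucket` field read for comparisons, and a mutable `ord` reassigned per candidate), a sketch consistent with the commit would be:

```java
// Inferred sketch, not the committed source: pairs a partially built bucket with the
// ordinal that identifies it in the aggregator's collection structures, so ordering
// strategies can compare either the bucket's own fields or per-ordinal metric values.
public final class BucketAndOrd<B> {
    public final B bucket; // the partially built bucket
    public long ord;       // reassigned as the holder is reused as a "spare"

    public BucketAndOrd(B bucket) {
        this.bucket = bucket;
    }
}
```

Moving the ordinal into this holder is what lets the commit drop the old `ToLongFunction` ordinalReader and `getBucketOrd` plumbing visible in the removed lines.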
InternalOrder.java
@@ -16,6 +16,7 @@
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.search.aggregations.Aggregator.BucketComparator;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.sort.SortValue;
@@ -31,7 +32,6 @@
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.ToLongFunction;

/**
* Implementations for {@link Bucket} ordering strategies.
@@ -64,10 +64,10 @@ public AggregationPath path() {
}

@Override
public <T extends Bucket> Comparator<T> partiallyBuiltBucketComparator(ToLongFunction<T> ordinalReader, Aggregator aggregator) {
public <T extends Bucket> Comparator<BucketAndOrd<T>> partiallyBuiltBucketComparator(Aggregator aggregator) {
try {
BucketComparator bucketComparator = path.bucketComparator(aggregator, order);
return (lhs, rhs) -> bucketComparator.compare(ordinalReader.applyAsLong(lhs), ordinalReader.applyAsLong(rhs));
return (lhs, rhs) -> bucketComparator.compare(lhs.ord, rhs.ord);
} catch (IllegalArgumentException e) {
throw new AggregationExecutionException.InvalidPath("Invalid aggregation order path [" + path + "]. " + e.getMessage(), e);
}
@@ -189,12 +189,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
}

@Override
public <T extends Bucket> Comparator<T> partiallyBuiltBucketComparator(ToLongFunction<T> ordinalReader, Aggregator aggregator) {
List<Comparator<T>> comparators = orderElements.stream()
.map(oe -> oe.partiallyBuiltBucketComparator(ordinalReader, aggregator))
.toList();
public <T extends Bucket> Comparator<BucketAndOrd<T>> partiallyBuiltBucketComparator(Aggregator aggregator) {
List<Comparator<BucketAndOrd<T>>> comparators = new ArrayList<>(orderElements.size());
for (BucketOrder order : orderElements) {
comparators.add(order.partiallyBuiltBucketComparator(aggregator));
}
return (lhs, rhs) -> {
for (Comparator<T> c : comparators) {
for (Comparator<BucketAndOrd<T>> c : comparators) {
int result = c.compare(lhs, rhs);
if (result != 0) {
return result;
@@ -300,9 +301,9 @@ byte id() {
}

@Override
public <T extends Bucket> Comparator<T> partiallyBuiltBucketComparator(ToLongFunction<T> ordinalReader, Aggregator aggregator) {
public <T extends Bucket> Comparator<BucketAndOrd<T>> partiallyBuiltBucketComparator(Aggregator aggregator) {
Comparator<Bucket> comparator = comparator();
return comparator::compare;
return (lhs, rhs) -> comparator.compare(lhs.bucket, rhs.bucket);
}

@Override
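The `CompoundOrder` rewrite above swaps a stream pipeline for an explicit loop but keeps the semantics: comparators are consulted in order and the first non-zero answer wins. A self-contained illustration of that tie-breaking pattern (the comparators themselves are illustrative choices):

```java
import java.util.Comparator;
import java.util.List;

public class CompoundOrderDemo {
    record Row(long docCount, String key) {}

    public static void main(String[] args) {
        // Primary: doc count descending; tie-break: key ascending.
        List<Comparator<Row>> comparators = List.of(
            Comparator.comparingLong(Row::docCount).reversed(),
            Comparator.comparing(Row::key)
        );
        // Same shape as CompoundOrder's comparator: first non-zero result decides.
        Comparator<Row> compound = (lhs, rhs) -> {
            for (Comparator<Row> c : comparators) {
                int result = c.compare(lhs, rhs);
                if (result != 0) {
                    return result;
                }
            }
            return 0;
        };
        // Counts tie at 5, so the key breaks the tie: prints a negative number.
        System.out.println(compound.compare(new Row(5, "a"), new Row(5, "b")));
    }
}
```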
@@ -13,6 +13,7 @@
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.core.Releasables;
@@ -26,6 +27,7 @@
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd;
import org.elasticsearch.search.aggregations.bucket.terms.BucketPriorityQueue;
import org.elasticsearch.search.aggregations.bucket.terms.BytesKeyedBucketOrds;
import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms;
@@ -38,7 +40,6 @@
import java.util.Arrays;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import static java.util.Collections.emptyList;
import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS;
@@ -115,51 +116,57 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
LongArray otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size());
ObjectArray<StringTerms.Bucket[]> topBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size())
) {
for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) {
int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());

// as users can't control sort order, in practice we'll always sort by doc count descending
try (
BucketPriorityQueue<StringTerms.Bucket> ordered = new BucketPriorityQueue<>(
size,
bigArrays(),
partiallyBuiltBucketComparator
)
) {
StringTerms.Bucket spare = null;
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx));
Supplier<StringTerms.Bucket> emptyBucketBuilder = () -> new StringTerms.Bucket(
new BytesRef(),
0,
null,
false,
0,
format
);
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts.increment(ordIdx, docCount);
if (spare == null) {
checkRealMemoryCBForInternalBucket();
spare = emptyBucketBuilder.get();
try (IntArray bucketsToCollect = bigArrays().newIntArray(owningBucketOrds.size())) {
// find how many buckets we are going to collect
long ordsToCollect = 0;
for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) {
int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)), bucketCountThresholds.getShardSize());
bucketsToCollect.set(ordIdx, size);
ordsToCollect += size;
}
try (LongArray ordsArray = bigArrays().newLongArray(ordsToCollect)) {
long ordsCollected = 0;
for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) {
// as users can't control sort order, in practice we'll always sort by doc count descending
try (
BucketPriorityQueue<StringTerms.Bucket> ordered = new BucketPriorityQueue<>(
bucketsToCollect.get(ordIdx),
bigArrays(),
order.partiallyBuiltBucketComparator(this)
)
) {
BucketAndOrd<StringTerms.Bucket> spare = null;
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx));
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts.increment(ordIdx, docCount);
if (spare == null) {
checkRealMemoryCBForInternalBucket();
spare = new BucketAndOrd<>(new StringTerms.Bucket(new BytesRef(), 0, null, false, 0, format));
}
ordsEnum.readValue(spare.bucket.getTermBytes());
spare.bucket.setDocCount(docCount);
spare.ord = ordsEnum.ord();
spare = ordered.insertWithOverflow(spare);
}
final int orderedSize = (int) ordered.size();
final StringTerms.Bucket[] buckets = new StringTerms.Bucket[orderedSize];
for (int i = orderedSize - 1; i >= 0; --i) {
BucketAndOrd<StringTerms.Bucket> bucketAndOrd = ordered.pop();
buckets[i] = bucketAndOrd.bucket;
ordsArray.set(ordsCollected + i, bucketAndOrd.ord);
otherDocCounts.increment(ordIdx, -bucketAndOrd.bucket.getDocCount());
bucketAndOrd.bucket.setTermBytes(BytesRef.deepCopyOf(bucketAndOrd.bucket.getTermBytes()));
}
topBucketsPerOrd.set(ordIdx, buckets);
ordsCollected += orderedSize;
}
ordsEnum.readValue(spare.getTermBytes());
spare.setDocCount(docCount);
spare.setBucketOrd(ordsEnum.ord());
spare = ordered.insertWithOverflow(spare);
}

topBucketsPerOrd.set(ordIdx, new StringTerms.Bucket[(int) ordered.size()]);
for (int i = (int) ordered.size() - 1; i >= 0; --i) {
topBucketsPerOrd.get(ordIdx)[i] = ordered.pop();
otherDocCounts.increment(ordIdx, -topBucketsPerOrd.get(ordIdx)[i].getDocCount());
topBucketsPerOrd.get(ordIdx)[i].setTermBytes(BytesRef.deepCopyOf(topBucketsPerOrd.get(ordIdx)[i].getTermBytes()));
}
assert ordsCollected == ordsArray.size();
buildSubAggsForAllBuckets(topBucketsPerOrd, ordsArray, InternalTerms.Bucket::setAggregations);
}
}

buildSubAggsForAllBuckets(topBucketsPerOrd, InternalTerms.Bucket::getBucketOrd, InternalTerms.Bucket::setAggregations);

return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> {
final BucketOrder reduceOrder;
if (isKeyOrder(order) == false) {
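The rewritten `buildAggregations` is the heart of the change: instead of stashing an ordinal inside each bucket, a first pass sizes one flat ordinal array (`bucketsToCollect`, `ordsToCollect`), and a second pass selects the top buckets per owning ordinal, writing the winners' ordinals contiguously into that array so sub-aggregations can be built in a single pass at the end. A simplified sketch of the same two-pass selection, with plain arrays and `java.util.PriorityQueue` standing in for `BigArrays` and `BucketPriorityQueue`:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class TopBucketsSketch {
    record Bucket(String term, long docCount) {}
    record BucketAndOrd(Bucket bucket, long ord) {}

    public static void main(String[] args) {
        // Two owning ordinals, each with candidate buckets and their collector ordinals.
        List<List<BucketAndOrd>> perOwningOrd = List.of(
            List.of(new BucketAndOrd(new Bucket("a", 5), 0),
                    new BucketAndOrd(new Bucket("b", 9), 1),
                    new BucketAndOrd(new Bucket("c", 2), 2)),
            List.of(new BucketAndOrd(new Bucket("d", 7), 3)));
        int shardSize = 2;

        // Pass 1: decide how many buckets each owning ordinal contributes, so the
        // flat ordinal array can be allocated once, up front.
        int[] bucketsToCollect = new int[perOwningOrd.size()];
        int ordsToCollect = 0;
        for (int i = 0; i < perOwningOrd.size(); i++) {
            bucketsToCollect[i] = Math.min(perOwningOrd.get(i).size(), shardSize);
            ordsToCollect += bucketsToCollect[i];
        }
        long[] ordsArray = new long[ordsToCollect];

        // Pass 2: per owning ordinal, keep the top-N by doc count in a bounded min-heap,
        // then pop worst-first while filling the output from the back, as the diff does.
        int ordsCollected = 0;
        for (int i = 0; i < perOwningOrd.size(); i++) {
            PriorityQueue<BucketAndOrd> queue = new PriorityQueue<>(
                Comparator.comparingLong((BucketAndOrd b) -> b.bucket().docCount()));
            for (BucketAndOrd candidate : perOwningOrd.get(i)) {
                queue.offer(candidate);
                if (queue.size() > bucketsToCollect[i]) {
                    queue.poll(); // evict the least competitive, like insertWithOverflow
                }
            }
            int size = queue.size();
            Bucket[] top = new Bucket[size];
            for (int j = size - 1; j >= 0; j--) {
                BucketAndOrd b = queue.poll();
                top[j] = b.bucket();
                ordsArray[ordsCollected + j] = b.ord(); // winners' ordinals, contiguous per owner
            }
            ordsCollected += size;
            System.out.println("owning ord " + i + " -> " + Arrays.toString(top));
        }
        // ordsArray now feeds a single buildSubAggsForAllBuckets-style pass.
        System.out.println(Arrays.toString(ordsArray)); // [1, 0, 3]
    }
}
```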
BucketPriorityQueue.java
@@ -13,17 +13,17 @@

import java.util.Comparator;

public class BucketPriorityQueue<B> extends ObjectArrayPriorityQueue<B> {
public class BucketPriorityQueue<B> extends ObjectArrayPriorityQueue<BucketAndOrd<B>> {

private final Comparator<? super B> comparator;
private final Comparator<BucketAndOrd<B>> comparator;

public BucketPriorityQueue(int size, BigArrays bigArrays, Comparator<? super B> comparator) {
public BucketPriorityQueue(int size, BigArrays bigArrays, Comparator<BucketAndOrd<B>> comparator) {
super(size, bigArrays);
this.comparator = comparator;
}

@Override
protected boolean lessThan(B a, B b) {
protected boolean lessThan(BucketAndOrd<B> a, BucketAndOrd<B> b) {
return comparator.compare(a, b) > 0; // reverse, since we reverse again when adding to a list
}
}
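`BucketPriorityQueue` only overrides `lessThan`, the single ordering hook of the Lucene/Elasticsearch family of bounded queues. Because the comparison is reversed (`compare(a, b) > 0`), the head of the heap is the least competitive bucket, which is exactly what `insertWithOverflow` should evict first, and why the pop loops above fill their arrays back to front. A stripped-down sketch of that contract (a hypothetical stand-in, not the real `ObjectArrayPriorityQueue` API):

```java
import java.util.PriorityQueue;

// Hypothetical stand-in for a Lucene/ES-style bounded queue: subclasses define only
// lessThan; the element for which lessThan holds sits nearest the head and is evicted first.
abstract class BoundedQueue<T> {
    private final PriorityQueue<T> heap;
    private final int maxSize;

    BoundedQueue(int maxSize) {
        this.maxSize = maxSize;
        this.heap = new PriorityQueue<>((a, b) -> lessThan(a, b) ? -1 : lessThan(b, a) ? 1 : 0);
    }

    protected abstract boolean lessThan(T a, T b);

    // Keeps at most maxSize elements; returns the evicted element so callers can reuse
    // it as the next spare, mirroring the spare = ordered.insertWithOverflow(spare) idiom.
    T insertWithOverflow(T element) {
        heap.offer(element);
        return heap.size() > maxSize ? heap.poll() : null;
    }

    long size() { return heap.size(); }

    T pop() { return heap.poll(); } // worst-first, hence the reverse-fill loops in the diff
}
```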
BucketSignificancePriorityQueue.java
@@ -12,14 +12,14 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArrayPriorityQueue;

public class BucketSignificancePriorityQueue<B extends SignificantTerms.Bucket> extends ObjectArrayPriorityQueue<B> {
public class BucketSignificancePriorityQueue<B extends SignificantTerms.Bucket> extends ObjectArrayPriorityQueue<BucketAndOrd<B>> {

public BucketSignificancePriorityQueue(int size, BigArrays bigArrays) {
super(size, bigArrays);
}

@Override
protected boolean lessThan(SignificantTerms.Bucket o1, SignificantTerms.Bucket o2) {
return o1.getSignificanceScore() < o2.getSignificanceScore();
protected boolean lessThan(BucketAndOrd<B> o1, BucketAndOrd<B> o2) {
return o1.bucket.getSignificanceScore() < o2.bucket.getSignificanceScore();
}
}
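The significance variant plugs a score comparison into the same hook: the lowest `significanceScore` sits at the head and falls out first. Continuing the hypothetical `BoundedQueue` sketch above:

```java
// Hypothetical usage: keep the most significant entries, evicting the lowest score first.
record Scored(String term, double significanceScore) {}

class ScoredQueue extends BoundedQueue<Scored> {
    ScoredQueue(int maxSize) {
        super(maxSize);
    }

    @Override
    protected boolean lessThan(Scored a, Scored b) {
        return a.significanceScore() < b.significanceScore(); // mirrors the diff's comparison
    }
}
```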
