Skip to content

Commit

Permalink
Merge branch 'main' into transforming_rank_to_retriever
Browse files Browse the repository at this point in the history
  • Loading branch information
elasticmachine authored Nov 7, 2024
2 parents e3914db + a3eba57 commit 3bfbec9
Show file tree
Hide file tree
Showing 15 changed files with 96 additions and 36 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/115655.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 115655
summary: Better sizing `BytesRef` for Strings in Queries
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) {
* @see #onShardResult(SearchPhaseResult, SearchShardIterator)
*/
final void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()
executeNextPhase(this, () -> getNextPhase(results, this));
executeNextPhase(this, this::getNextPhase);
}

@Override
Expand Down Expand Up @@ -746,11 +746,8 @@ protected final ShardSearchRequest buildShardSearchRequest(SearchShardIterator s

/**
* Returns the next phase based on the results of the initial search phase
* @param results the results of the initial search phase. Each non null element in the result array represent a successfully
* executed shard request
* @param context the search context for the next phase
*/
protected abstract SearchPhase getNextPhase(SearchPhaseResults<Result> results, SearchPhaseContext context);
protected abstract SearchPhase getNextPhase();

private static final class PendingExecutions {
private final Semaphore semaphore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ final class DfsQueryPhase extends SearchPhase {
this.nextPhaseFactory = nextPhaseFactory;
this.context = context;
this.searchTransportService = context.getSearchTransport();

// register the release of the query consumer to free up the circuit breaker memory
// at the end of the search
context.addReleasable(queryResult);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ protected void executePhaseOnShard(
}

@Override
protected SearchPhase getNextPhase(final SearchPhaseResults<DfsSearchResult> results, SearchPhaseContext context) {
protected SearchPhase getNextPhase() {
final List<DfsSearchResult> dfsSearchResults = results.getAtomicArray().asList();
final AggregatedDfs aggregatedDfs = SearchPhaseController.aggregateDfs(dfsSearchResults);
final List<DfsKnnResults> mergedKnnResults = SearchPhaseController.mergeKnnResults(getRequest(), dfsSearchResults);
Expand All @@ -107,8 +107,8 @@ protected SearchPhase getNextPhase(final SearchPhaseResults<DfsSearchResult> res
aggregatedDfs,
mergedKnnResults,
queryPhaseResultConsumer,
(queryResults) -> SearchQueryThenFetchAsyncAction.nextPhase(client, context, queryResults, aggregatedDfs),
context
(queryResults) -> SearchQueryThenFetchAsyncAction.nextPhase(client, this, queryResults, aggregatedDfs),
this
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ static SearchPhase nextPhase(
}

@Override
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
protected SearchPhase getNextPhase() {
return nextPhase(client, this, results, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ protected void executePhaseOnShard(
}

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
protected SearchPhase getNextPhase() {
return new SearchPhase(getName()) {

private void onExecuteFailure(Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.UnicodeUtil;

public class BytesRefs {

Expand Down Expand Up @@ -56,6 +57,25 @@ public static BytesRef checkIndexableLength(BytesRef input) {
return input;
}

/**
 * Converts a string to a {@link BytesRef} whose backing byte array is exactly the size of the
 * string's UTF-8 encoding, with no slack.
 * <p>
 * Unlike the standard {@link BytesRef} String constructor, this helper scans the UTF-16 input
 * twice: a first pass computes the precise UTF-8 length, and a second pass encodes the
 * characters into an array allocated to exactly that length.
 * </p>
 *
 * @param s the string to encode
 * @return a {@link BytesRef} holding the UTF-8 bytes of {@code s} in an exactly sized array
 */
public static BytesRef toExactSizedBytesRef(String s) {
    final int charCount = s.length();
    // first pass: measure, so the allocation has zero waste
    final byte[] utf8 = new byte[UnicodeUtil.calcUTF16toUTF8Length(s, 0, charCount)];
    // second pass: encode into the exactly sized buffer
    UnicodeUtil.UTF16toUTF8(s, 0, charCount, utf8);
    return new BytesRef(utf8);
}

/**
* Produces a UTF-string prefix of the input BytesRef. If the prefix cutoff would produce
* ill-formed UTF, it falls back to the hexadecimal representation.
Expand All @@ -70,5 +90,4 @@ private static String safeStringPrefix(BytesRef input, int prefixLength) {
return prefix.toString();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,12 @@ public final int hashCode() {
* @return the same input object or a {@link BytesRef} representation if input was of type string
*/
static Object maybeConvertToBytesRef(Object obj) {
if (obj instanceof String) {
return BytesRefs.checkIndexableLength(BytesRefs.toBytesRef(obj));
} else if (obj instanceof CharBuffer) {
return BytesRefs.checkIndexableLength(new BytesRef((CharBuffer) obj));
} else if (obj instanceof BigInteger) {
return BytesRefs.toBytesRef(obj);
if (obj instanceof String v) {
return BytesRefs.checkIndexableLength(BytesRefs.toExactSizedBytesRef(v));
} else if (obj instanceof CharBuffer v) {
return BytesRefs.checkIndexableLength(new BytesRef(v));
} else if (obj instanceof BigInteger v) {
return BytesRefs.toBytesRef(v);
}
return obj;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,7 @@ public final void collectBucket(LeafBucketCollector subCollector, int doc, long
grow(bucketOrd + 1);
int docCount = docCountProvider.getDocCount(doc);
if (docCounts.increment(bucketOrd, docCount) == docCount) {
// We call the circuit breaker the time to time in order to give it a chance to check available
// memory in the parent breaker and break the execution if we are running out. To achieve that we
// are passing 0 as the estimated bytes every 1024 calls
if ((++callCount & 0x3FF) == 0) {
breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
}
updateCircuitBreaker("allocated_buckets");
}
subCollector.collect(doc, bucketOrd);
}
Expand Down Expand Up @@ -179,6 +174,7 @@ protected final IntFunction<InternalAggregations> buildSubAggsForBuckets(long[]
prepareSubAggs(bucketOrdsToCollect);
InternalAggregation[][] aggregations = new InternalAggregation[subAggregators.length][];
for (int i = 0; i < subAggregators.length; i++) {
updateCircuitBreaker("building_sub_aggregation");
aggregations[i] = subAggregators[i].buildAggregations(bucketOrdsToCollect);
}
return subAggsForBucketFunction(aggregations);
Expand Down Expand Up @@ -415,4 +411,15 @@ protected void preGetSubLeafCollectors(LeafReaderContext ctx) throws IOException
// Set LeafReaderContext to the doc_count provider
docCountProvider.setLeafReaderContext(ctx);
}

/**
 * Periodically gives the circuit breaker a chance to check available memory in the parent
 * breaker (which should be a real-memory breaker) and abort execution if we are running out.
 * To keep the overhead negligible, a zero-byte estimate is submitted only once every 1024
 * invocations.
 *
 * @param label the operation label reported to the breaker when it trips
 */
private void updateCircuitBreaker(String label) {
    callCount++;
    // 0x3FF masks the low 10 bits, so this fires on every 1024th call
    if ((callCount & 0x3FF) == 0) {
        breaker.addEstimateBytesAndMaybeBreak(0, label);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
SearchResponse.Clusters.EMPTY
) {
@Override
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
protected SearchPhase getNextPhase() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ protected void executePhaseOnShard(
}

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<TestSearchPhaseResult> results, SearchPhaseContext context) {
protected SearchPhase getNextPhase() {
return new SearchPhase("test") {
@Override
public void run() {
Expand Down Expand Up @@ -255,7 +255,7 @@ protected void executePhaseOnShard(
}

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<TestSearchPhaseResult> results, SearchPhaseContext context) {
protected SearchPhase getNextPhase() {
return new SearchPhase("test") {
@Override
public void run() {
Expand Down Expand Up @@ -359,7 +359,7 @@ protected void executePhaseOnShard(
}

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<TestSearchPhaseResult> results, SearchPhaseContext context) {
protected SearchPhase getNextPhase() {
return new SearchPhase("test") {
@Override
public void run() {
Expand Down Expand Up @@ -488,7 +488,7 @@ protected void executePhaseOnShard(
}

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<TestSearchPhaseResult> results, SearchPhaseContext context) {
protected SearchPhase getNextPhase() {
return new SearchPhase("test") {
@Override
public void run() {
Expand Down Expand Up @@ -600,7 +600,7 @@ protected void executePhaseOnShard(
}

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<TestSearchPhaseResult> results, SearchPhaseContext context) {
protected SearchPhase getNextPhase() {
return new SearchPhase("test") {
@Override
public void run() {
Expand Down Expand Up @@ -680,7 +680,7 @@ protected void executePhaseOnShard(
}

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<TestSearchPhaseResult> results, SearchPhaseContext context) {
protected SearchPhase getNextPhase() {
return new SearchPhase("test") {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void sendExecuteQuery(
null
) {
@Override
protected SearchPhase getNextPhase(SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
protected SearchPhase getNextPhase() {
return new SearchPhase("test") {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.index.query;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchModule;
Expand Down Expand Up @@ -93,4 +94,25 @@ public void testMaybeConvertToBytesRefLongTerm() {
assertThat(e.getMessage(), containsString("term starting with [aaaaa"));
}

/**
 * Verifies that {@code AbstractQueryBuilder#maybeConvertToBytesRef} produces a {@link BytesRef}
 * whose backing array is exactly sized for the UTF-8 encoding of the input, mixing one-, two-
 * and three-byte code points.
 */
public void testMaybeConvertToBytesRefStringCorrectSize() {
    int capacity = randomIntBetween(20, 40);
    StringBuilder termBuilder = new StringBuilder(capacity);
    int correctSize = 0;
    for (int i = 0; i < capacity; i++) {
        if (i < capacity / 3) {
            // randomIntBetween is inclusive on both ends, so the upper bound must be 127:
            // chars in [0x0000, 0x007F] encode to exactly one UTF-8 byte
            termBuilder.append((char) randomIntBetween(0, 127));
            ++correctSize;
        } else if (i < 2 * capacity / 3) {
            // chars in [0x0080, 0x07FF] encode to exactly two UTF-8 bytes (upper bound 2047,
            // since 2048 = 0x0800 already needs three bytes)
            termBuilder.append((char) randomIntBetween(128, 2047));
            correctSize += 2;
        } else {
            // chars in [0x0800, 0x0FFC] encode to three UTF-8 bytes and stay well below the
            // surrogate range starting at 0xD800
            termBuilder.append((char) randomIntBetween(2048, 4092));
            correctSize += 3;
        }
    }
    BytesRef bytesRef = (BytesRef) AbstractQueryBuilder.maybeConvertToBytesRef(termBuilder.toString());
    // exact sizing: no slack bytes beyond the encoded length
    assertEquals(correctSize, bytesRef.bytes.length);
    assertEquals(correctSize, bytesRef.length);
}

}
9 changes: 8 additions & 1 deletion x-pack/qa/rolling-upgrade/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,14 @@ BuildParams.bwcVersions.withWireCompatible { bwcVersion, baseName ->
keystore 'xpack.watcher.encryption_key', file("${project.projectDir}/src/test/resources/system_key")
setting 'xpack.watcher.encrypt_sensitive_data', 'true'

extraConfigFile 'operator/settings.json', file("${project.projectDir}/src/test/resources/operator_defined_role_mappings.json")
// file-based settings processing had a bug around applying role mappings on an unrecovered index
// this was fixed in 8.7.0 (https://github.com/elastic/elasticsearch/pull/92173). To avoid flakiness
// in the test, we only set a role mappings file for higher versions.
// TODO move this out into a separate test suite, since operator settings are not relevant for most BWC tests
// and have some side-effects
if (bwcVersion.onOrAfter('8.7.0')) {
extraConfigFile 'operator/settings.json', file("${project.projectDir}/src/test/resources/operator_defined_role_mappings.json")
}

// Old versions of the code contain an invalid assertion that trips
// during tests. Versions 5.6.9 and 6.2.4 have been fixed by removing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.upgrades;

import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
Expand All @@ -24,8 +25,14 @@
import static org.hamcrest.Matchers.containsInAnyOrder;

public class SecurityIndexRoleMappingCleanupIT extends AbstractUpgradeTestCase {
private static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version"));

public void testCleanupDuplicateMappings() throws Exception {
// see build.gradle where we set operator/settings.json for more details on this skip
assumeTrue(
"Cluster requires version higher than since operator/settings.json is only set then: " + Version.V_8_7_0,
UPGRADE_FROM_VERSION.onOrAfter(Version.V_8_7_0)
);
if (CLUSTER_TYPE == ClusterType.OLD) {
// If we're in a state where the same operator-defined role mappings can exist both in cluster state and the native store
// (V_8_15_0 transport added to security.role_mapping_cleanup feature added), create a state
Expand Down

0 comments on commit 3bfbec9

Please sign in to comment.