Skip to content

Commit

Permalink
Fix potential huge allocations when reading TermsQueryBuilder.BinaryV…
Browse files Browse the repository at this point in the history
…alues from the network (#105235)

We should be reading a single `BytesReference` (that would be backed by a single large `byte[]`)
here when we care about the individual values in the list only.
Without breaking the behavior of only serializing once when sending to multiple targets this change:
* lazy serializes as needed and keeps the original terms, so we don't needlessly go through serialization in e.g. a single node situation
or or requests that are handled on the coordinator directly (concurrency should be fine here, we serialize on the same thread in practice and
should we ever not be on the same thread at all times this will worst case lead to serializing multiple times).
* stops allocating a potentially huge byte[] when receiving these things over the wire
  • Loading branch information
original-brownbear authored Feb 8, 2024
1 parent ffc711b commit 8e42535
Showing 1 changed file with 36 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.ConstantFieldType;
import org.elasticsearch.index.mapper.MappedFieldType;
Expand Down Expand Up @@ -419,15 +421,25 @@ protected QueryBuilder doIndexMetadataRewrite(QueryRewriteContext context) throw
@SuppressWarnings("rawtypes")
public static final class BinaryValues extends AbstractCollection implements Writeable {

private final BytesReference valueRef;
private final int size;
@Nullable
private BytesReference valueRef;

private final boolean convert;

private final Collection<?> values;

private BinaryValues(StreamInput in) throws IOException {
this(in.readBytesReference());
this.valueRef = null;
this.convert = false;
int ignored = in.readVInt();
int ignored2 = in.readByte();
assert ignored2 == StreamOutput.GENERIC_LIST_HEADER;
values = in.readCollectionAsImmutableList(StreamInput::readGenericValue);
}

private BinaryValues(Collection<?> values, boolean convert) {
this(serialize(values, convert));
this.convert = convert;
this.values = values;
}

private static BytesReference serialize(Collection<?> values, boolean convert) {
Expand All @@ -449,15 +461,6 @@ private static BytesReference serialize(Collection<?> values, boolean convert) {
}
}

private BinaryValues(BytesReference bytesRef) {
this.valueRef = bytesRef;
try (StreamInput in = valueRef.streamInput()) {
size = consumerHeadersAndGetListSize(in);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -490,64 +493,45 @@ public void clear() {

@Override
public int size() {
return size;
return values.size();
}

@Override
public Iterator iterator() {
return new Iterator<>() {
private final StreamInput in;
private int pos = 0;

{
try {
in = valueRef.streamInput();
consumerHeadersAndGetListSize(in);
} catch (IOException e) {
throw new UncheckedIOException("failed to deserialize TermsQueryBuilder", e);
}
}

@Override
public boolean hasNext() {
return pos < size;
}

@Override
public Object next() {
try {
pos++;
return in.readGenericValue();
} catch (IOException e) {
throw new UncheckedIOException("failed to deserialize TermsQueryBuilder", e);
}
}
};
public Iterator<?> iterator() {
var iter = values.iterator();
return convert ? Iterators.map(iter, AbstractQueryBuilder::maybeConvertToBytesRef) : iter;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBytesReference(valueRef);
out.writeBytesReference(asBytes());
}

private BytesReference asBytes() {
var ref = valueRef;
if (ref == null) {
ref = serialize(values, convert);
valueRef = ref;
}
return ref;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BinaryValues that = (BinaryValues) o;
return Objects.equals(valueRef, that.valueRef);
return Iterators.equals(iterator(), ((BinaryValues) o).iterator(), Objects::equals);
}

@Override
public int hashCode() {
return Objects.hash(valueRef);
int hash = 1;
for (Object o : this) {
hash = 31 * hash + o.hashCode();
}
return hash;
}

private static int consumerHeadersAndGetListSize(StreamInput in) throws IOException {
byte genericSign = in.readByte();
assert genericSign == StreamOutput.GENERIC_LIST_HEADER;
return in.readVInt();
}
}

@Override
Expand Down

0 comments on commit 8e42535

Please sign in to comment.