Skip to content

Commit

Permalink
Avoid humongous blocks (elastic#103340)
Browse files Browse the repository at this point in the history
The HeapAttackIT#testFetchTooManyMvLongs test was failing with an OOM, even though plenty of
memory was available and all memory usage was accurately tracked. The
issue lies in the wasteful space occupied by humongous objects,
specifically blocks with large backing arrays. The space left in regions
by these humongous objects remains unused. This PR addresses the problem
by forcing blocks to use BigArrays once the memory used by their
primitive backing array exceeds a specified threshold - 512KB in this
PR.
  • Loading branch information
dnhatn authored Dec 18, 2023
1 parent 642ddea commit 4a583d9
Show file tree
Hide file tree
Showing 33 changed files with 1,359 additions and 166 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/103340.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 103340
summary: Avoid humongous blocks
area: ES|QL
type: enhancement
issues: []
22 changes: 22 additions & 0 deletions x-pack/plugin/esql/compute/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,28 @@ tasks.named('stringTemplates').configure {
it.inputFile = arrayBlockInputFile
it.outputFile = "org/elasticsearch/compute/data/BooleanArrayBlock.java"
}
// BigArray block implementations
// Each template entry below renders the shared X-BigArrayBlock.java.st input file with one
// set of per-type properties (int, long, double, boolean) and writes the result to the
// matching *BigArrayBlock.java output file.
File bigArrayBlockInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/data/X-BigArrayBlock.java.st")
template {
it.properties = intProperties
it.inputFile = bigArrayBlockInputFile
it.outputFile = "org/elasticsearch/compute/data/IntBigArrayBlock.java"
}
template {
it.properties = longProperties
it.inputFile = bigArrayBlockInputFile
it.outputFile = "org/elasticsearch/compute/data/LongBigArrayBlock.java"
}
template {
it.properties = doubleProperties
it.inputFile = bigArrayBlockInputFile
it.outputFile = "org/elasticsearch/compute/data/DoubleBigArrayBlock.java"
}
template {
it.properties = booleanProperties
it.inputFile = bigArrayBlockInputFile
it.outputFile = "org/elasticsearch/compute/data/BooleanBigArrayBlock.java"
}
// vector blocks
File vectorBlockInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/data/X-VectorBlock.java.st")
template {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.core.Releasables;

import java.util.BitSet;

/**
 * Block implementation that stores its boolean values in a {@link BitArray}.
 * This class is generated. Do not edit it.
 */
public final class BooleanBigArrayBlock extends AbstractArrayBlock implements BooleanBlock {

    private static final long BASE_RAM_BYTES_USED = 0; // TODO: fix this
    private final BitArray values;

    /**
     * Creates a block over the given bit array.
     *
     * @param values            bits holding the values; released by {@link #closeInternal()}
     * @param positionCount     forwarded to the superclass
     * @param firstValueIndexes forwarded to the superclass; {@link #expand()} treats {@code null} as "nothing to expand"
     * @param nulls             forwarded to the superclass
     * @param mvOrdering        forwarded to the superclass
     * @param blockFactory      forwarded to the superclass
     */
    public BooleanBigArrayBlock(
        BitArray values,
        int positionCount,
        int[] firstValueIndexes,
        BitSet nulls,
        MvOrdering mvOrdering,
        BlockFactory blockFactory
    ) {
        super(positionCount, firstValueIndexes, nulls, mvOrdering, blockFactory);
        this.values = values;
    }

    /**
     * This implementation never exposes a vector view.
     *
     * @return always {@code null}
     */
    @Override
    public BooleanVector asVector() {
        return null;
    }

    /**
     * Reads the bit stored at {@code valueIndex} in the backing array.
     */
    @Override
    public boolean getBoolean(int valueIndex) {
        return values.get(valueIndex);
    }

    /**
     * Builds a new block holding only the requested positions, in the order given.
     * Null positions stay null, a position with exactly one value is appended directly,
     * and every other position is wrapped in a position entry.
     *
     * @param positions the positions to copy into the result
     * @return a freshly built block carrying this block's mv ordering
     */
    @Override
    public BooleanBlock filter(int... positions) {
        try (var filtered = blockFactory().newBooleanBlockBuilder(positions.length)) {
            for (int position : positions) {
                if (isNull(position) == false) {
                    final int count = getValueCount(position);
                    final int start = getFirstValueIndex(position);
                    if (count != 1) {
                        filtered.beginPositionEntry();
                        for (int offset = 0; offset < count; offset++) {
                            filtered.appendBoolean(getBoolean(start + offset));
                        }
                        filtered.endPositionEntry();
                    } else {
                        filtered.appendBoolean(getBoolean(start));
                    }
                } else {
                    filtered.appendNull();
                }
            }
            return filtered.mvOrdering(mvOrdering()).build();
        }
    }

    @Override
    public ElementType elementType() {
        return ElementType.BOOLEAN;
    }

    /**
     * Flattens the block so that every value is appended on its own, outside any position entry.
     * When there are no first-value indexes the block is returned as is, after taking
     * an extra reference on it.
     *
     * @return this block (with an additional reference) or a newly built expanded block
     */
    @Override
    public BooleanBlock expand() {
        if (firstValueIndexes != null) {
            // TODO use reference counting to share the values
            final int total = getPositionCount();
            try (var expanded = blockFactory().newBooleanBlockBuilder(firstValueIndexes[total])) {
                for (int position = 0; position < total; position++) {
                    if (isNull(position)) {
                        expanded.appendNull();
                        continue;
                    }
                    int cursor = getFirstValueIndex(position);
                    final int limit = cursor + getValueCount(position);
                    while (cursor < limit) {
                        expanded.appendBoolean(getBoolean(cursor));
                        cursor++;
                    }
                }
                return expanded.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
            }
        }
        incRef();
        return this;
    }

    /**
     * Sums the base size, the size of the backing bits, the first-value indexes and the nulls mask.
     */
    @Override
    public long ramBytesUsed() {
        final long valueBytes = RamUsageEstimator.sizeOf(values);
        final long indexBytes = BlockRamUsageEstimator.sizeOf(firstValueIndexes);
        final long nullBytes = BlockRamUsageEstimator.sizeOfBitSet(nullsMask);
        return BASE_RAM_BYTES_USED + valueBytes + indexBytes + nullBytes;
    }

    /**
     * Equal to any other {@link BooleanBlock} for which {@code BooleanBlock.equals} holds.
     */
    @Override
    public boolean equals(Object obj) {
        return obj instanceof BooleanBlock that && BooleanBlock.equals(this, that);
    }

    @Override
    public int hashCode() {
        return BooleanBlock.hash(this);
    }

    @Override
    public String toString() {
        // Note: the reported ramBytesUsed is the figure of the backing bit array only.
        return new StringBuilder(getClass().getSimpleName()).append("[positions=")
            .append(getPositionCount())
            .append(", mvOrdering=")
            .append(mvOrdering())
            .append(", ramBytesUsed=")
            .append(values.ramBytesUsed())
            .append(']')
            .toString();
    }

    /**
     * Gives back to the breaker everything {@link #ramBytesUsed()} reports except the bytes
     * of the backing bits, then closes the bits themselves.
     * NOTE(review): presumably the bit array releases its own tracked bytes on close - confirm.
     */
    @Override
    public void closeInternal() {
        final long bytesOutsideValues = ramBytesUsed() - RamUsageEstimator.sizeOf(values);
        blockFactory().adjustBreaker(-bytesOutsideValues, true);
        Releasables.closeExpectNoException(values);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
public final class BooleanBigArrayVector extends AbstractVector implements BooleanVector, Releasable {

private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(BooleanBigArrayVector.class);
private static final long BASE_RAM_BYTES_USED = 0; // FIXME

private final BitArray values;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* Block that stores boolean values.
* This class is generated. Do not edit it.
*/
public sealed interface BooleanBlock extends Block permits BooleanArrayBlock, BooleanVectorBlock, ConstantNullBlock {
public sealed interface BooleanBlock extends Block permits BooleanArrayBlock, BooleanVectorBlock, ConstantNullBlock, BooleanBigArrayBlock {

/**
* Retrieves the boolean value stored at the given value index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.util.BitArray;

import java.util.Arrays;

Expand Down Expand Up @@ -179,6 +180,31 @@ public BooleanBlockBuilder mvOrdering(Block.MvOrdering mvOrdering) {
return this;
}

/**
 * Builds the block on top of a {@link BitArray} instead of the builder's primitive array.
 * The first {@code valueCount} entries of {@code values} are copied into a newly allocated
 * bit array, which is then wrapped in a vector-backed block when the data is dense and
 * single valued, or in a {@link BooleanBigArrayBlock} otherwise.
 *
 * @return the block wrapping the new bit array
 */
private BooleanBlock buildBigArraysBlock() {
    final BitArray bits = new BitArray(valueCount, blockFactory.bigArrays());
    int index = 0;
    while (index < valueCount) {
        if (values[index]) {
            bits.set(index);
        }
        index++;
    }
    final BooleanBlock built = isDense() && singleValued()
        ? new BooleanBigArrayVector(bits, positionCount, blockFactory).asBlock()
        : new BooleanBigArrayBlock(bits, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory);
    /*
     * Reconcile the breaker with the bytes actually used: the block's own figure, minus
     * what the builder had estimated, minus what the bit array reports for itself.
     * false is passed even though the bytes are already in use. If the breaker trips
     * here the memory is simply thrown away and can be deallocated; the exception
     * bubbles up while the builder is still technically open, so the caller is expected
     * to close it, which returns all used memory to the breaker.
     */
    final long delta = built.ramBytesUsed() - estimatedBytes - bits.ramBytesUsed();
    blockFactory.adjustBreaker(delta, false);
    return built;
}

@Override
public BooleanBlock build() {
try {
Expand All @@ -187,20 +213,26 @@ public BooleanBlock build() {
if (hasNonNullValue && positionCount == 1 && valueCount == 1) {
theBlock = blockFactory.newConstantBooleanBlockWith(values[0], 1, estimatedBytes);
} else {
if (values.length - valueCount > 1024 || valueCount < (values.length / 2)) {
values = Arrays.copyOf(values, valueCount);
}
if (isDense() && singleValued()) {
theBlock = blockFactory.newBooleanArrayVector(values, positionCount, estimatedBytes).asBlock();
if (estimatedBytes > blockFactory.maxPrimitiveArrayBytes()) {
theBlock = buildBigArraysBlock();
} else {
theBlock = blockFactory.newBooleanArrayBlock(
values,
positionCount,
firstValueIndexes,
nullsMask,
mvOrdering,
estimatedBytes
);
if (values.length - valueCount > 1024 || valueCount < (values.length / 2)) {
adjustBreaker(valueCount * elementSize());
values = Arrays.copyOf(values, valueCount);
adjustBreaker(-values.length * elementSize());
}
if (isDense() && singleValued()) {
theBlock = blockFactory.newBooleanArrayVector(values, positionCount, estimatedBytes).asBlock();
} else {
theBlock = blockFactory.newBooleanArrayBlock(
values,
positionCount,
firstValueIndexes,
nullsMask,
mvOrdering,
estimatedBytes
);
}
}
}
built();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,40 +190,46 @@ public BytesRefBlockBuilder mvOrdering(Block.MvOrdering mvOrdering) {
return this;
}

/**
 * Turns the accumulated {@code values} into a block.
 * A single non-null value at a single position becomes a constant block built from a deep
 * copy of that value, after which {@code values} is closed. Anything else wraps
 * {@code values} directly, as a vector-backed block when dense and single valued, or as a
 * {@link BytesRefArrayBlock} otherwise.
 *
 * @return the newly created block
 */
private BytesRefBlock buildFromBytesArray() {
    assert estimatedBytes == 0 || firstValueIndexes != null;

    /*
     * Both paths below finish by updating the breaker with the actual bytes used.
     * false is passed even though the bytes are already in use. If the breaker trips
     * the memory is simply thrown away and can be deallocated; the exception bubbles
     * up while the builder is still technically open, so the caller is expected to
     * close it, which returns all used memory to the breaker.
     */
    final boolean singleConstant = hasNonNullValue && positionCount == 1 && valueCount == 1;
    if (singleConstant) {
        final BytesRef onlyValue = BytesRef.deepCopyOf(values.get(0, new BytesRef()));
        final BytesRefBlock constantBlock = new ConstantBytesRefVector(onlyValue, 1, blockFactory).asBlock();
        blockFactory.adjustBreaker(constantBlock.ramBytesUsed() - estimatedBytes, false);
        // The constant block holds its own copy, so the source array is released here.
        Releasables.closeExpectNoException(values);
        return constantBlock;
    }

    final BytesRefBlock arrayBlock;
    if (isDense() && singleValued()) {
        arrayBlock = new BytesRefArrayVector(values, positionCount, blockFactory).asBlock();
    } else {
        arrayBlock = new BytesRefArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory);
    }
    // The bytes values reports via bigArraysRamBytesUsed() are left out of the adjustment.
    final long delta = arrayBlock.ramBytesUsed() - estimatedBytes - values.bigArraysRamBytesUsed();
    blockFactory.adjustBreaker(delta, false);
    return arrayBlock;
}

@Override
public BytesRefBlock build() {
try {
finish();
BytesRefBlock theBlock;
assert estimatedBytes == 0 || firstValueIndexes != null;
if (hasNonNullValue && positionCount == 1 && valueCount == 1) {
theBlock = new ConstantBytesRefVector(BytesRef.deepCopyOf(values.get(0, new BytesRef())), 1, blockFactory).asBlock();
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(theBlock.ramBytesUsed() - estimatedBytes, false);
Releasables.closeExpectNoException(values);
} else {
if (isDense() && singleValued()) {
theBlock = new BytesRefArrayVector(values, positionCount, blockFactory).asBlock();
} else {
theBlock = new BytesRefArrayBlock(values, positionCount, firstValueIndexes, nullsMask, mvOrdering, blockFactory);
}
/*
* Update the breaker with the actual bytes used.
* We pass false below even though we've used the bytes. That's weird,
* but if we break here we will throw away the used memory, letting
* it be deallocated. The exception will bubble up and the builder will
* still technically be open, meaning the calling code should close it
* which will return all used memory to the breaker.
*/
blockFactory.adjustBreaker(theBlock.ramBytesUsed() - estimatedBytes - values.bigArraysRamBytesUsed(), false);
}
theBlock = buildFromBytesArray();
values = null;
built();
return theBlock;
Expand Down
Loading

0 comments on commit 4a583d9

Please sign in to comment.