Skip to content

Commit

Permalink
ESQL: Track memory from values loaded from lucene (elastic#101383)
Browse files Browse the repository at this point in the history
This adds memory tracking for values loaded from doc values and stored
fields.
  • Loading branch information
nik9000 authored Oct 27, 2023
1 parent a706288 commit 5365daa
Show file tree
Hide file tree
Showing 40 changed files with 335 additions and 318 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.BytesRefVector;
import org.elasticsearch.compute.data.DocVector;
Expand Down Expand Up @@ -132,6 +133,7 @@ private static BlockLoader numericBlockLoader(String name, NumberFieldMapper.Num
@OperationsPerInvocation(INDEX_SIZE)
public void benchmark() {
ValuesSourceReaderOperator op = new ValuesSourceReaderOperator(
BlockFactory.getNonBreakingInstance(),
List.of(BlockReaderFactories.loaderToFactory(reader, blockLoader(name))),
0,
name
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/101383.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 101383
summary: "ESQL: Track memory from values loaded from lucene"
area: ES|QL
type: enhancement
issues: []

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.core.Releasable;

import java.io.IOException;

Expand Down Expand Up @@ -124,13 +125,24 @@ interface BuilderFactory {
// TODO support non-singleton ords
}

/**
* Marker interface for block results. The compute engine has a fleshed
* out implementation.
*/
interface Block {}

/**
* A builder for typed values. For each document you may either call
* {@link #appendNull}, {@code append<Type>}, or
* {@link #beginPositionEntry} followed by two or more {@code append<Type>}
* calls, and then {@link #endPositionEntry}.
*/
interface Builder {
interface Builder extends Releasable {
/**
* Build the actual block.
*/
Block build();

/**
* Insert a null value.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,17 @@ public String toString() {
}

@Override
public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) throws IOException {
BlockLoader.Builder blockBuilder = builder(factory, docs.count());
for (int i = 0; i < docs.count(); i++) {
int doc = docs.get(i);
if (doc < this.docID) {
throw new IllegalStateException("docs within same block must be in order");
}
readValuesFromSingleDoc(doc, blockBuilder);
public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) throws IOException {
try (BlockLoader.Builder builder = builder(factory, docs.count())) {
for (int i = 0; i < docs.count(); i++) {
int doc = docs.get(i);
if (doc < this.docID) {
throw new IllegalStateException("docs within same block must be in order");
}
readValuesFromSingleDoc(doc, builder);
}
return builder.build();
}
return blockBuilder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ protected BlockStoredFieldsReader(LeafStoredFieldLoader loader) {
}

@Override
public final BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) throws IOException {
var builder = builder(factory, docs.count());
for (int i = 0; i < docs.count(); i++) {
readValuesFromSingleDoc(docs.get(i), builder);
public final BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) throws IOException {
try (BlockLoader.Builder builder = builder(factory, docs.count())) {
for (int i = 0; i < docs.count(); i++) {
readValuesFromSingleDoc(docs.get(i), builder);
}
return builder.build();
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ public BlockLoader.BooleanBuilder builder(BlockLoader.BuilderFactory factory, in
}

@Override
public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
BlockLoader.BooleanBuilder builder = builder(factory, docs.count());
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
try (BlockLoader.BooleanBuilder builder = builder(factory, docs.count())) {
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
}
return builder.build();
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ public BlockLoader.LongBuilder builder(BlockLoader.BuilderFactory factory, int e
}

@Override
public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
BlockLoader.LongBuilder builder = builder(factory, docs.count());
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
try (BlockLoader.LongBuilder builder = builder(factory, docs.count())) {
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
}
return builder.build();
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ public BlockLoader.DoubleBuilder builder(BlockLoader.BuilderFactory factory, int
}

@Override
public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
BlockLoader.DoubleBuilder builder = builder(factory, docs.count());
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
try (BlockLoader.DoubleBuilder builder = builder(factory, docs.count())) {
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
}
return builder.build();
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,13 @@ public BlockLoader.BytesRefBuilder builder(BlockLoader.BuilderFactory factory, i
}

@Override
public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
BlockLoader.BytesRefBuilder builder = builder(factory, docs.count());
for (int i = 0; i < docs.count(); i++) {
builder.appendBytesRef(bytes);
public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
try (BlockLoader.BytesRefBuilder builder = builder(factory, docs.count())) {
for (int i = 0; i < docs.count(); i++) {
builder.appendBytesRef(bytes);
}
return builder.build();
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ public BlockLoader.BytesRefBuilder builder(BlockLoader.BuilderFactory factory, i
}

@Override
public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
BlockLoader.BytesRefBuilder builder = builder(factory, docs.count());
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
try (BlockLoader.BytesRefBuilder builder = builder(factory, docs.count())) {
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
}
return builder.build();
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ public BlockLoader.BytesRefBuilder builder(BlockLoader.BuilderFactory factory, i
}

@Override
public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
BlockLoader.BytesRefBuilder builder = builder(factory, docs.count());
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
try (BlockLoader.BytesRefBuilder builder = builder(factory, docs.count())) {
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
}
return builder.build();
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ public BlockLoader.LongBuilder builder(BlockLoader.BuilderFactory factory, int e
}

@Override
public BlockLoader.Builder readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
BlockLoader.LongBuilder builder = builder(factory, docs.count());
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
public BlockLoader.Block readValues(BlockLoader.BuilderFactory factory, BlockLoader.Docs docs) {
try (BlockLoader.LongBuilder builder = builder(factory, docs.count())) {
for (int i = 0; i < docs.count(); i++) {
read(docs.get(i), builder);
}
return builder.build();
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public class TestBlock
BlockLoader.DoubleBuilder,
BlockLoader.IntBuilder,
BlockLoader.LongBuilder,
BlockLoader.SingletonOrdinalsBuilder {
BlockLoader.SingletonOrdinalsBuilder,
BlockLoader.Block {
public static BlockLoader.BuilderFactory FACTORY = new BlockLoader.BuilderFactory() {
@Override
public BlockLoader.BooleanBuilder booleansFromDocValues(int expectedCount) {
Expand Down Expand Up @@ -192,8 +193,18 @@ public TestBlock appendOrd(int value) {
}
}

@Override
public TestBlock build() {
return this;
}

private TestBlock add(Object value) {
(currentPosition == null ? values : currentPosition).add(value);
return this;
}

@Override
public void close() {
// TODO assert that we close the test blocks
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,9 @@ protected final void ensureCapacity() {
return;
}
int newSize = calculateNewArraySize(valuesLength);
adjustBreaker((long) (newSize - valuesLength) * elementSize());
adjustBreaker(newSize * elementSize());
growValuesArray(newSize);
adjustBreaker(-valuesLength * elementSize());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,9 @@
* or dense data. A Block can represent either single or multi valued data. A Block that represents
* dense single-valued data can be viewed as a {@link Vector}.
*
* TODO: update comment
* <p> All Blocks share the same set of data retrieval methods, but actual concrete implementations
* effectively support a subset of these, throwing {@code UnsupportedOperationException} where a
* particular data retrieval method is not supported. For example, a Block of primitive longs may
* not support retrieval as an integer, {code getInt}. This greatly simplifies Block usage and
* avoids cumbersome use-site casting.
*
* <p> Block are immutable and can be passed between threads.
*/
public interface Block extends Accountable, NamedWriteable, Releasable {
public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, Releasable {

/**
* {@return an efficient dense single-value view of this block}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public static Block[] fromList(BlockFactory blockFactory, List<List<Object>> lis
public static Block deepCopyOf(Block block, BlockFactory blockFactory) {
try (Block.Builder builder = block.elementType().newBlockBuilder(block.getPositionCount(), blockFactory)) {
builder.copyFrom(block, 0, block.getPositionCount());
builder.mvOrdering(block.mvOrdering());
return builder.build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,12 @@ public Block.Builder appendAllValuesToCurrentPosition(Block block) {

@Override
public Block.Builder mvOrdering(MvOrdering mvOrdering) {
throw new UnsupportedOperationException();
/*
* This is called when copying but otherwise doesn't do
* anything because there aren't multivalue fields in a
* block containing only nulls.
*/
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,13 @@ public Block.Builder appendAllValuesToCurrentPosition(Block block) {

@Override
public Block.Builder mvOrdering(MvOrdering mvOrdering) {
throw new UnsupportedOperationException("doc blocks only contain one value per position");
/*
* This is called when copying but otherwise doesn't do
* anything because there aren't multivalue fields in a
* block containing doc references. Every position can
* only reference one doc.
*/
return this;
}

@Override
Expand Down
Loading

0 comments on commit 5365daa

Please sign in to comment.