Skip to content

Commit

Permalink
Remove non-breaking builder in element type (elastic#103639)
Browse files Browse the repository at this point in the history
This PR removes the usage of the non-breaking block factory 
when creating Blocks from its element type.
  • Loading branch information
dnhatn authored Dec 21, 2023
1 parent 281ec13 commit 51b5f47
Show file tree
Hide file tree
Showing 14 changed files with 100 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,19 @@ public void closeInternal() {
/**
* A builder the for {@link DocBlock}.
*/
public static Builder newBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
return new Builder(estimatedSize, blockFactory);
public static Builder newBlockBuilder(BlockFactory blockFactory, int estimatedSize) {
return new Builder(blockFactory, estimatedSize);
}

public static class Builder implements Block.Builder {
private final IntVector.Builder shards;
private final IntVector.Builder segments;
private final IntVector.Builder docs;

private Builder(int estimatedSize, BlockFactory blockFactory) {
shards = IntVector.newVectorBuilder(estimatedSize, blockFactory);
segments = IntVector.newVectorBuilder(estimatedSize, blockFactory);
docs = IntVector.newVectorBuilder(estimatedSize, blockFactory);
private Builder(BlockFactory blockFactory, int estimatedSize) {
shards = blockFactory.newIntVectorBuilder(estimatedSize);
segments = blockFactory.newIntVectorBuilder(estimatedSize);
docs = blockFactory.newIntVectorBuilder(estimatedSize);
}

public Builder appendShard(int shard) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
* The type of elements in {@link Block} and {@link Vector}
*/
public enum ElementType {
BOOLEAN(BooleanBlock::newBlockBuilder),
INT(IntBlock::newBlockBuilder),
LONG(LongBlock::newBlockBuilder),
DOUBLE(DoubleBlock::newBlockBuilder),
BOOLEAN(BlockFactory::newBooleanBlockBuilder),
INT(BlockFactory::newIntBlockBuilder),
LONG(BlockFactory::newLongBlockBuilder),
DOUBLE(BlockFactory::newDoubleBlockBuilder),
/**
* Blocks containing only null values.
*/
NULL((estimatedSize, blockFactory) -> new ConstantNullBlock.Builder(blockFactory)),
NULL((blockFactory, estimatedSize) -> new ConstantNullBlock.Builder(blockFactory)),

BYTES_REF(BytesRefBlock::newBlockBuilder),
BYTES_REF(BlockFactory::newBytesRefBlockBuilder),

/**
* Blocks that reference individual lucene documents.
Expand All @@ -32,10 +32,10 @@ public enum ElementType {
/**
* Intermediate blocks which don't support retrieving elements.
*/
UNKNOWN((estimatedSize, blockFactory) -> { throw new UnsupportedOperationException("can't build null blocks"); });
UNKNOWN((blockFactory, estimatedSize) -> { throw new UnsupportedOperationException("can't build null blocks"); });

interface BuilderSupplier {
Block.Builder newBlockBuilder(int estimatedSize, BlockFactory blockFactory);
private interface BuilderSupplier {
Block.Builder newBlockBuilder(BlockFactory blockFactory, int estimatedSize);
}

private final BuilderSupplier builder;
Expand All @@ -44,20 +44,11 @@ interface BuilderSupplier {
this.builder = builder;
}

/**
* Create a new {@link Block.Builder} for blocks of this type.
* @deprecated use {@link #newBlockBuilder(int, BlockFactory)}
*/
@Deprecated
public Block.Builder newBlockBuilder(int estimatedSize) {
return builder.newBlockBuilder(estimatedSize, BlockFactory.getNonBreakingInstance());
}

/**
* Create a new {@link Block.Builder} for blocks of this type.
*/
public Block.Builder newBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
return builder.newBlockBuilder(estimatedSize, blockFactory);
return builder.newBlockBuilder(blockFactory, estimatedSize);
}

public static ElementType fromJava(Class<?> type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ protected Block merge(int blockIndex, Block block) {
if (blockIndex != 0) {
return super.merge(blockIndex, block);
}
Block.Builder builder = block.elementType().newBlockBuilder(block.getPositionCount() / 2);
Block.Builder builder = block.elementType().newBlockBuilder(block.getPositionCount() / 2, blockFactory);
for (int p = 0; p + 1 < block.getPositionCount(); p += 2) {
builder.copyFrom(block, p, p + 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ public void testEqualityAndHashCode() throws IOException {
int positions = randomInt(page.getPositionCount() - 1);
for (int blockIndex = 0; blockIndex < blocks.length; blockIndex++) {
Block block = page.getBlock(blockIndex);
blocks[blockIndex] = block.elementType().newBlockBuilder(positions).copyFrom(block, 0, page.getPositionCount() - 1).build();
blocks[blockIndex] = block.elementType()
.newBlockBuilder(positions, BlockFactory.getNonBreakingInstance())
.copyFrom(block, 0, page.getPositionCount() - 1)
.build();
}
return new Page(blocks);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private Block randomlyDivideAndMerge(Block block) {
while (block.getPositionCount() > 1 || randomBoolean()) {
int positionCount = block.getPositionCount();
int offset = 0;
Block.Builder builder = block.elementType().newBlockBuilder(randomIntBetween(1, 100));
Block.Builder builder = block.elementType().newBlockBuilder(randomIntBetween(1, 100), BlockFactory.getNonBreakingInstance());
List<Object> expected = new ArrayList<>();
while (offset < positionCount) {
int length = randomIntBetween(1, positionCount - offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,15 @@ public void testEvensAllNull() {

private void assertSmall(Block block) {
int smallSize = Math.min(block.getPositionCount(), 10);
Block.Builder builder = elementType.newBlockBuilder(smallSize);
BlockFactory blockFactory = BlockFactory.getNonBreakingInstance();
Block.Builder builder = elementType.newBlockBuilder(smallSize, blockFactory);
builder.copyFrom(block, 0, smallSize);
assertBlockValues(builder.build(), BasicBlockTests.valuesAtPositions(block, 0, smallSize));
}

private void assertEvens(Block block) {
Block.Builder builder = elementType.newBlockBuilder(block.getPositionCount() / 2);
BlockFactory blockFactory = BlockFactory.getNonBreakingInstance();
Block.Builder builder = elementType.newBlockBuilder(block.getPositionCount() / 2, blockFactory);
List<List<Object>> expected = new ArrayList<>();
for (int i = 0; i < block.getPositionCount(); i += 2) {
builder.copyFrom(block, i, i + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void testRandomShardSegmentDocMap() {

private void assertShardSegmentDocMap(int[][] data, int[][] expected) {
BlockFactory blockFactory = BlockFactoryTests.blockFactory(ByteSizeValue.ofGb(1));
try (DocBlock.Builder builder = DocBlock.newBlockBuilder(data.length, blockFactory)) {
try (DocBlock.Builder builder = DocBlock.newBlockBuilder(blockFactory, data.length)) {
for (int r = 0; r < data.length; r++) {
builder.appendShard(data[r][0]);
builder.appendSegment(data[r][1]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
Expand Down Expand Up @@ -47,7 +48,7 @@ public static Page mergePages(List<Page> pages) {
Block.Builder[] builders = new Block.Builder[first.getBlockCount()];
try {
for (int b = 0; b < builders.length; b++) {
builders[b] = first.getBlock(b).elementType().newBlockBuilder(totalPositions);
builders[b] = first.getBlock(b).elementType().newBlockBuilder(totalPositions, BlockFactory.getNonBreakingInstance());
}
for (Page p : pages) {
for (int b = 0; b < builders.length; b++) {
Expand Down Expand Up @@ -79,11 +80,12 @@ public static Page mergePages(List<Page> pages) {
*/
public static List<Page> deepCopyOf(List<Page> pages) {
List<Page> out = new ArrayList<>(pages.size());
BlockFactory blockFactory = BlockFactory.getNonBreakingInstance();
for (Page p : pages) {
Block[] blocks = new Block[p.getBlockCount()];
for (int b = 0; b < blocks.length; b++) {
Block orig = p.getBlock(b);
Block.Builder builder = orig.elementType().newBlockBuilder(p.getPositionCount());
Block.Builder builder = orig.elementType().newBlockBuilder(p.getPositionCount(), blockFactory);
builder.copyFrom(orig, 0, p.getPositionCount());
blocks[b] = builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ private int assertEncodedPosition(BasicBlockTests.RandomBlock b, BatchEncoder en
* This produces a block with a single value per position, but it's good enough
* for comparison.
*/
Block.Builder builder = elementType.newBlockBuilder(encoder.valueCount(offset));
Block.Builder builder = elementType.newBlockBuilder(encoder.valueCount(offset), BlockFactory.getNonBreakingInstance());
BytesRef[] toDecode = new BytesRef[encoder.valueCount(offset)];
for (int i = 0; i < toDecode.length; i++) {
BytesRefBuilder dest = new BytesRefBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xcontent.InstantiatingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ParserConstructor;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -39,35 +34,10 @@
import java.util.Objects;
import java.util.Optional;

import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.xpack.esql.action.ResponseValueUtils.valuesToPage;

public class EsqlQueryResponse extends ActionResponse implements ChunkedToXContentObject, Releasable {

private final AbstractRefCounted counted = AbstractRefCounted.of(this::closeInternal);

private static final ParseField ID = new ParseField("id");
private static final ParseField IS_RUNNING = new ParseField("is_running");
private static final InstantiatingObjectParser<EsqlQueryResponse, Void> PARSER;
static {
InstantiatingObjectParser.Builder<EsqlQueryResponse, Void> parser = InstantiatingObjectParser.builder(
"esql/query_response",
true,
EsqlQueryResponse.class
);
parser.declareString(optionalConstructorArg(), ID);
parser.declareField(
optionalConstructorArg(),
p -> p.currentToken() == XContentParser.Token.VALUE_NULL ? false : p.booleanValue(),
IS_RUNNING,
ObjectParser.ValueType.BOOLEAN_OR_NULL
);
parser.declareObjectArray(constructorArg(), (p, c) -> ColumnInfo.fromXContent(p), new ParseField("columns"));
parser.declareField(constructorArg(), (p, c) -> p.list(), new ParseField("values"), ObjectParser.ValueType.OBJECT_ARRAY);
PARSER = parser.build();
}

private final List<ColumnInfo> columns;
private final List<Page> pages;
private final Profile profile;
Expand Down Expand Up @@ -99,27 +69,6 @@ public EsqlQueryResponse(List<ColumnInfo> columns, List<Page> pages, @Nullable P
this(columns, pages, profile, columnar, null, false, isAsync);
}

// Used for XContent reconstruction
@ParserConstructor
public EsqlQueryResponse(@Nullable String asyncExecutionId, Boolean isRunning, List<ColumnInfo> columns, List<List<Object>> values) {
this(
columns,
List.of(valuesToPage(columns, values)),
null,
false,
asyncExecutionId,
isRunning != null,
isAsync(asyncExecutionId, isRunning)
);
}

static boolean isAsync(@Nullable String asyncExecutionId, Boolean isRunning) {
if (asyncExecutionId != null || isRunning != null) {
return true;
}
return false;
}

/**
* Build a reader for the response.
*/
Expand Down Expand Up @@ -229,10 +178,6 @@ public boolean isFragment() {
return false;
}

public static EsqlQueryResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.DoubleBlock;
Expand Down Expand Up @@ -122,10 +123,10 @@ private static Object valueAt(String dataType, Block block, int offset, BytesRef
* Converts a list of values to Pages so that we can parse from xcontent. It's not
* super efficient, but it doesn't really have to be.
*/
static Page valuesToPage(List<ColumnInfo> columns, List<List<Object>> values) {
static Page valuesToPage(BlockFactory blockFactory, List<ColumnInfo> columns, List<List<Object>> values) {
List<String> dataTypes = columns.stream().map(ColumnInfo::type).toList();
List<Block.Builder> results = dataTypes.stream()
.map(c -> PlannerUtils.toElementType(EsqlDataTypes.fromName(c)).newBlockBuilder(values.size()))
.map(c -> PlannerUtils.toElementType(EsqlDataTypes.fromName(c)).newBlockBuilder(values.size(), blockFactory))
.toList();

for (List<Object> row : values) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,13 @@
import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.DriverStatus;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
import org.elasticsearch.xcontent.InstantiatingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ParserConstructor;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;
Expand All @@ -50,6 +55,9 @@
import java.util.List;
import java.util.stream.Stream;

import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.xpack.esql.action.ResponseValueUtils.valuesToPage;
import static org.elasticsearch.xpack.ql.util.SpatialCoordinateTypes.CARTESIAN;
import static org.elasticsearch.xpack.ql.util.SpatialCoordinateTypes.GEO;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -231,7 +239,58 @@ protected Writeable.Reader<EsqlQueryResponse> instanceReader() {

@Override
protected EsqlQueryResponse doParseInstance(XContentParser parser) {
return EsqlQueryResponse.fromXContent(parser);
return ResponseBuilder.fromXContent(parser);
}

public static class ResponseBuilder {
private static final ParseField ID = new ParseField("id");
private static final ParseField IS_RUNNING = new ParseField("is_running");
private static final InstantiatingObjectParser<ResponseBuilder, Void> PARSER;

static {
InstantiatingObjectParser.Builder<ResponseBuilder, Void> parser = InstantiatingObjectParser.builder(
"esql/query_response",
true,
ResponseBuilder.class
);
parser.declareString(optionalConstructorArg(), ID);
parser.declareField(
optionalConstructorArg(),
p -> p.currentToken() == XContentParser.Token.VALUE_NULL ? false : p.booleanValue(),
IS_RUNNING,
ObjectParser.ValueType.BOOLEAN_OR_NULL
);
parser.declareObjectArray(constructorArg(), (p, c) -> ColumnInfo.fromXContent(p), new ParseField("columns"));
parser.declareField(constructorArg(), (p, c) -> p.list(), new ParseField("values"), ObjectParser.ValueType.OBJECT_ARRAY);
PARSER = parser.build();
}

// Used for XContent reconstruction
private final EsqlQueryResponse response;

@ParserConstructor
public ResponseBuilder(@Nullable String asyncExecutionId, Boolean isRunning, List<ColumnInfo> columns, List<List<Object>> values) {
this.response = new EsqlQueryResponse(
columns,
List.of(valuesToPage(BlockFactory.getNonBreakingInstance(), columns, values)),
null,
false,
asyncExecutionId,
isRunning != null,
isAsync(asyncExecutionId, isRunning)
);
}

static boolean isAsync(@Nullable String asyncExecutionId, Boolean isRunning) {
if (asyncExecutionId != null || isRunning != null) {
return true;
}
return false;
}

static EsqlQueryResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null).response;
}
}

public void testChunkResponseSizeColumnar() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,13 +434,14 @@ public final void testSimpleWithNulls() { // TODO replace this with nulls insert
assumeTrue("All test data types must be representable in order to build fields", testCase.allTypesAreRepresentable());
List<Object> simpleData = testCase.getDataValues();
try (EvalOperator.ExpressionEvaluator eval = evaluator(buildFieldExpression(testCase)).get(driverContext())) {
Block[] orig = BlockUtils.fromListRow(BlockFactory.getNonBreakingInstance(), simpleData);
BlockFactory blockFactory = BlockFactory.getNonBreakingInstance();
Block[] orig = BlockUtils.fromListRow(blockFactory, simpleData);
for (int i = 0; i < orig.length; i++) {
List<Object> data = new ArrayList<>();
Block[] blocks = new Block[orig.length];
for (int b = 0; b < blocks.length; b++) {
if (b == i) {
blocks[b] = orig[b].elementType().newBlockBuilder(1).appendNull().build();
blocks[b] = orig[b].elementType().newBlockBuilder(1, blockFactory).appendNull().build();
data.add(null);
} else {
blocks[b] = orig[b];
Expand Down
Loading

0 comments on commit 51b5f47

Please sign in to comment.