Skip to content

Commit

Permalink
Refactored SearchStats to interface, to reduce subtle bugs in tests
Browse files Browse the repository at this point in the history
The older approach, with test implementations of SearchStats extending the original with an empty list, led to subtle bugs when adding more methods to SearchStats without also adding overriding methods to the various test implementations.
This refactoring forces test implementations to always implement their own versions of all methods. There was, in fact, no test implementation that actually used the underlying class's capabilities, so this was a very easy and natural move.
  • Loading branch information
craigtaverner committed Oct 31, 2024
1 parent 4d1a5a1 commit ad6771f
Show file tree
Hide file tree
Showing 6 changed files with 371 additions and 333 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,7 @@ public static EsRelation relation() {
return new EsRelation(EMPTY, new EsIndex(randomAlphaOfLength(8), emptyMap()), IndexMode.STANDARD, randomBoolean());
}

public static class TestSearchStats extends SearchStats {
public TestSearchStats() {
super(emptyList());
}
public static class TestSearchStats implements SearchStats {

@Override
public long count() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.stats.SearchStats;
import org.elasticsearch.xpack.esql.stats.SearchStatsFromContexts;

import java.util.ArrayList;
import java.util.LinkedHashSet;
Expand Down Expand Up @@ -138,7 +139,7 @@ public static String[] planOriginalIndices(PhysicalPlan plan) {
}

public static PhysicalPlan localPlan(List<SearchExecutionContext> searchContexts, Configuration configuration, PhysicalPlan plan) {
return localPlan(configuration, plan, new SearchStats(searchContexts));
return localPlan(configuration, plan, new SearchStatsFromContexts(searchContexts));
}

public static PhysicalPlan localPlan(Configuration configuration, PhysicalPlan plan, SearchStats searchStats) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,336 +7,31 @@

package org.elasticsearch.xpack.esql.stats;

import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.index.mapper.ConstantFieldType;
import org.elasticsearch.index.mapper.DocCountFieldMapper.DocCountFieldType;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper.NumberFieldType;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.TextFieldMapper;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.type.DataType;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper.TimestampFieldType;
import static org.elasticsearch.index.mapper.DateFieldMapper.DateFieldType;
import static org.elasticsearch.index.mapper.KeywordFieldMapper.KeywordFieldType;

public class SearchStats {

private final List<SearchExecutionContext> contexts;

/**
 * Mapping-derived flags for one field, combined over all search contexts by
 * {@code determineFieldStatsConfig}. When {@code exists} is false, every other flag is false too.
 */
private record FieldStatsConfig(boolean exists, boolean hasIdenticalDelegate, boolean indexed, boolean hasDocValues) {}

/**
 * Mutable holder for the statistics of a single field.
 * <p>
 * {@code config} is set when the entry is created; the remaining members start out
 * {@code null} and are filled in by the accessor that first needs them.
 */
private static class FieldStat {
    private FieldStatsConfig config;
    private Long count;
    private Object min;
    private Object max;
    private Boolean singleValue;
}

// maximum number of per-field entries kept in the cache below
private static final int CACHE_SIZE = 32;

// Simple, non-thread-safe cache that avoids unnecessary I/O (which, while fast, is still I/O).
// The map is created in access order and evicts its eldest entry once it holds more than CACHE_SIZE fields.
private final Map<String, FieldStat> cache = new LinkedHashMap<>(CACHE_SIZE, 0.75f, true) {
    @Override
    protected boolean removeEldestEntry(Map.Entry<String, FieldStat> eldest) {
        return size() > CACHE_SIZE;
    }
};

/**
 * Creates statistics backed by the given search contexts.
 *
 * @param contexts the contexts whose mappings and leaf readers are consulted; the list is stored as-is, not copied
 */
public SearchStats(List<SearchExecutionContext> contexts) {
    this.contexts = contexts;
}

/**
 * Adds up {@code numDocs()} of every leaf reader in every search context.
 *
 * @return the total number of documents, or -1 if the traversal did not complete
 *         (it runs with deletions not accepted, so a reader with deletions aborts it)
 */
public long count() {
    final long[] docs = new long[1];
    boolean finished = doWithContexts(reader -> {
        docs[0] += reader.numDocs();
        return true;
    }, false);
    if (finished) {
        return docs[0];
    }
    return -1;
}

/**
 * Returns the number of entries recorded for {@code field} over all search contexts.
 * The result is computed once per cached field entry and then reused.
 *
 * @param field the field name
 * @return the number of entries, or -1 when it cannot be determined: either the traversal was
 *         aborted (it runs with deletions not accepted) or {@code countEntries} reported that
 *         no shortcut is possible for this field
 */
public long count(String field) {
    var stat = cache.computeIfAbsent(field, this::makeFieldStat);
    if (stat.count == null) {
        var count = new long[] { 0 };
        boolean completed = doWithContexts(r -> {
            long entries = countEntries(r, field);
            if (entries < 0) {
                // countEntries uses -1 as a "no shortcut possible" sentinel; adding it to the running
                // total would silently corrupt the sum, so stop here and report the count as unknown
                return false;
            }
            count[0] += entries;
            return true;
        }, false);
        stat.count = completed ? count[0] : -1;
    }
    return stat.count;
}

/**
 * Adds up the document frequency of the term {@code field:value} over every leaf reader.
 *
 * @param field the field name
 * @param value the term bytes to look up
 * @return the summed document frequency, or -1 if the traversal did not complete
 *         (it runs with deletions not accepted)
 */
public long count(String field, BytesRef value) {
    final Term term = new Term(field, value);
    final long[] matches = new long[1];
    boolean finished = doWithContexts(reader -> {
        matches[0] += reader.docFreq(term);
        return true;
    }, false);
    if (finished) {
        return matches[0];
    }
    return -1;
}

/**
 * Reports whether {@code field} is mapped in at least one search context.
 *
 * @param field the field name
 * @return the cached {@code exists} flag for the field
 */
public boolean exists(String field) {
    return cache.computeIfAbsent(field, this::makeFieldStat).config.exists;
}

/**
 * Builds a fresh cache entry for {@code field} with only its mapping configuration filled in.
 */
private FieldStat makeFieldStat(String field) {
    FieldStat created = new FieldStat();
    created.config = determineFieldStatsConfig(field);
    return created;
}

/**
 * Combines the mapping of {@code field} over all search contexts into one configuration.
 * <p>
 * The field exists if any context maps it. The other flags hold only if every context maps
 * the field and agrees on that flag; a single context without the field clears all three.
 * The identical-delegate flag additionally requires the field to be a text field in every context.
 */
private FieldStatsConfig determineFieldStatsConfig(String field) {
    boolean mappedSomewhere = false;
    boolean identicalDelegate = true;
    boolean indexedEverywhere = true;
    boolean docValuesEverywhere = true;
    // Deleted documents are irrelevant here: only the mappings are consulted,
    // and a missing field stays missing regardless of deletions.
    for (SearchExecutionContext context : contexts) {
        if (context.isFieldMapped(field) == false) {
            identicalDelegate = false;
            indexedEverywhere = false;
            docValuesEverywhere = false;
            continue;
        }
        mappedSomewhere = true;
        MappedFieldType type = context.getFieldType(field);
        indexedEverywhere = indexedEverywhere && type.isIndexed();
        docValuesEverywhere = docValuesEverywhere && type.hasDocValues();
        identicalDelegate = identicalDelegate
            && type instanceof TextFieldMapper.TextFieldType text
            && text.canUseSyntheticSourceDelegateForQuerying();
    }
    // if the field does not exist on any context, no other setting is valid
    return mappedSomewhere
        ? new FieldStatsConfig(true, identicalDelegate, indexedEverywhere, docValuesEverywhere)
        : new FieldStatsConfig(false, false, false, false);
}

/**
 * Returns the cached {@code hasIdenticalDelegate} flag of {@code field}, creating the cache entry if needed.
 *
 * @param field the field name
 */
public boolean hasIdenticalDelegate(String field) {
    return cache.computeIfAbsent(field, this::makeFieldStat).config.hasIdenticalDelegate;
}

/**
 * Returns the cached {@code indexed} flag of {@code field}, creating the cache entry if needed.
 *
 * @param field the field name
 */
public boolean isIndexed(String field) {
    return cache.computeIfAbsent(field, this::makeFieldStat).config.indexed;
}

/**
 * Returns the cached {@code hasDocValues} flag of {@code field}, creating the cache entry if needed.
 *
 * @param field the field name
 */
public boolean hasDocValues(String field) {
    return cache.computeIfAbsent(field, this::makeFieldStat).config.hasDocValues;
}

/**
 * Looks up the minimum packed point value of {@code field} and stores it on the field's cache entry.
 * <p>
 * The method currently always returns {@code null}: the value is recorded but not yet exposed.
 * {@code dataType} is not used.
 *
 * @throws EsqlIllegalArgumentException if more than one leaf reader yields a minimum,
 *         because merging two packed values is not implemented
 */
public byte[] min(String field, DataType dataType) {
    var stat = cache.computeIfAbsent(field, this::makeFieldStat);
    if (stat.min != null) {
        return null;
    }
    final byte[][] smallest = new byte[1][];
    doWithContexts(reader -> {
        byte[] candidate = PointValues.getMinPackedValue(reader, field);
        if (candidate == null) {
            return true;
        }
        // TODO: how to compare with the previous min
        if (smallest[0] != null) {
            throw new EsqlIllegalArgumentException("Don't know how to compare with previous min");
        }
        smallest[0] = candidate;
        return true;
    }, true);
    stat.min = smallest[0];
    return null;
}

/**
 * Looks up the maximum packed point value of {@code field} and stores it on the field's cache entry.
 * <p>
 * The method currently always returns {@code null}: the value is recorded but not yet exposed.
 * {@code dataType} is not used.
 *
 * @throws EsqlIllegalArgumentException if more than one leaf reader yields a maximum,
 *         because merging two packed values is not implemented
 */
public byte[] max(String field, DataType dataType) {
    var stat = cache.computeIfAbsent(field, this::makeFieldStat);
    if (stat.max != null) {
        return null;
    }
    final byte[][] largest = new byte[1][];
    doWithContexts(reader -> {
        byte[] candidate = PointValues.getMaxPackedValue(reader, field);
        if (candidate == null) {
            return true;
        }
        // TODO: how to compare with the previous max
        if (largest[0] != null) {
            throw new EsqlIllegalArgumentException("Don't know how to compare with previous max");
        }
        largest[0] = candidate;
        return true;
    }, true);
    stat.max = largest[0];
    return null;
}

/**
 * Reports whether {@code field} holds at most one value per document, caching the answer.
 * <p>
 * A field that does not exist counts as single-valued. Otherwise the field type of the first
 * context that maps the field is used to test every leaf reader (deletions accepted); the
 * scan stops at the first reader that is not single-valued.
 *
 * @param field the field name
 */
public boolean isSingleValue(String field) {
    var stat = cache.computeIfAbsent(field, this::makeFieldStat);
    if (stat.singleValue != null) {
        return stat.singleValue;
    }
    if (exists(field) == false) {
        // there's no such field so no need to worry about multi-value fields
        stat.singleValue = true;
        return stat.singleValue;
    }
    // fields are MV per default
    final boolean[] single = { false };
    for (SearchExecutionContext context : contexts) {
        if (context.isFieldMapped(field) == false) {
            continue;
        }
        MappedFieldType mappedType = context.getFieldType(field);
        if (mappedType == null) {
            continue;
        }
        single[0] = true;
        doWithContexts(reader -> {
            single[0] &= detectSingleValue(reader, mappedType, field);
            return single[0];
        }, true);
        break;
    }
    stat.singleValue = single[0];
    return stat.singleValue;
}

private boolean detectSingleValue(IndexReader r, MappedFieldType fieldType, String name) throws IOException {
// types that are always single value (and are accessible through instanceof)
if (fieldType instanceof ConstantFieldType || fieldType instanceof DocCountFieldType || fieldType instanceof TimestampFieldType) {
return true;
}

var typeName = fieldType.typeName();

// non-visible fields, check their names
boolean found = switch (typeName) {
case IdFieldMapper.NAME, SeqNoFieldMapper.NAME -> true;
default -> false;
};
/**
* Interface for determining information about fields in the index.
* This is used by the optimizer to make decisions about how to optimize queries.
*/
public interface SearchStats {
long count();

if (found) {
return true;
}
long count(String field);

// check against doc size
DocCountTester tester = null;
if (fieldType instanceof DateFieldType || fieldType instanceof NumberFieldType) {
tester = lr -> {
PointValues values = lr.getPointValues(name);
return values == null || values.size() == values.getDocCount();
};
} else if (fieldType instanceof KeywordFieldType) {
tester = lr -> {
Terms terms = lr.terms(name);
return terms == null || terms.size() == terms.getDocCount();
};
}
long count(String field, BytesRef value);

if (tester != null) {
// check each leaf
for (LeafReaderContext context : r.leaves()) {
if (tester.test(context.reader()) == false) {
return false;
}
}
// field is missing or single value
return true;
}
boolean exists(String field);

// unsupported type - default to MV
return false;
}
byte[] min(String field, DataType dataType);

/**
 * Per-leaf check used by {@code detectSingleValue}: a result of {@code false} for any leaf
 * makes the field count as not single-valued.
 */
private interface DocCountTester {
    // declared to throw IOException so implementations can read from the leaf directly
    Boolean test(LeafReader leafReader) throws IOException;
}
byte[] max(String field, DataType dataType);

//
// @see org.elasticsearch.search.query.QueryPhaseCollectorManager#shortcutTotalHitCount(IndexReader, Query)
//
private static long countEntries(IndexReader indexReader, String field) {
long count = 0;
try {
for (LeafReaderContext context : indexReader.leaves()) {
LeafReader reader = context.reader();
FieldInfos fieldInfos = reader.getFieldInfos();
FieldInfo fieldInfo = fieldInfos.fieldInfo(field);
boolean isSingleValue(String field);

if (fieldInfo != null) {
if (fieldInfo.getDocValuesType() == DocValuesType.NONE) {
// no shortcut possible: it's a text field, empty values are counted as no value.
return -1;
}
if (fieldInfo.getPointIndexDimensionCount() > 0) {
PointValues points = reader.getPointValues(field);
if (points != null) {
count += points.size();
}
} else if (fieldInfo.getIndexOptions() != IndexOptions.NONE) {
Terms terms = reader.terms(field);
if (terms != null) {
count += terms.getSumTotalTermFreq();
}
} else {
return -1; // no shortcut possible for fields that are not indexed
}
}
}
} catch (IOException ex) {
throw new EsqlIllegalArgumentException("Cannot access data storage", ex);
}
return count;
}
boolean isIndexed(String field);

/**
 * Callback invoked by {@code doWithContexts} for each leaf reader it visits.
 */
private interface IndexReaderConsumer {
    /**
     * Returns true if the consumer should keep on going, false otherwise.
     */
    boolean consume(IndexReader reader) throws IOException;
}
boolean hasDocValues(String field);

/**
 * Feeds every leaf reader of every search context to {@code consumer}, in order.
 *
 * @param consumer         callback run per leaf reader; returning false stops the traversal
 * @param acceptsDeletions when false, the traversal stops at the first reader that has deletions
 * @return true if all readers were consumed, false if the traversal stopped early
 * @throws EsqlIllegalArgumentException wrapping any {@link IOException} raised while reading
 */
private boolean doWithContexts(IndexReaderConsumer consumer, boolean acceptsDeletions) {
    try {
        for (SearchExecutionContext context : contexts) {
            for (LeafReaderContext leaf : context.searcher().getLeafContexts()) {
                var leafReader = leaf.reader();
                boolean blockedByDeletions = acceptsDeletions == false && leafReader.hasDeletions();
                // stop on deletions the caller cannot tolerate, or when the consumer asks to stop
                if (blockedByDeletions || consumer.consume(leafReader) == false) {
                    return false;
                }
            }
        }
    } catch (IOException ex) {
        throw new EsqlIllegalArgumentException("Cannot access data storage", ex);
    }
    return true;
}
boolean hasIdenticalDelegate(String field);
}
Loading

0 comments on commit ad6771f

Please sign in to comment.