Skip to content

Commit

Permalink
Added stricter range type checks and runtime warnings for ENRICH (ela…
Browse files Browse the repository at this point in the history
…stic#115091)

It has been noted that strange or incorrect error messages are returned if the ENRICH command uses incompatible data types, for example a KEYWORD with value 'foo' using in an int_range match: elastic#107357

This error is thrown at runtime and contradicts the ES|QL policy of only throwing errors at planning time, while at runtime we should instead set results to null and add a warning. However, we could make the planner stricter and block potentially mismatching types earlier.

However runtime parsing of KEYWORD fields has been a feature of ES|QL ENRICH since it's inception, in particular we even have tests asserting that KEYWORD fields containing parsable IP data can be joined to an ip_range ENRICH index.

In order to not create a backwards compatibility problem, we have compromised with the following:

* Strict range type checking at the planner time for incompatible range types, unless the incoming index field is KEYWORD
* For KEYWORD fields, allow runtime parsing of the fields, but when parsing fails, set the result to null and add a warning

Added extra tests to verify behaviour of match policies on non-keyword fields. They all behave as keywords (the enrich field is converted to keyword at policy execution time, and the input data is converted to keyword at lookup time).
  • Loading branch information
craigtaverner committed Nov 20, 2024
1 parent 432e343 commit 7d0e9da
Show file tree
Hide file tree
Showing 19 changed files with 657 additions and 42 deletions.
7 changes: 7 additions & 0 deletions docs/changelog/115091.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 115091
summary: Added stricter range type checks and runtime warnings for ENRICH
area: ES|QL
type: bug
issues:
- 107357
- 116799
31 changes: 28 additions & 3 deletions docs/reference/esql/esql-enrich-data.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,33 @@ include::{es-ref-dir}/ingest/apis/enrich/execute-enrich-policy.asciidoc[tag=upda

include::../ingest/enrich.asciidoc[tag=update-enrich-policy]

==== Limitations
==== Enrich Policy Types and Limitations
The {esql} `ENRICH` command supports all three enrich policy types:

`geo_match`::
Matches enrich data to incoming documents based on a <<query-dsl-geo-shape-query,`geo_shape` query>>.
For an example, see <<geo-match-enrich-policy-type>>.

`match`::
Matches enrich data to incoming documents based on a <<query-dsl-term-query,`term` query>>.
For an example, see <<match-enrich-policy-type>>.

`range`::
Matches a number, date, or IP address in incoming documents to a range in the
enrich index based on a <<query-dsl-term-query,`term` query>>. For an example,
see <<range-enrich-policy-type>>.

// tag::limitations[]
The {esql} `ENRICH` command only supports enrich policies of type `match`.
Furthermore, `ENRICH` only supports enriching on a column of type `keyword`.
While all three enrich policy types are supported, there are some limitations to be aware of:

* The `geo_match` enrich policy type only supports the `intersects` spatial relation.
* It is required that the `match_field` in the `ENRICH` command is of the correct type.
For example, if the enrich policy is of type `geo_match`, the `match_field` in the `ENRICH`
command must be of type `geo_point` or `geo_shape`.
Likewise, a `range` enrich policy requires a `match_field` of type `integer`, `long`, `date`, or `ip`,
depending on the type of the range field in the original enrich index.
* However, this constraint is relaxed for `range` policies when the `match_field` is of type `KEYWORD`.
In this case the field values will be parsed during query execution, row by row.
If any value fails to parse, the output values for that row will be set to `null`,
an appropriate warning will be produced and the query will continue to execute.
// end::limitations[]
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INDEX_STATS_ADDITIONAL_FIELDS = def(8_793_00_0);
public static final TransportVersion INDEX_STATS_ADDITIONAL_FIELDS_REVERT = def(8_794_00_0);
public static final TransportVersion FAST_REFRESH_RCO_2 = def(8_795_00_0);
public static final TransportVersion ESQL_ENRICH_RUNTIME_WARNINGS = def(8_796_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -208,5 +208,6 @@ tasks.named("yamlRestTestV7CompatTransform").configure({ task ->
task.skipTest("esql/80_text/reverse text", "The output type changed from TEXT to KEYWORD.")
task.skipTest("esql/80_text/values function", "The output type changed from TEXT to KEYWORD.")
task.skipTest("privileges/11_builtin/Test get builtin privileges" ,"unnecessary to test compatibility")
task.skipTest("esql/61_enrich_ip/Invalid IP strings", "We switched from exceptions to null+warnings for ENRICH runtime errors")
})

Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,13 @@ public DataType noText() {
return isString(this) ? KEYWORD : this;
}

public boolean isDate() {
return switch (this) {
case DATETIME, DATE_NANOS -> true;
default -> false;
};
}

/**
* Named parameters with default values. It's just easier to do this with
* a builder in java....
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.compute.operator.Warnings;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
Expand All @@ -37,17 +38,25 @@ public final class EnrichQuerySourceOperator extends SourceOperator {
private int queryPosition = -1;
private final IndexReader indexReader;
private final IndexSearcher searcher;
private final Warnings warnings;
private final int maxPageSize;

// using smaller pages enables quick cancellation and reduces sorting costs
public static final int DEFAULT_MAX_PAGE_SIZE = 256;

public EnrichQuerySourceOperator(BlockFactory blockFactory, int maxPageSize, QueryList queryList, IndexReader indexReader) {
public EnrichQuerySourceOperator(
BlockFactory blockFactory,
int maxPageSize,
QueryList queryList,
IndexReader indexReader,
Warnings warnings
) {
this.blockFactory = blockFactory;
this.maxPageSize = maxPageSize;
this.queryList = queryList;
this.indexReader = indexReader;
this.searcher = new IndexSearcher(indexReader);
this.warnings = warnings;
}

@Override
Expand All @@ -72,12 +81,18 @@ public Page getOutput() {
}
int totalMatches = 0;
do {
Query query = nextQuery();
if (query == null) {
assert isFinished();
break;
Query query;
try {
query = nextQuery();
if (query == null) {
assert isFinished();
break;
}
query = searcher.rewrite(new ConstantScoreQuery(query));
} catch (Exception e) {
warnings.registerException(e);
continue;
}
query = searcher.rewrite(new ConstantScoreQuery(query));
final var weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
if (weight == null) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Warnings;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
Expand Down Expand Up @@ -120,7 +122,8 @@ public void testQueries() throws Exception {
// 3 -> [] -> []
// 4 -> [a1] -> [3]
// 5 -> [] -> []
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, 128, queryList, reader);
var warnings = Warnings.createWarnings(DriverContext.WarningsMode.IGNORE, 0, 0, "test enrich");
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, 128, queryList, reader, warnings);
Page p0 = queryOperator.getOutput();
assertNotNull(p0);
assertThat(p0.getPositionCount(), equalTo(6));
Expand Down Expand Up @@ -187,7 +190,8 @@ public void testRandomMatchQueries() throws Exception {
MappedFieldType uidField = new KeywordFieldMapper.KeywordFieldType("uid");
var queryList = QueryList.rawTermQueryList(uidField, mock(SearchExecutionContext.class), inputTerms);
int maxPageSize = between(1, 256);
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, maxPageSize, queryList, reader);
var warnings = Warnings.createWarnings(DriverContext.WarningsMode.IGNORE, 0, 0, "test enrich");
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, maxPageSize, queryList, reader, warnings);
Map<Integer, Set<Integer>> actualPositions = new HashMap<>();
while (queryOperator.isFinished() == false) {
Page page = queryOperator.getOutput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ public void testLookupIndex() throws IOException {
DataType.KEYWORD,
"lookup",
"data",
List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG)))
List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG))),
Source.EMPTY
);
DriverContext driverContext = driverContext();
try (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,11 @@ public enum Cap {
*/
RANGEQUERY_FOR_DATETIME,

/**
* Enforce strict type checking on ENRICH range types, and warnings for KEYWORD parsing at runtime. Done in #115091.
*/
ENRICH_STRICT_RANGE_TYPES,

/**
* Fix for non-unique attribute names in ROW and logical plans.
* https://github.com/elastic/elasticsearch/issues/110541
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.OutputOperator;
import org.elasticsearch.compute.operator.Warnings;
import org.elasticsearch.compute.operator.lookup.EnrichQuerySourceOperator;
import org.elasticsearch.compute.operator.lookup.MergePositionsOperator;
import org.elasticsearch.compute.operator.lookup.QueryList;
Expand Down Expand Up @@ -78,6 +79,7 @@
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
Expand Down Expand Up @@ -166,6 +168,10 @@ abstract class AbstractLookupService<R extends AbstractLookupService.Request, T
);
}

public ThreadContext getThreadContext() {
return transportService.getThreadPool().getThreadContext();
}

/**
* Convert a request as sent to {@link #lookupAsync} into a transport request after
* preflight checks have been performed.
Expand Down Expand Up @@ -330,11 +336,18 @@ private void doLookup(T request, CancellableTask task, ActionListener<Page> list
releasables.add(mergePositionsOperator);
SearchExecutionContext searchExecutionContext = searchContext.getSearchExecutionContext();
QueryList queryList = queryList(request, searchExecutionContext, inputBlock, request.inputDataType);
var warnings = Warnings.createWarnings(
DriverContext.WarningsMode.COLLECT,
request.source.source().getLineNumber(),
request.source.source().getColumnNumber(),
request.source.text()
);
var queryOperator = new EnrichQuerySourceOperator(
driverContext.blockFactory(),
EnrichQuerySourceOperator.DEFAULT_MAX_PAGE_SIZE,
queryList,
searchExecutionContext.getIndexReader()
searchExecutionContext.getIndexReader(),
warnings
);
releasables.add(queryOperator);
var extractFieldsOperator = extractFieldsOperator(searchContext, driverContext, request.extractFields);
Expand Down Expand Up @@ -450,13 +463,22 @@ abstract static class Request {
final DataType inputDataType;
final Page inputPage;
final List<NamedExpression> extractFields;
final Source source;

Request(String sessionId, String index, DataType inputDataType, Page inputPage, List<NamedExpression> extractFields) {
Request(
String sessionId,
String index,
DataType inputDataType,
Page inputPage,
List<NamedExpression> extractFields,
Source source
) {
this.sessionId = sessionId;
this.index = index;
this.inputDataType = inputDataType;
this.inputPage = inputPage;
this.extractFields = extractFields;
this.source = source;
}
}

Expand All @@ -470,6 +492,7 @@ abstract static class TransportRequest extends org.elasticsearch.transport.Trans
final DataType inputDataType;
final Page inputPage;
final List<NamedExpression> extractFields;
final Source source;
// TODO: Remove this workaround once we have Block RefCount
final Page toRelease;
final RefCounted refs = AbstractRefCounted.of(this::releasePage);
Expand All @@ -480,14 +503,16 @@ abstract static class TransportRequest extends org.elasticsearch.transport.Trans
DataType inputDataType,
Page inputPage,
Page toRelease,
List<NamedExpression> extractFields
List<NamedExpression> extractFields,
Source source
) {
this.sessionId = sessionId;
this.shardId = shardId;
this.inputDataType = inputDataType;
this.inputPage = inputPage;
this.toRelease = toRelease;
this.extractFields = extractFields;
this.source = source;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
import org.elasticsearch.compute.operator.AsyncOperator;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.ResponseHeadersCollector;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;

import java.io.IOException;
Expand All @@ -35,6 +37,8 @@ public final class EnrichLookupOperator extends AsyncOperator {
private final String matchType;
private final String matchField;
private final List<NamedExpression> enrichFields;
private final ResponseHeadersCollector responseHeadersCollector;
private final Source source;
private long totalTerms = 0L;

public record Factory(
Expand All @@ -47,7 +51,8 @@ public record Factory(
String enrichIndex,
String matchType,
String matchField,
List<NamedExpression> enrichFields
List<NamedExpression> enrichFields,
Source source
) implements OperatorFactory {
@Override
public String describe() {
Expand Down Expand Up @@ -75,7 +80,8 @@ public Operator get(DriverContext driverContext) {
enrichIndex,
matchType,
matchField,
enrichFields
enrichFields,
source
);
}
}
Expand All @@ -91,7 +97,8 @@ public EnrichLookupOperator(
String enrichIndex,
String matchType,
String matchField,
List<NamedExpression> enrichFields
List<NamedExpression> enrichFields,
Source source
) {
super(driverContext, maxOutstandingRequests);
this.sessionId = sessionId;
Expand All @@ -103,6 +110,8 @@ public EnrichLookupOperator(
this.matchType = matchType;
this.matchField = matchField;
this.enrichFields = enrichFields;
this.source = source;
this.responseHeadersCollector = new ResponseHeadersCollector(enrichLookupService.getThreadContext());
}

@Override
Expand All @@ -116,9 +125,14 @@ protected void performAsync(Page inputPage, ActionListener<Page> listener) {
matchType,
matchField,
new Page(inputBlock),
enrichFields
enrichFields,
source
);
enrichLookupService.lookupAsync(
request,
parentTask,
ActionListener.runBefore(listener.map(inputPage::appendPage), responseHeadersCollector::collect)
);
enrichLookupService.lookupAsync(request, parentTask, listener.map(inputPage::appendPage));
}

@Override
Expand All @@ -140,6 +154,7 @@ public String toString() {
protected void doClose() {
// TODO: Maybe create a sub-task as the parent task of all the lookup tasks
// then cancel it when this operator terminates early (e.g., have enough result).
responseHeadersCollector.finish();
}

@Override
Expand Down
Loading

0 comments on commit 7d0e9da

Please sign in to comment.