Skip to content

Commit

Permalink
[ML] Add indices_options to datafeed config and update (elastic#52793)
Browse files Browse the repository at this point in the history
This adds a new configurable field called `indices_options`. This allows users to create or update the indices_options used when a datafeed reads from an index. 

This is necessary for the following use cases:
 - Reading from frozen indices
 - Allowing certain indices in multiple index patterns to not exist yet

These index options are available on datafeed creation and update. Users may specify them as URL parameters or within the configuration object.
 
closes elastic#48056
  • Loading branch information
benwtrent authored Feb 27, 2020
1 parent e39eade commit d7a6333
Show file tree
Hide file tree
Showing 47 changed files with 742 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.client.ml.datafeed;

import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.ml.job.config.Job;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesArray;
Expand Down Expand Up @@ -63,6 +64,7 @@ public class DatafeedConfig implements ToXContentObject {
public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config");
public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config");
public static final ParseField MAX_EMPTY_SEARCHES = new ParseField("max_empty_searches");
public static final ParseField INDICES_OPTIONS = new ParseField("indices_options");

public static final ConstructingObjectParser<Builder, Void> PARSER = new ConstructingObjectParser<>(
"datafeed_config", true, a -> new Builder((String)a[0], (String)a[1]));
Expand Down Expand Up @@ -90,6 +92,9 @@ public class DatafeedConfig implements ToXContentObject {
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG);
PARSER.declareObject(Builder::setDelayedDataCheckConfig, DelayedDataCheckConfig.PARSER, DELAYED_DATA_CHECK_CONFIG);
PARSER.declareInt(Builder::setMaxEmptySearches, MAX_EMPTY_SEARCHES);
PARSER.declareObject(Builder::setIndicesOptions,
(p, c) -> IndicesOptions.fromMap(p.map(), new IndicesOptions(IndicesOptions.Option.NONE, IndicesOptions.WildcardStates.NONE)),
INDICES_OPTIONS);
}

private static BytesReference parseBytes(XContentParser parser) throws IOException {
Expand All @@ -110,11 +115,12 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti
private final ChunkingConfig chunkingConfig;
private final DelayedDataCheckConfig delayedDataCheckConfig;
private final Integer maxEmptySearches;
private final IndicesOptions indicesOptions;

private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, BytesReference query,
BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields, Integer scrollSize,
ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig,
Integer maxEmptySearches) {
Integer maxEmptySearches, IndicesOptions indicesOptions) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
Expand All @@ -127,6 +133,7 @@ private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue
this.chunkingConfig = chunkingConfig;
this.delayedDataCheckConfig = delayedDataCheckConfig;
this.maxEmptySearches = maxEmptySearches;
this.indicesOptions = indicesOptions;
}

public String getId() {
Expand Down Expand Up @@ -177,6 +184,10 @@ public Integer getMaxEmptySearches() {
return maxEmptySearches;
}

public IndicesOptions getIndicesOptions() {
return indicesOptions;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down Expand Up @@ -216,6 +227,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (maxEmptySearches != null) {
builder.field(MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches);
}
if (indicesOptions != null) {
builder.startObject(INDICES_OPTIONS.getPreferredName());
indicesOptions.toXContent(builder, params);
builder.endObject();
}

builder.endObject();
return builder;
Expand Down Expand Up @@ -257,7 +273,8 @@ public boolean equals(Object other) {
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig)
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig)
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches);
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches)
&& Objects.equals(this.indicesOptions, that.indicesOptions);
}

/**
Expand All @@ -268,7 +285,7 @@ public boolean equals(Object other) {
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields,
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions);
}

public static Builder builder(String id, String jobId) {
Expand All @@ -289,6 +306,7 @@ public static class Builder {
private ChunkingConfig chunkingConfig;
private DelayedDataCheckConfig delayedDataCheckConfig;
private Integer maxEmptySearches;
private IndicesOptions indicesOptions;

public Builder(String id, String jobId) {
this.id = Objects.requireNonNull(id, ID.getPreferredName());
Expand All @@ -308,6 +326,7 @@ public Builder(DatafeedConfig config) {
this.chunkingConfig = config.chunkingConfig;
this.delayedDataCheckConfig = config.getDelayedDataCheckConfig();
this.maxEmptySearches = config.getMaxEmptySearches();
this.indicesOptions = config.indicesOptions;
}

public Builder setIndices(List<String> indices) {
Expand Down Expand Up @@ -395,9 +414,14 @@ public Builder setMaxEmptySearches(int maxEmptySearches) {
return this;
}

public Builder setIndicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}

public DatafeedConfig build() {
return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize,
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions);
}

private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.client.ml.datafeed;

import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -78,6 +79,9 @@ public class DatafeedUpdate implements ToXContentObject {
DelayedDataCheckConfig.PARSER,
DatafeedConfig.DELAYED_DATA_CHECK_CONFIG);
PARSER.declareInt(Builder::setMaxEmptySearches, DatafeedConfig.MAX_EMPTY_SEARCHES);
PARSER.declareObject(Builder::setIndicesOptions,
(p, c) -> IndicesOptions.fromMap(p.map(), new IndicesOptions(IndicesOptions.Option.NONE, IndicesOptions.WildcardStates.NONE)),
DatafeedConfig.INDICES_OPTIONS);
}

private static BytesReference parseBytes(XContentParser parser) throws IOException {
Expand All @@ -97,11 +101,12 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti
private final ChunkingConfig chunkingConfig;
private final DelayedDataCheckConfig delayedDataCheckConfig;
private final Integer maxEmptySearches;
private final IndicesOptions indicesOptions;

private DatafeedUpdate(String id, TimeValue queryDelay, TimeValue frequency, List<String> indices, BytesReference query,
BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields, Integer scrollSize,
ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig,
Integer maxEmptySearches) {
Integer maxEmptySearches, IndicesOptions indicesOptions) {
this.id = id;
this.queryDelay = queryDelay;
this.frequency = frequency;
Expand All @@ -113,6 +118,7 @@ private DatafeedUpdate(String id, TimeValue queryDelay, TimeValue frequency, Lis
this.chunkingConfig = chunkingConfig;
this.delayedDataCheckConfig = delayedDataCheckConfig;
this.maxEmptySearches = maxEmptySearches;
this.indicesOptions = indicesOptions;
}

/**
Expand Down Expand Up @@ -152,6 +158,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize);
addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig);
addOptionalField(builder, DatafeedConfig.MAX_EMPTY_SEARCHES, maxEmptySearches);
if (indicesOptions != null) {
builder.startObject(DatafeedConfig.INDICES_OPTIONS.getPreferredName());
indicesOptions.toXContent(builder, params);
builder.endObject();
}
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -202,6 +213,10 @@ public Integer getMaxEmptySearches() {
return maxEmptySearches;
}

public IndicesOptions getIndicesOptions() {
return indicesOptions;
}

private static Map<String, Object> asMap(BytesReference bytesReference) {
return bytesReference == null ? null : XContentHelper.convertToMap(bytesReference, true, XContentType.JSON).v2();
}
Expand Down Expand Up @@ -237,7 +252,8 @@ public boolean equals(Object other) {
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig)
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig)
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches);
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches)
&& Objects.equals(this.indicesOptions, that.indicesOptions);
}

/**
Expand All @@ -248,7 +264,7 @@ public boolean equals(Object other) {
@Override
public int hashCode() {
return Objects.hash(id, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields,
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions);
}

public static Builder builder(String id) {
Expand All @@ -268,6 +284,7 @@ public static class Builder {
private ChunkingConfig chunkingConfig;
private DelayedDataCheckConfig delayedDataCheckConfig;
private Integer maxEmptySearches;
private IndicesOptions indicesOptions;

public Builder(String id) {
this.id = Objects.requireNonNull(id, DatafeedConfig.ID.getPreferredName());
Expand All @@ -285,6 +302,7 @@ public Builder(DatafeedUpdate config) {
this.chunkingConfig = config.chunkingConfig;
this.delayedDataCheckConfig = config.delayedDataCheckConfig;
this.maxEmptySearches = config.maxEmptySearches;
this.indicesOptions = config.indicesOptions;
}

public Builder setIndices(List<String> indices) {
Expand Down Expand Up @@ -363,9 +381,14 @@ public Builder setMaxEmptySearches(int maxEmptySearches) {
return this;
}

public Builder setIndicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}

public DatafeedUpdate build() {
return new DatafeedUpdate(id, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize,
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
chunkingConfig, delayedDataCheckConfig, maxEmptySearches, indicesOptions);
}

private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.client.ml.datafeed;

import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -109,6 +110,13 @@ public static DatafeedConfig.Builder createRandomBuilder() {
if (randomBoolean()) {
builder.setMaxEmptySearches(randomIntBetween(10, 100));
}
if (randomBoolean()) {
builder.setIndicesOptions(IndicesOptions.fromOptions(randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean()));
}
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.client.ml.datafeed;

import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -83,6 +84,13 @@ public static DatafeedUpdate createRandom() {
if (randomBoolean()) {
builder.setMaxEmptySearches(randomIntBetween(10, 100));
}
if (randomBoolean()) {
builder.setIndicesOptions(IndicesOptions.fromOptions(randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean(),
randomBoolean()));
}
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ include::{docdir}/ml/ml-shared.asciidoc[tag=script-fields]
(Optional, unsigned integer)
include::{docdir}/ml/ml-shared.asciidoc[tag=scroll-size]

`indices_options`::
(Optional, object)
include::{docdir}/ml/ml-shared.asciidoc[tag=indices-options]


[[ml-put-datafeed-example]]
==== {api-examples-title}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ include::{docdir}/ml/ml-shared.asciidoc[tag=script-fields]
(Optional, unsigned integer)
include::{docdir}/ml/ml-shared.asciidoc[tag=scroll-size]

`indices_options`::
(Optional, object)
include::{docdir}/ml/ml-shared.asciidoc[tag=indices-options]

[[ml-update-datafeed-example]]
==== {api-examples-title}

Expand Down
13 changes: 13 additions & 0 deletions docs/reference/ml/ml-shared.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,19 @@ not be set to `false` on any {ml} nodes.
--
end::indices[]

tag::indices-options[]
Object specifying index expansion options used during search.
For example:
```
{
"expand_wildcards": ["all"],
"ignore_unavailable": true,
"allow_no_indices": "false",
"ignore_throttled": true
}
```
end::indices-options[]

tag::influencers[]
A comma separated list of influencer field names. Typically these can be the by,
over, or partition fields that are used in the detector configuration. You might
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
Expand All @@ -32,8 +33,11 @@ private PutDatafeedAction() {

public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {

public static Request parseRequest(String datafeedId, XContentParser parser) {
public static Request parseRequest(String datafeedId, IndicesOptions indicesOptions, XContentParser parser) {
DatafeedConfig.Builder datafeed = DatafeedConfig.STRICT_PARSER.apply(parser, null);
if (datafeed.getIndicesOptions() == null) {
datafeed.setIndicesOptions(indicesOptions);
}
datafeed.setId(datafeedId);
return new Request(datafeed.build());
}
Expand Down
Loading

0 comments on commit d7a6333

Please sign in to comment.