Skip to content

Commit

Permalink
Add shard_failures to the response
Browse files Browse the repository at this point in the history
  • Loading branch information
luigidellaquila committed Nov 7, 2024
1 parent 4aa4a97 commit 95f9a1b
Show file tree
Hide file tree
Showing 14 changed files with 214 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class PartialSearchResultsIT extends AbstractEqlIntegTestCase {

Expand Down Expand Up @@ -96,6 +97,7 @@ public void testPartialResults() throws Exception {
for (int i = 0; i < 10; i++) {
assertThat(response.hits().events().get(i).toString(), containsString("\"value\" : " + i));
}
assertThat(response.shardFailures().length, is(0));

// sequence query on both shards
request = new EqlSearchRequest().indices("test-*")
Expand All @@ -106,6 +108,7 @@ public void testPartialResults() throws Exception {
EqlSearchResponse.Sequence sequence = response.hits().sequences().get(0);
assertThat(sequence.events().get(0).toString(), containsString("\"value\" : 1"));
assertThat(sequence.events().get(1).toString(), containsString("\"value\" : 2"));
assertThat(response.shardFailures().length, is(0));

// sequence query on the available shard only
request = new EqlSearchRequest().indices("test-*")
Expand All @@ -116,6 +119,7 @@ public void testPartialResults() throws Exception {
sequence = response.hits().sequences().get(0);
assertThat(sequence.events().get(0).toString(), containsString("\"value\" : 1"));
assertThat(sequence.events().get(1).toString(), containsString("\"value\" : 3"));
assertThat(response.shardFailures().length, is(0));

// sequence query on the unavailable shard only
request = new EqlSearchRequest().indices("test-*")
Expand All @@ -126,13 +130,15 @@ public void testPartialResults() throws Exception {
sequence = response.hits().sequences().get(0);
assertThat(sequence.events().get(0).toString(), containsString("\"value\" : 0"));
assertThat(sequence.events().get(1).toString(), containsString("\"value\" : 2"));
assertThat(response.shardFailures().length, is(0));

// sequence query with missing event on unavailable shard
request = new EqlSearchRequest().indices("test-*")
.query("sequence with maxspan=10s [process where value == 1] ![process where value == 2] [process where value == 3]")
.allowPartialSearchResults(true);
response = client().execute(EqlSearchAction.INSTANCE, request).get();
assertThat(response.hits().sequences().size(), equalTo(0));
assertThat(response.shardFailures().length, is(0));

// sample query on both shards
request = new EqlSearchRequest().indices("test-*")
Expand All @@ -143,6 +149,7 @@ public void testPartialResults() throws Exception {
EqlSearchResponse.Sequence sample = response.hits().sequences().get(0);
assertThat(sample.events().get(0).toString(), containsString("\"value\" : 2"));
assertThat(sample.events().get(1).toString(), containsString("\"value\" : 1"));
assertThat(response.shardFailures().length, is(0));

// sample query on the available shard only
request = new EqlSearchRequest().indices("test-*")
Expand All @@ -153,6 +160,7 @@ public void testPartialResults() throws Exception {
sample = response.hits().sequences().get(0);
assertThat(sample.events().get(0).toString(), containsString("\"value\" : 3"));
assertThat(sample.events().get(1).toString(), containsString("\"value\" : 1"));
assertThat(response.shardFailures().length, is(0));

// sample query on the unavailable shard only
request = new EqlSearchRequest().indices("test-*")
Expand All @@ -163,6 +171,7 @@ public void testPartialResults() throws Exception {
sample = response.hits().sequences().get(0);
assertThat(sample.events().get(0).toString(), containsString("\"value\" : 2"));
assertThat(sample.events().get(1).toString(), containsString("\"value\" : 0"));
assertThat(response.shardFailures().length, is(0));

// ------------------------------------------------------------------------
// stop one of the nodes, make one of the shards unavailable
Expand All @@ -181,13 +190,17 @@ public void testPartialResults() throws Exception {
for (int i = 0; i < 5; i++) {
assertThat(response.hits().events().get(i).toString(), containsString("\"value\" : " + (i * 2 + 1)));
}
assertThat(response.shardFailures().length, is(1));

// sequence query on both shards
request = new EqlSearchRequest().indices("test-*")
.query("sequence [process where value == 1] [process where value == 2]")
.allowPartialSearchResults(true);
response = client().execute(EqlSearchAction.INSTANCE, request).get();
assertThat(response.hits().sequences().size(), equalTo(0));
assertThat(response.shardFailures().length, is(1));
assertThat(response.shardFailures()[0].index(), is("test-1"));
assertThat(response.shardFailures()[0].reason(), containsString("NoShardAvailableActionException"));

// sequence query on the available shard only
request = new EqlSearchRequest().indices("test-*")
Expand All @@ -198,13 +211,19 @@ public void testPartialResults() throws Exception {
sequence = response.hits().sequences().get(0);
assertThat(sequence.events().get(0).toString(), containsString("\"value\" : 1"));
assertThat(sequence.events().get(1).toString(), containsString("\"value\" : 3"));
assertThat(response.shardFailures().length, is(1));
assertThat(response.shardFailures()[0].index(), is("test-1"));
assertThat(response.shardFailures()[0].reason(), containsString("NoShardAvailableActionException"));

// sequence query on the unavailable shard only
request = new EqlSearchRequest().indices("test-*")
.query("sequence [process where value == 0] [process where value == 2]")
.allowPartialSearchResults(true);
response = client().execute(EqlSearchAction.INSTANCE, request).get();
assertThat(response.hits().sequences().size(), equalTo(0));
assertThat(response.shardFailures().length, is(1));
assertThat(response.shardFailures()[0].index(), is("test-1"));
assertThat(response.shardFailures()[0].reason(), containsString("NoShardAvailableActionException"));

// sequence query with missing event on unavailable shard. THIS IS A FALSE POSITIVE
request = new EqlSearchRequest().indices("test-*")
Expand All @@ -215,13 +234,19 @@ public void testPartialResults() throws Exception {
sequence = response.hits().sequences().get(0);
assertThat(sequence.events().get(0).toString(), containsString("\"value\" : 1"));
assertThat(sequence.events().get(2).toString(), containsString("\"value\" : 3"));
assertThat(response.shardFailures().length, is(1));
assertThat(response.shardFailures()[0].index(), is("test-1"));
assertThat(response.shardFailures()[0].reason(), containsString("NoShardAvailableActionException"));

// sample query on both shards
request = new EqlSearchRequest().indices("test-*")
.query("sample by key [process where value == 2] [process where value == 1]")
.allowPartialSearchResults(true);
response = client().execute(EqlSearchAction.INSTANCE, request).get();
assertThat(response.hits().sequences().size(), equalTo(0));
assertThat(response.shardFailures().length, is(1));
assertThat(response.shardFailures()[0].index(), is("test-1"));
assertThat(response.shardFailures()[0].reason(), containsString("NoShardAvailableActionException"));

// sample query on the available shard only
request = new EqlSearchRequest().indices("test-*")
Expand All @@ -232,14 +257,19 @@ public void testPartialResults() throws Exception {
sample = response.hits().sequences().get(0);
assertThat(sample.events().get(0).toString(), containsString("\"value\" : 3"));
assertThat(sample.events().get(1).toString(), containsString("\"value\" : 1"));
assertThat(response.shardFailures().length, is(1));
assertThat(response.shardFailures()[0].index(), is("test-1"));
assertThat(response.shardFailures()[0].reason(), containsString("NoShardAvailableActionException"));

// sample query on the unavailable shard only
request = new EqlSearchRequest().indices("test-*")
.query("sample by key [process where value == 2] [process where value == 0]")
.allowPartialSearchResults(true);
response = client().execute(EqlSearchAction.INSTANCE, request).get();
assertThat(response.hits().sequences().size(), equalTo(0));

assertThat(response.shardFailures().length, is(1));
assertThat(response.shardFailures()[0].index(), is("test-1"));
assertThat(response.shardFailures()[0].reason(), containsString("NoShardAvailableActionException"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
package org.elasticsearch.xpack.eql.action;

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -17,6 +20,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.core.Nullable;
Expand All @@ -42,6 +46,7 @@
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure;
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.xpack.eql.util.SearchHitUtils.qualifiedIndex;
Expand All @@ -54,6 +59,7 @@ public class EqlSearchResponse extends ActionResponse implements ToXContentObjec
private final String asyncExecutionId;
private final boolean isRunning;
private final boolean isPartial;
private final ShardSearchFailure[] shardFailures;

private static final class Fields {
static final String TOOK = "took";
Expand All @@ -62,6 +68,7 @@ private static final class Fields {
static final String ID = "id";
static final String IS_RUNNING = "is_running";
static final String IS_PARTIAL = "is_partial";
static final String SHARD_FAILURES = "shard_failures";
}

private static final ParseField TOOK = new ParseField(Fields.TOOK);
Expand All @@ -70,8 +77,10 @@ private static final class Fields {
private static final ParseField ID = new ParseField(Fields.ID);
private static final ParseField IS_RUNNING = new ParseField(Fields.IS_RUNNING);
private static final ParseField IS_PARTIAL = new ParseField(Fields.IS_PARTIAL);
private static final ParseField SHARD_FAILURES = new ParseField(Fields.SHARD_FAILURES);

private static final InstantiatingObjectParser<EqlSearchResponse, Void> PARSER;

static {
InstantiatingObjectParser.Builder<EqlSearchResponse, Void> parser = InstantiatingObjectParser.builder(
"eql/search_response",
Expand All @@ -84,11 +93,12 @@ private static final class Fields {
parser.declareString(optionalConstructorArg(), ID);
parser.declareBoolean(constructorArg(), IS_RUNNING);
parser.declareBoolean(constructorArg(), IS_PARTIAL);
parser.declareObjectArray(optionalConstructorArg(), (p, c) -> ShardSearchFailure.EMPTY_ARRAY, SHARD_FAILURES); // TODO fix this
PARSER = parser.build();
}

public EqlSearchResponse(Hits hits, long tookInMillis, boolean isTimeout) {
this(hits, tookInMillis, isTimeout, null, false, false);
public EqlSearchResponse(Hits hits, long tookInMillis, boolean isTimeout, ShardSearchFailure[] shardFailures) {
this(hits, tookInMillis, isTimeout, null, false, false, shardFailures);
}

public EqlSearchResponse(
Expand All @@ -97,7 +107,8 @@ public EqlSearchResponse(
boolean isTimeout,
String asyncExecutionId,
boolean isRunning,
boolean isPartial
boolean isPartial,
ShardSearchFailure[] shardFailures
) {
super();
this.hits = hits == null ? Hits.EMPTY : hits;
Expand All @@ -106,6 +117,7 @@ public EqlSearchResponse(
this.asyncExecutionId = asyncExecutionId;
this.isRunning = isRunning;
this.isPartial = isPartial;
this.shardFailures = shardFailures;
}

public EqlSearchResponse(StreamInput in) throws IOException {
Expand All @@ -116,6 +128,19 @@ public EqlSearchResponse(StreamInput in) throws IOException {
asyncExecutionId = in.readOptionalString();
isPartial = in.readBoolean();
isRunning = in.readBoolean();
if (in.getTransportVersion().onOrAfter(TransportVersions.EQL_ALLOW_PARTIAL_SEARCH_RESULTS)) {
int size = in.readVInt();
if (size == 0) {
shardFailures = ShardSearchFailure.EMPTY_ARRAY;
} else {
shardFailures = new ShardSearchFailure[size];
for (int i = 0; i < shardFailures.length; i++) {
shardFailures[i] = readShardSearchFailure(in);
}
}
} else {
shardFailures = ShardSearchFailure.EMPTY_ARRAY;
}
}

public static EqlSearchResponse fromXContent(XContentParser parser) {
Expand All @@ -130,6 +155,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(asyncExecutionId);
out.writeBoolean(isPartial);
out.writeBoolean(isRunning);
if (out.getTransportVersion().onOrAfter(TransportVersions.EQL_ALLOW_PARTIAL_SEARCH_RESULTS)) {
out.writeVInt(shardFailures.length);
for (ShardSearchFailure shardSearchFailure : shardFailures) {
shardSearchFailure.writeTo(out);
}
}
}

@Override
Expand All @@ -147,6 +178,13 @@ private XContentBuilder innerToXContent(XContentBuilder builder, Params params)
builder.field(IS_RUNNING.getPreferredName(), isRunning);
builder.field(TOOK.getPreferredName(), tookInMillis);
builder.field(TIMED_OUT.getPreferredName(), isTimeout);
if (CollectionUtils.isEmpty(shardFailures) == false) {
builder.startArray(SHARD_FAILURES.getPreferredName());
for (ShardOperationFailedException shardFailure : ExceptionsHelper.groupBy(shardFailures)) {
shardFailure.toXContent(builder, params);
}
builder.endArray();
}
hits.toXContent(builder, params);
return builder;
}
Expand Down Expand Up @@ -178,6 +216,10 @@ public boolean isPartial() {
return isPartial;
}

public ShardSearchFailure[] shardFailures() {
return shardFailures;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -190,12 +232,13 @@ public boolean equals(Object o) {
return Objects.equals(hits, that.hits)
&& Objects.equals(tookInMillis, that.tookInMillis)
&& Objects.equals(isTimeout, that.isTimeout)
&& Objects.equals(asyncExecutionId, that.asyncExecutionId);
&& Objects.equals(asyncExecutionId, that.asyncExecutionId)
&& Objects.equals(shardFailures, that.shardFailures);
}

@Override
public int hashCode() {
return Objects.hash(hits, tookInMillis, isTimeout, asyncExecutionId);
return Objects.hash(hits, tookInMillis, isTimeout, asyncExecutionId, shardFailures);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.eql.action;

import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
Expand Down Expand Up @@ -39,7 +40,8 @@ public EqlSearchResponse getCurrentResult() {
false,
getExecutionId().getEncoded(),
true,
true
true,
ShardSearchFailure.EMPTY_ARRAY
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,20 @@

package org.elasticsearch.xpack.eql.execution.payload;

import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.eql.session.Payload;

public abstract class AbstractPayload implements Payload {

private final boolean timedOut;
private final TimeValue timeTook;
private ShardSearchFailure[] shardFailures;

protected AbstractPayload(boolean timedOut, TimeValue timeTook) {
protected AbstractPayload(boolean timedOut, TimeValue timeTook, ShardSearchFailure[] shardFailures) {
this.timedOut = timedOut;
this.timeTook = timeTook;
this.shardFailures = shardFailures;
}

@Override
Expand All @@ -29,4 +32,9 @@ public boolean timedOut() {
public TimeValue timeTook() {
return timeTook;
}

@Override
public ShardSearchFailure[] shardFailures() {
return shardFailures;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class EventPayload extends AbstractPayload {
private final List<Event> values;

public EventPayload(SearchResponse response) {
super(response.isTimedOut(), response.getTook());
super(response.isTimedOut(), response.getTook(), response.getShardFailures());

SearchHits hits = response.getHits();
values = new ArrayList<>(hits.getHits().length);
Expand Down
Loading

0 comments on commit 95f9a1b

Please sign in to comment.