Skip to content

Commit

Permalink
iter
Browse files Browse the repository at this point in the history
  • Loading branch information
pmpailis committed Sep 17, 2024
1 parent 71b30ce commit 933d4b5
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ static TransportVersion def(int id) {
public static final TransportVersion CCS_TELEMETRY_STATS = def(8_739_00_0);
public static final TransportVersion GLOBAL_RETENTION_TELEMETRY = def(8_740_00_0);
public static final TransportVersion ROUTING_TABLE_VERSION_REMOVED = def(8_741_00_0);
public static final TransportVersion SEARCH_REQUEST_SHARD_REORDERING = def(8_742_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ private GroupShardsIterator<SearchShardIterator> getIterator(
iter.skip(true);
}
}
if (shouldSortShards(results.minAndMaxes) == false) {
if (false == request.allowShardReordering() || false == shouldSortShards(results.minAndMaxes)) {
return shardsIts;
}
FieldSortBuilder fieldSort = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
*/
private boolean forceSyntheticSource = false;

// specify whether we allow shards to be re-order (e.g. based on their timestamp)
// or to explicitly iterate them based on their natural ordering
private boolean allowShardReordering = true;

public SearchRequest() {
this((Version) null);
}
Expand Down Expand Up @@ -223,6 +227,7 @@ private SearchRequest(
this.waitForCheckpoints = searchRequest.waitForCheckpoints;
this.waitForCheckpointsTimeout = searchRequest.waitForCheckpointsTimeout;
this.forceSyntheticSource = searchRequest.forceSyntheticSource;
this.allowShardReordering = searchRequest.allowShardReordering;
}

/**
Expand Down Expand Up @@ -275,6 +280,11 @@ public SearchRequest(StreamInput in) throws IOException {
} else {
forceSyntheticSource = false;
}
if (in.getTransportVersion().onOrAfter(TransportVersions.SEARCH_REQUEST_SHARD_REORDERING)) {
allowShardReordering = in.readBoolean();
} else {
allowShardReordering = true;
}
}

@Override
Expand Down Expand Up @@ -315,6 +325,9 @@ public void writeTo(StreamOutput out) throws IOException {
throw new IllegalArgumentException("force_synthetic_source is not supported before 8.4.0");
}
}
if (out.getTransportVersion().onOrAfter(TransportVersions.SEARCH_REQUEST_SHARD_REORDERING)) {
out.writeBoolean(allowShardReordering);
}
}

@Override
Expand Down Expand Up @@ -729,6 +742,15 @@ public void setForceSyntheticSource(boolean forceSyntheticSource) {
this.forceSyntheticSource = forceSyntheticSource;
}

public boolean allowShardReordering() {
return allowShardReordering;
}

public SearchRequest allowShardReordering(boolean allowShardReordering) {
this.allowShardReordering = allowShardReordering;
return this;
}

@Override
public SearchRequest rewrite(QueryRewriteContext ctx) throws IOException {
if (source == null) {
Expand Down Expand Up @@ -819,7 +841,8 @@ public boolean equals(Object o) {
&& absoluteStartMillis == that.absoluteStartMillis
&& ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips
&& Objects.equals(minCompatibleShardNode, that.minCompatibleShardNode)
&& forceSyntheticSource == that.forceSyntheticSource;
&& forceSyntheticSource == that.forceSyntheticSource
&& allowShardReordering == that.allowShardReordering;
}

@Override
Expand All @@ -841,7 +864,8 @@ public int hashCode() {
absoluteStartMillis,
ccsMinimizeRoundtrips,
minCompatibleShardNode,
forceSyntheticSource
forceSyntheticSource,
allowShardReordering
);
}

Expand Down Expand Up @@ -878,6 +902,8 @@ public String toString() {
+ absoluteStartMillis
+ ", ccsMinimizeRoundtrips="
+ ccsMinimizeRoundtrips
+ ", allowShardReordering="
+ allowShardReordering
+ ", source="
+ source
+ '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,103 @@ public void testCanMatchFilteringOnCoordinatorThatCanBeSkippedUsingTimestamp() t
doCanMatchFilteringOnCoordinatorThatCanBeSkipped(DataStream.TIMESTAMP_FIELD_NAME);
}

public void testRequestDisabledShardSorting() throws InterruptedException {
// this is similar to testSortShards, but should be disabled by the request level `allowShardReordering` param
final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(
0,
System.nanoTime(),
System::nanoTime
);

Map<String, Transport.Connection> lookup = new ConcurrentHashMap<>();
DiscoveryNode primaryNode = DiscoveryNodeUtils.create("node_1");
DiscoveryNode replicaNode = DiscoveryNodeUtils.create("node_2");
lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode));
lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode));

for (SortOrder order : SortOrder.values()) {
List<ShardId> shardIds = new ArrayList<>();
List<MinAndMax<?>> minAndMaxes = new ArrayList<>();
Set<ShardId> shardToSkip = new HashSet<>();

SearchTransportService searchTransportService = new SearchTransportService(null, null, null) {
@Override
public void sendCanMatch(
Transport.Connection connection,
CanMatchNodeRequest request,
SearchTask task,
ActionListener<CanMatchNodeResponse> listener
) {
final List<ResponseOrFailure> responses = new ArrayList<>();
for (CanMatchNodeRequest.Shard shard : request.getShardLevelRequests()) {
Long min = rarely() ? null : randomLong();
Long max = min == null ? null : randomLongBetween(min, Long.MAX_VALUE);
MinAndMax<?> minMax = min == null ? null : new MinAndMax<>(min, max);
boolean canMatch = frequently();
synchronized (shardIds) {
shardIds.add(shard.shardId());
minAndMaxes.add(minMax);
if (canMatch == false) {
shardToSkip.add(shard.shardId());
}
}

responses.add(new ResponseOrFailure(new CanMatchShardResponse(canMatch, minMax)));
}

new Thread(() -> listener.onResponse(new CanMatchNodeResponse(responses))).start();
}
};

AtomicReference<GroupShardsIterator<SearchShardIterator>> result = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
GroupShardsIterator<SearchShardIterator> shardsIter = getShardsIter(
"logs",
new OriginalIndices(new String[] { "logs" }, SearchRequest.DEFAULT_INDICES_OPTIONS),
randomIntBetween(2, 20),
randomBoolean(),
primaryNode,
replicaNode
);
final SearchRequest searchRequest = new SearchRequest();
searchRequest.source(new SearchSourceBuilder().sort(SortBuilders.fieldSort("timestamp").order(order)));
searchRequest.allowPartialSearchResults(true);
searchRequest.allowShardReordering(false);

CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase(
logger,
searchTransportService,
(clusterAlias, node) -> lookup.get(node),
Collections.singletonMap("_na_", AliasFilter.EMPTY),
Collections.emptyMap(),
threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION),
searchRequest,
shardsIter,
timeProvider,
null,
true,
EMPTY_CONTEXT_PROVIDER,
ActionTestUtils.assertNoFailureListener(iter -> {
result.set(iter);
latch.countDown();
})
);

canMatchPhase.start();
latch.await();
ShardId[] expected = IntStream.range(0, shardIds.size()).boxed().map(shardIds::get).toArray(ShardId[]::new);
if (shardToSkip.size() == expected.length) {
// we need at least one shard to produce the empty result for aggs
shardToSkip.remove(new ShardId("logs", "_na_", 0));
}
int shardId = 0;
for (SearchShardIterator i : result.get()) {
assertThat(i.shardId().id(), equalTo(shardId++));
assertEquals(shardToSkip.contains(i.shardId()), i.skip());
}
}
}

// test using event.ingested
public void testCanMatchFilteringOnCoordinatorThatCanBeSkippedUsingEventIngested() throws Exception {
doCanMatchFilteringOnCoordinatorThatCanBeSkipped(IndexMetadata.EVENT_INGESTED_FIELD_NAME);
Expand Down

0 comments on commit 933d4b5

Please sign in to comment.