Skip to content

Commit

Permalink
Merge branch 'main' into fips_enforce_provider
Browse files Browse the repository at this point in the history
  • Loading branch information
jakelandis authored Dec 14, 2023
2 parents a7e1a42 + 90ae215 commit b10999b
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 6 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/103474.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 103474
summary: Fix now in millis for ESQL search contexts
area: ES|QL
type: bug
issues:
- 103455
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.RangeQueryBuilder;

import java.util.List;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.hamcrest.Matchers.hasSize;

public class TimeBasedIndicesIT extends AbstractEsqlIntegTestCase {

public void testFilter() {
long epoch = System.currentTimeMillis();
assertAcked(client().admin().indices().prepareCreate("test").setMapping("@timestamp", "type=date", "value", "type=long"));
BulkRequestBuilder bulk = client().prepareBulk("test").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
int oldDocs = between(10, 100);
for (int i = 0; i < oldDocs; i++) {
long timestamp = epoch - TimeValue.timeValueHours(between(1, 2)).millis();
bulk.add(new IndexRequest().source("@timestamp", timestamp, "value", -i));
}
int newDocs = between(10, 100);
for (int i = 0; i < newDocs; i++) {
long timestamp = epoch + TimeValue.timeValueHours(between(1, 2)).millis();
bulk.add(new IndexRequest().source("@timestamp", timestamp, "value", i));
}
bulk.get();
{
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM test | limit 1000");
request.filter(new RangeQueryBuilder("@timestamp").from(epoch - TimeValue.timeValueHours(3).millis()).to("now"));
try (var resp = run(request)) {
List<List<Object>> values = getValuesList(resp);
assertThat(values, hasSize(oldDocs));
}
}
{
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM test | limit 1000");
request.filter(new RangeQueryBuilder("@timestamp").from("now").to(epoch + TimeValue.timeValueHours(3).millis()));
try (var resp = run(request)) {
List<List<Object>> values = getValuesList(resp);
assertThat(values, hasSize(newDocs));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan,

private void acquireSearchContexts(
List<ShardId> shardIds,
EsqlConfiguration configuration,
Map<Index, AliasFilter> aliasFilters,
ActionListener<List<SearchContext>> listener
) {
Expand All @@ -351,11 +352,12 @@ private void acquireSearchContexts(
try {
for (IndexShard shard : targetShards) {
var aliasFilter = aliasFilters.getOrDefault(shard.shardId().getIndex(), AliasFilter.EMPTY);
ShardSearchRequest shardSearchLocalRequest = new ShardSearchRequest(shard.shardId(), 0, aliasFilter);
SearchContext context = searchService.createSearchContext(
shardSearchLocalRequest,
SearchService.NO_TIMEOUT
var shardRequest = new ShardSearchRequest(
shard.shardId(),
configuration.absoluteStartedTimeInMillis(),
aliasFilter
);
SearchContext context = searchService.createSearchContext(shardRequest, SearchService.NO_TIMEOUT);
searchContexts.add(context);
}
for (SearchContext searchContext : searchContexts) {
Expand Down Expand Up @@ -501,8 +503,9 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
final var exchangeSink = exchangeService.getSinkHandler(sessionId);
parentTask.addListener(() -> exchangeService.finishSinkHandler(sessionId, new TaskCancelledException("task cancelled")));
final ActionListener<DataNodeResponse> listener = new OwningChannelActionListener<>(channel);
acquireSearchContexts(request.shardIds(), request.aliasFilters(), ActionListener.wrap(searchContexts -> {
var computeContext = new ComputeContext(sessionId, searchContexts, request.configuration(), null, exchangeSink);
final EsqlConfiguration configuration = request.configuration();
acquireSearchContexts(request.shardIds(), configuration, request.aliasFilters(), ActionListener.wrap(searchContexts -> {
var computeContext = new ComputeContext(sessionId, searchContexts, configuration, null, exchangeSink);
runCompute(parentTask, computeContext, request.plan(), ActionListener.wrap(driverProfiles -> {
// don't return until all pages are fetched
exchangeSink.addCompletionListener(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ public String query() {
return query;
}

/**
* Returns the current time in milliseconds from the time epoch for the execution of this request.
* It ensures consistency by using the same value on all nodes involved in the search request.
* Note: Currently, it returns {@link System#currentTimeMillis()}, but this value will be serialized between nodes.
*/
public long absoluteStartedTimeInMillis() {
return System.currentTimeMillis();
}

/**
* Enable profiling, sacrificing performance to return information about
* what operations are taking the most time.
Expand Down

0 comments on commit b10999b

Please sign in to comment.