Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x] ESQL: Enable async get to support formatting (#111104) #118257

Merged
merged 1 commit into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/111104.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 111104
summary: "ESQL: Enable async get to support formatting"
area: ES|QL
type: feature
issues:
- 110926
4 changes: 4 additions & 0 deletions docs/reference/esql/esql-async-query-get-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ parameter is `true`.
[[esql-async-query-get-api-query-params]]
==== {api-query-parms-title}

The API accepts the same parameters as the synchronous
<<esql-query-api-query-params,query API>>, along with the following
parameters:

`wait_for_completion_timeout`::
(Optional, <<time-units,time value>>)
Timeout duration to wait for the request to finish. Defaults to no timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ private ActionListener<Response> wrapStoringListener(
ActionListener<Response> listener
) {
AtomicReference<ActionListener<Response>> exclusiveListener = new AtomicReference<>(listener);
// This is will performed in case of timeout
// This will be performed in case of timeout
Scheduler.ScheduledCancellable timeoutHandler = threadPool.schedule(() -> {
ActionListener<Response> acquiredListener = exclusiveListener.getAndSet(null);
if (acquiredListener != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,21 +350,21 @@ public void testTextMode() throws IOException {
int count = randomIntBetween(0, 100);
bulkLoadTestData(count);
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
assertEquals(expectedTextBody("txt", count, null), runEsqlAsTextWithFormat(builder, "txt", null));
assertEquals(expectedTextBody("txt", count, null), runEsqlAsTextWithFormat(builder, "txt", null, mode));
}

public void testCSVMode() throws IOException {
int count = randomIntBetween(0, 100);
bulkLoadTestData(count);
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
assertEquals(expectedTextBody("csv", count, '|'), runEsqlAsTextWithFormat(builder, "csv", '|'));
assertEquals(expectedTextBody("csv", count, '|'), runEsqlAsTextWithFormat(builder, "csv", '|', mode));
}

public void testTSVMode() throws IOException {
int count = randomIntBetween(0, 100);
bulkLoadTestData(count);
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
assertEquals(expectedTextBody("tsv", count, null), runEsqlAsTextWithFormat(builder, "tsv", null));
assertEquals(expectedTextBody("tsv", count, null), runEsqlAsTextWithFormat(builder, "tsv", null, mode));
}

public void testCSVNoHeaderMode() throws IOException {
Expand Down Expand Up @@ -1004,53 +1004,35 @@ public static Map<String, Object> runEsqlSync(RequestObjectBuilder requestObject
}

public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject) throws IOException {
return runEsqlAsync(requestObject, new AssertWarnings.NoWarnings());
return runEsqlAsync(requestObject, randomBoolean(), new AssertWarnings.NoWarnings());
}

static Map<String, Object> runEsql(RequestObjectBuilder requestObject, AssertWarnings assertWarnings, Mode mode) throws IOException {
if (mode == ASYNC) {
return runEsqlAsync(requestObject, assertWarnings);
return runEsqlAsync(requestObject, randomBoolean(), assertWarnings);
} else {
return runEsqlSync(requestObject, assertWarnings);
}
}

public static Map<String, Object> runEsqlSync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
requestObject.build();
Request request = prepareRequest(SYNC);
String mediaType = attachBody(requestObject, request);

RequestOptions.Builder options = request.getOptions().toBuilder();
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
options.addHeader("Content-Type", mediaType);

if (randomBoolean()) {
options.addHeader("Accept", mediaType);
} else {
request.addParameter("format", requestObject.contentType().queryParameter());
}
request.setOptions(options);
Request request = prepareRequestWithOptions(requestObject, SYNC);

HttpEntity entity = performRequest(request, assertWarnings);
return entityToMap(entity, requestObject.contentType());
}

public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
addAsyncParameters(requestObject);
requestObject.build();
Request request = prepareRequest(ASYNC);
String mediaType = attachBody(requestObject, request);

RequestOptions.Builder options = request.getOptions().toBuilder();
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
options.addHeader("Content-Type", mediaType);
return runEsqlAsync(requestObject, randomBoolean(), assertWarnings);
}

if (randomBoolean()) {
options.addHeader("Accept", mediaType);
} else {
request.addParameter("format", requestObject.contentType().queryParameter());
}
request.setOptions(options);
public static Map<String, Object> runEsqlAsync(
RequestObjectBuilder requestObject,
boolean keepOnCompletion,
AssertWarnings assertWarnings
) throws IOException {
addAsyncParameters(requestObject, keepOnCompletion);
Request request = prepareRequestWithOptions(requestObject, ASYNC);

if (shouldLog()) {
LOGGER.info("REQUEST={}", request);
Expand All @@ -1062,7 +1044,7 @@ public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObjec
Object initialColumns = null;
Object initialValues = null;
var json = entityToMap(entity, requestObject.contentType());
checkKeepOnCompletion(requestObject, json);
checkKeepOnCompletion(requestObject, json, keepOnCompletion);
String id = (String) json.get("id");

var supportsAsyncHeaders = clusterHasCapability("POST", "/_query", List.of(), List.of("async_query_status_headers")).orElse(false);
Expand Down Expand Up @@ -1102,7 +1084,7 @@ public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObjec

// issue a second request to "async get" the results
Request getRequest = prepareAsyncGetRequest(id);
getRequest.setOptions(options);
getRequest.setOptions(request.getOptions());
response = performRequest(getRequest);
entity = response.getEntity();
}
Expand All @@ -1120,6 +1102,66 @@ public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObjec
return removeAsyncProperties(result);
}

public void testAsyncGetWithoutContentType() throws IOException {
int count = randomIntBetween(0, 100);
bulkLoadTestData(count);
var requestObject = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");

addAsyncParameters(requestObject, true);
Request request = prepareRequestWithOptions(requestObject, ASYNC);

if (shouldLog()) {
LOGGER.info("REQUEST={}", request);
}

Response response = performRequest(request);
HttpEntity entity = response.getEntity();

var json = entityToMap(entity, requestObject.contentType());
checkKeepOnCompletion(requestObject, json, true);
String id = (String) json.get("id");
// results won't be returned since keepOnCompletion is true
assertThat(id, is(not(emptyOrNullString())));

// issue an "async get" request with no Content-Type
Request getRequest = prepareAsyncGetRequest(id);
response = performRequest(getRequest);
entity = response.getEntity();
var result = entityToMap(entity, XContentType.JSON);

ListMatcher values = matchesList();
for (int i = 0; i < count; i++) {
values = values.item(matchesList().item("keyword" + i).item(i));
}
assertMap(
result,
matchesMap().entry(
"columns",
matchesList().item(matchesMap().entry("name", "keyword").entry("type", "keyword"))
.item(matchesMap().entry("name", "integer").entry("type", "integer"))
).entry("values", values).entry("took", greaterThanOrEqualTo(0)).entry("id", id).entry("is_running", false)
);

}

static Request prepareRequestWithOptions(RequestObjectBuilder requestObject, Mode mode) throws IOException {
requestObject.build();
Request request = prepareRequest(mode);
String mediaType = attachBody(requestObject, request);

RequestOptions.Builder options = request.getOptions().toBuilder();
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
options.addHeader("Content-Type", mediaType);

if (randomBoolean()) {
options.addHeader("Accept", mediaType);
} else {
request.addParameter("format", requestObject.contentType().queryParameter());
}
request.setOptions(options);
return request;
}

// Removes async properties, otherwise consuming assertions would need to handle sync and async differences
static Map<String, Object> removeAsyncProperties(Map<String, Object> map) {
Map<String, Object> copy = new HashMap<>(map);
Expand All @@ -1140,17 +1182,20 @@ protected static Map<String, Object> entityToMap(HttpEntity entity, XContentType
}
}

static void addAsyncParameters(RequestObjectBuilder requestObject) throws IOException {
static void addAsyncParameters(RequestObjectBuilder requestObject, boolean keepOnCompletion) throws IOException {
// deliberately short in order to frequently trigger return without results
requestObject.waitForCompletion(TimeValue.timeValueNanos(randomIntBetween(1, 100)));
requestObject.keepOnCompletion(randomBoolean());
requestObject.keepOnCompletion(keepOnCompletion);
requestObject.keepAlive(TimeValue.timeValueDays(randomIntBetween(1, 10)));
}

// If keep_on_completion is set then an id must always be present, regardless of the value of any other property.
static void checkKeepOnCompletion(RequestObjectBuilder requestObject, Map<String, Object> json) {
static void checkKeepOnCompletion(RequestObjectBuilder requestObject, Map<String, Object> json, boolean keepOnCompletion) {
if (requestObject.keepOnCompletion()) {
assertTrue(keepOnCompletion);
assertThat((String) json.get("id"), not(emptyOrNullString()));
} else {
assertFalse(keepOnCompletion);
}
}

Expand All @@ -1168,14 +1213,19 @@ static void deleteNonExistent(Request request) throws IOException {
assertEquals(404, response.getStatusLine().getStatusCode());
}

static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String format, @Nullable Character delimiter) throws IOException {
Request request = prepareRequest(SYNC);
static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String format, @Nullable Character delimiter, Mode mode)
throws IOException {
Request request = prepareRequest(mode);
if (mode == ASYNC) {
addAsyncParameters(builder, randomBoolean());
}
String mediaType = attachBody(builder.build(), request);

RequestOptions.Builder options = request.getOptions().toBuilder();
options.addHeader("Content-Type", mediaType);

if (randomBoolean()) {
boolean addParam = randomBoolean();
if (addParam) {
request.addParameter("format", format);
} else {
switch (format) {
Expand All @@ -1189,8 +1239,75 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
}
request.setOptions(options);

HttpEntity entity = performRequest(request, new AssertWarnings.NoWarnings());
return Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
if (shouldLog()) {
LOGGER.info("REQUEST={}", request);
}

Response response = performRequest(request);
HttpEntity entity = assertWarnings(response, new AssertWarnings.NoWarnings());

// get the content, it could be empty because the request might have not completed
String initialValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
String id = response.getHeader("X-Elasticsearch-Async-Id");

if (mode == SYNC) {
assertThat(id, is(emptyOrNullString()));
return initialValue;
}

if (id == null) {
// no id returned from an async call, must have completed immediately and without keep_on_completion
assertThat(builder.keepOnCompletion(), either(nullValue()).or(is(false)));
assertNull(response.getHeader("is_running"));
// the content cant be empty
assertThat(initialValue, not(emptyOrNullString()));
return initialValue;
} else {
// async may not return results immediately, so may need an async get
assertThat(id, is(not(emptyOrNullString())));
String isRunning = response.getHeader("X-Elasticsearch-Async-Is-Running");
if ("?0".equals(isRunning)) {
// must have completed immediately so keep_on_completion must be true
assertThat(builder.keepOnCompletion(), is(true));
} else {
// did not return results immediately, so we will need an async get
// Also, different format modes return different results.
switch (format) {
case "txt" -> assertThat(initialValue, emptyOrNullString());
case "csv" -> {
assertEquals(initialValue, "\r\n");
initialValue = "";
}
case "tsv" -> {
assertEquals(initialValue, "\n");
initialValue = "";
}
}
}
// issue a second request to "async get" the results
Request getRequest = prepareAsyncGetRequest(id);
if (delimiter != null) {
getRequest.addParameter("delimiter", String.valueOf(delimiter));
}
// If the `format` parameter is not added, the GET request will return a response
// with the `Content-Type` type due to the lack of an `Accept` header.
if (addParam) {
getRequest.addParameter("format", format);
}
// if `addParam` is false, `options` will already have an `Accept` header
getRequest.setOptions(options);
response = performRequest(getRequest);
entity = assertWarnings(response, new AssertWarnings.NoWarnings());
}
String newValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));

// assert initial contents, if any, are the same as async get contents
if (initialValue != null && initialValue.isEmpty() == false) {
assertEquals(initialValue, newValue);
}

assertDeletable(id);
return newValue;
}

private static Request prepareRequest(Mode mode) {
Expand Down
Loading
Loading