Skip to content

Commit

Permalink
ESQL: Tests for loading many fields (elastic#100363) (elastic#100403)
Browse files Browse the repository at this point in the history
Turns out we weren't turning on the tracking `BigArrays`. That helps,
but for this test to fully pass we'd have to use the `BlockFactory` to
build the loaded fields.

Co-authored-by: Alexander Spies <[email protected]>
  • Loading branch information
nik9000 and alex-spies authored Oct 6, 2023
1 parent 13df225 commit 89e8c65
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.ListMatcher;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase;
import org.junit.After;
Expand Down Expand Up @@ -91,14 +94,7 @@ private Response sortByManyLongs(int count) throws IOException {
query.append(", i").append(i);
}
query.append("\\n| KEEP a, b | LIMIT 10000\"}");
Request request = new Request("POST", "/_query");
request.setJsonEntity(query.toString());
request.addParameter("error_trace", "");
request.setOptions(
RequestOptions.DEFAULT.toBuilder()
.setRequestConfig(RequestConfig.custom().setSocketTimeout(Math.toIntExact(TimeValue.timeValueMinutes(5).millis())).build())
);
return client().performRequest(request);
return query(query.toString(), null);
}

/**
Expand Down Expand Up @@ -139,14 +135,7 @@ private Response groupOnManyLongs(int count) throws IOException {
query.append(", i").append(i);
}
query.append("\\n| STATS MAX(a)\"}");
Request request = new Request("POST", "/_query");
request.setJsonEntity(query.toString());
request.addParameter("error_trace", "");
request.setOptions(
RequestOptions.DEFAULT.toBuilder()
.setRequestConfig(RequestConfig.custom().setSocketTimeout(Math.toIntExact(TimeValue.timeValueMinutes(5).millis())).build())
);
return client().performRequest(request);
return query(query.toString(), null);
}

private StringBuilder makeManyLongs(int count) {
Expand Down Expand Up @@ -182,10 +171,7 @@ private Response concat(int evals) throws IOException {
.append(")");
}
query.append("\"}");
Request request = new Request("POST", "/_query");
request.addParameter("error_trace", "");
request.setJsonEntity(query.toString().replace("\n", "\\n"));
return client().performRequest(request);
return query(query.toString(), null);
}

/**
Expand Down Expand Up @@ -240,10 +226,7 @@ private Response manyConcat(int strings) throws IOException {
query.append("str").append(s);
}
query.append("\"}");
Request request = new Request("POST", "/_query");
request.addParameter("error_trace", "");
request.setJsonEntity(query.toString().replace("\n", "\\n"));
return client().performRequest(request);
return query(query.toString(), null);
}

public void testManyEval() throws IOException {
Expand Down Expand Up @@ -280,12 +263,47 @@ private Response manyEval(int evalLines) throws IOException {
}
}
query.append("\n| LIMIT 10000\"}");
return query(query.toString(), null);
}

/**
 * Runs an ESQL query via the {@code _query} endpoint with {@code error_trace}
 * enabled and a generous five minute socket timeout because some of these
 * queries are deliberately slow.
 *
 * @param query      the full JSON request body; embedded {@code \n} characters
 *                   are escaped before sending
 * @param filterPath optional {@code filter_path} parameter to shrink the
 *                   response, or {@code null} to return the full response
 * @return the raw HTTP response
 * @throws IOException if the request fails
 */
private Response query(String query, String filterPath) throws IOException {
    Request request = new Request("POST", "/_query");
    request.addParameter("error_trace", "");
    if (filterPath != null) {
        request.addParameter("filter_path", filterPath);
    }
    // Callers build queries with real newlines for readability; JSON needs them escaped.
    // Note: `query` is already a String, so no toString() call is needed.
    request.setJsonEntity(query.replace("\n", "\\n"));
    request.setOptions(
        RequestOptions.DEFAULT.toBuilder()
            .setRequestConfig(RequestConfig.custom().setSocketTimeout(Math.toIntExact(TimeValue.timeValueMinutes(5).millis())).build())
    );
    return client().performRequest(request);
}

/**
 * Loading 100 documents that each carry many large fields should succeed.
 */
public void testFetchManyBigFields() throws IOException {
    int docs = 100;
    initManyBigFieldsIndex(docs);
    fetchManyBigFields(docs);
}

/**
 * Loading 500 documents with many large fields is expected to trip the
 * circuit breaker rather than exhaust the heap.
 */
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99826")
public void testFetchTooManyBigFields() throws IOException {
    int docs = 500;
    initManyBigFieldsIndex(docs);
    assertCircuitBreaks(() -> fetchManyBigFields(docs));
}

/**
 * Fetches documents containing 1000 fields which are {@code 1kb} each and
 * asserts that all 1000 keyword columns come back in the response.
 */
private void fetchManyBigFields(int docs) throws IOException {
    String esql = "{\"query\": \"FROM manybigfields | SORT f000 | LIMIT " + docs + "\"}";
    // Only the `columns` section is fetched; the values themselves are not checked here.
    Response response = query(esql, "columns");
    String body = EntityUtils.toString(response.getEntity());
    Map<?, ?> map = XContentHelper.convertToMap(JsonXContent.jsonXContent, body, false);
    ListMatcher columns = matchesList();
    for (int field = 0; field < 1000; field++) {
        String name = "f" + String.format(Locale.ROOT, "%03d", field);
        columns = columns.item(matchesMap().entry("name", name).entry("type", "keyword"));
    }
    assertMap(map, matchesMap().entry("columns", columns));
}

private void initManyLongs() throws IOException {
logger.info("loading many documents with longs");
StringBuilder bulk = new StringBuilder();
Expand Down Expand Up @@ -314,13 +332,66 @@ private void initSingleDocIndex() throws IOException {
""");
}

private void initIndex(String name, String bulk) throws IOException {
// Creates the `manybigfields` index with 1000 keyword fields (f000..f999)
// and loads `docs` documents where every field value is 1kb of a repeated
// digit. Documents go in via small bulks; the final chunk is written by
// initIndex so the index is refreshed/merged after the last write.
private void initManyBigFieldsIndex(int docs) throws IOException {
logger.info("loading many documents with many big fields");
int docsPerBulk = 5;
int fields = 1000;
// each field value is 1kb of text
int fieldSize = Math.toIntExact(ByteSizeValue.ofKb(1).getBytes());

// Create the index explicitly, raising the mapping total-field limit so
// that 1000 mapped fields are allowed.
Request request = new Request("PUT", "/manybigfields");
XContentBuilder config = JsonXContent.contentBuilder().startObject();
config.startObject("settings").field("index.mapping.total_fields.limit", 10000).endObject();
config.startObject("mappings").startObject("properties");
for (int f = 0; f < fields; f++) {
// zero-padded names (f000, f001, ...) give a stable sort order for `SORT f000`
config.startObject("f" + String.format(Locale.ROOT, "%03d", f)).field("type", "keyword").endObject();
}
config.endObject().endObject();
request.setJsonEntity(Strings.toString(config.endObject()));
Response response = client().performRequest(request);
assertThat(
EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8),
equalTo("{\"acknowledged\":true,\"shards_acknowledged\":true,\"index\":\"manybigfields\"}")
);

// Hand-build newline-delimited bulk JSON; each document is ~1mb
// (1000 fields x 1kb values).
StringBuilder bulk = new StringBuilder();
for (int d = 0; d < docs; d++) {
bulk.append("{\"create\":{}}\n");
for (int f = 0; f < fields; f++) {
if (f == 0) {
bulk.append('{');
} else {
bulk.append(", ");
}
bulk.append('"').append("f").append(String.format(Locale.ROOT, "%03d", f)).append("\": \"");
// field value: the digit f % 10 repeated until it is fieldSize characters long
bulk.append(Integer.toString(f % 10).repeat(fieldSize));
bulk.append('"');
}
bulk.append("}\n");
// Flush every docsPerBulk documents, but deliberately keep the last
// chunk for initIndex below so it can refresh and force-merge after
// the final write.
if (d % docsPerBulk == docsPerBulk - 1 && d != docs - 1) {
bulk("manybigfields", bulk.toString());
bulk.setLength(0);
}
}
initIndex("manybigfields", bulk.toString());
}

/**
 * Sends a prebuilt newline-delimited bulk body to {@code /<name>/_bulk}
 * with {@code refresh=true} and asserts that no item failed.
 *
 * @param name the target index
 * @param bulk the newline-delimited bulk request body
 * @throws IOException if the request fails
 */
private void bulk(String name, String bulk) throws IOException {
    Request request = new Request("POST", "/" + name + "/_bulk");
    request.addParameter("refresh", "true");
    // Only fetch the `errors` flag so the assertion below is an exact string compare.
    request.addParameter("filter_path", "errors");
    // `bulk` is already a String; the original's toString() call was redundant.
    request.setJsonEntity(bulk);
    Response response = client().performRequest(request);
    assertThat(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8), equalTo("{\"errors\":false}"));
}

private void initIndex(String name, String bulk) throws IOException {
bulk(name, bulk);

Request request = new Request("POST", "/" + name + "/_refresh");
Response response = client().performRequest(request);
assertThat(
EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8),
equalTo("{\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0}}")
);

request = new Request("POST", "/" + name + "/_forcemerge");
request.addParameter("max_num_segments", "1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public Collection<Object> createComponents(
) {
CircuitBreaker circuitBreaker = indicesService.getBigArrays().breakerService().getBreaker("request");
Objects.requireNonNull(circuitBreaker, "request circuit breaker wasn't set");
BlockFactory blockFactory = new BlockFactory(circuitBreaker, indicesService.getBigArrays());
BlockFactory blockFactory = new BlockFactory(circuitBreaker, indicesService.getBigArrays().withCircuitBreaking());
return List.of(
new PlanExecutor(new IndexResolver(client, clusterService.getClusterName().value(), EsqlDataTypeRegistry.INSTANCE, Set::of)),
new ExchangeService(clusterService.getSettings(), threadPool, EsqlPlugin.ESQL_THREAD_POOL_NAME, blockFactory),
Expand Down

0 comments on commit 89e8c65

Please sign in to comment.