From 89e8c6579b0044b8a9809a063001542589766eb0 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 6 Oct 2023 09:47:39 -0400 Subject: [PATCH] ESQL: Tests for loading many fields (#100363) (#100403) Turns out we weren't turning on the tracking `BigArrays`. That helps, but for this test to fully pass we'd have to use the `BlockFactory` to build the loaded fields. Co-authored-by: Alexander Spies --- .../esql/qa/single_node/HeapAttackIT.java | 123 ++++++++++++++---- .../xpack/esql/plugin/EsqlPlugin.java | 2 +- 2 files changed, 98 insertions(+), 27 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/HeapAttackIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/HeapAttackIT.java index 84c654f8946fb..e6dc165a75509 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/HeapAttackIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/HeapAttackIT.java @@ -13,10 +13,13 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ListMatcher; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase; import org.junit.After; @@ -91,14 +94,7 @@ private Response sortByManyLongs(int count) throws IOException { query.append(", i").append(i); } query.append("\\n| KEEP a, b | LIMIT 10000\"}"); - Request request = new Request("POST", "/_query"); - request.setJsonEntity(query.toString()); - request.addParameter("error_trace", ""); - request.setOptions( - RequestOptions.DEFAULT.toBuilder() - .setRequestConfig(RequestConfig.custom().setSocketTimeout(Math.toIntExact(TimeValue.timeValueMinutes(5).millis())).build()) - ); - return client().performRequest(request); + return query(query.toString(), null); } /** @@ -139,14 +135,7 @@ private Response groupOnManyLongs(int count) throws IOException { query.append(", i").append(i); } query.append("\\n| STATS MAX(a)\"}"); - Request request = new Request("POST", "/_query"); - request.setJsonEntity(query.toString()); - request.addParameter("error_trace", ""); - request.setOptions( - RequestOptions.DEFAULT.toBuilder() - .setRequestConfig(RequestConfig.custom().setSocketTimeout(Math.toIntExact(TimeValue.timeValueMinutes(5).millis())).build()) - ); - return client().performRequest(request); + return query(query.toString(), null); } private StringBuilder makeManyLongs(int count) { @@ -182,10 +171,7 @@ private Response concat(int evals) throws IOException { .append(")"); } query.append("\"}"); - Request request = new Request("POST", "/_query"); - request.addParameter("error_trace", ""); - request.setJsonEntity(query.toString().replace("\n", "\\n")); - return client().performRequest(request); + return query(query.toString(), null); } /** @@ -240,10 +226,7 @@ private Response manyConcat(int strings) throws IOException { query.append("str").append(s); } query.append("\"}"); - Request request = new Request("POST", "/_query"); - request.addParameter("error_trace", ""); - request.setJsonEntity(query.toString().replace("\n", "\\n")); - return client().performRequest(request); + return query(query.toString(), null); } public void testManyEval() throws IOException { @@ -280,12 +263,47 @@ private Response manyEval(int evalLines) throws IOException { } } query.append("\n| LIMIT 10000\"}"); + return query(query.toString(), null); + } + + private Response query(String query, String filterPath) throws IOException { Request request = new Request("POST", "/_query"); request.addParameter("error_trace", ""); + if (filterPath != null) { + request.addParameter("filter_path", filterPath); + } request.setJsonEntity(query.toString().replace("\n", "\\n")); + request.setOptions( + RequestOptions.DEFAULT.toBuilder() + .setRequestConfig(RequestConfig.custom().setSocketTimeout(Math.toIntExact(TimeValue.timeValueMinutes(5).millis())).build()) + ); return client().performRequest(request); } + public void testFetchManyBigFields() throws IOException { + initManyBigFieldsIndex(100); + fetchManyBigFields(100); + } + + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99826") + public void testFetchTooManyBigFields() throws IOException { + initManyBigFieldsIndex(500); + assertCircuitBreaks(() -> fetchManyBigFields(500)); + } + + /** + * Fetches documents containing 1000 fields which are {@code 1kb} each. + */ + private void fetchManyBigFields(int docs) throws IOException { + Response response = query("{\"query\": \"FROM manybigfields | SORT f000 | LIMIT " + docs + "\"}", "columns"); + Map map = XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false); + ListMatcher columns = matchesList(); + for (int f = 0; f < 1000; f++) { + columns = columns.item(matchesMap().entry("name", "f" + String.format(Locale.ROOT, "%03d", f)).entry("type", "keyword")); + } + assertMap(map, matchesMap().entry("columns", columns)); + } + private void initManyLongs() throws IOException { logger.info("loading many documents with longs"); StringBuilder bulk = new StringBuilder(); @@ -314,13 +332,66 @@ private void initSingleDocIndex() throws IOException { """); } - private void initIndex(String name, String bulk) throws IOException { + private void initManyBigFieldsIndex(int docs) throws IOException { + logger.info("loading many documents with many big fields"); + int docsPerBulk = 5; + int fields = 1000; + int fieldSize = Math.toIntExact(ByteSizeValue.ofKb(1).getBytes()); + + Request request = new Request("PUT", "/manybigfields"); + XContentBuilder config = JsonXContent.contentBuilder().startObject(); + config.startObject("settings").field("index.mapping.total_fields.limit", 10000).endObject(); + config.startObject("mappings").startObject("properties"); + for (int f = 0; f < fields; f++) { + config.startObject("f" + String.format(Locale.ROOT, "%03d", f)).field("type", "keyword").endObject(); + } + config.endObject().endObject(); + request.setJsonEntity(Strings.toString(config.endObject())); + Response response = client().performRequest(request); + assertThat( + EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8), + equalTo("{\"acknowledged\":true,\"shards_acknowledged\":true,\"index\":\"manybigfields\"}") + ); + + StringBuilder bulk = new StringBuilder(); + for (int d = 0; d < docs; d++) { + bulk.append("{\"create\":{}}\n"); + for (int f = 0; f < fields; f++) { + if (f == 0) { + bulk.append('{'); + } else { + bulk.append(", "); + } + bulk.append('"').append("f").append(String.format(Locale.ROOT, "%03d", f)).append("\": \""); + bulk.append(Integer.toString(f % 10).repeat(fieldSize)); + bulk.append('"'); + } + bulk.append("}\n"); + if (d % docsPerBulk == docsPerBulk - 1 && d != docs - 1) { + bulk("manybigfields", bulk.toString()); + bulk.setLength(0); + } + } + initIndex("manybigfields", bulk.toString()); + } + + private void bulk(String name, String bulk) throws IOException { Request request = new Request("POST", "/" + name + "/_bulk"); - request.addParameter("refresh", "true"); request.addParameter("filter_path", "errors"); request.setJsonEntity(bulk.toString()); Response response = client().performRequest(request); assertThat(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8), equalTo("{\"errors\":false}")); + } + + private void initIndex(String name, String bulk) throws IOException { + bulk(name, bulk); + + Request request = new Request("POST", "/" + name + "/_refresh"); + Response response = client().performRequest(request); + assertThat( + EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8), + equalTo("{\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0}}") + ); request = new Request("POST", "/" + name + "/_forcemerge"); request.addParameter("max_num_segments", "1"); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 2608d4525b153..25802894e2832 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -105,7 +105,7 @@ public Collection createComponents( ) { CircuitBreaker circuitBreaker = indicesService.getBigArrays().breakerService().getBreaker("request"); Objects.requireNonNull(circuitBreaker, "request circuit breaker wasn't set"); - BlockFactory blockFactory = new BlockFactory(circuitBreaker, indicesService.getBigArrays()); + BlockFactory blockFactory = new BlockFactory(circuitBreaker, indicesService.getBigArrays().withCircuitBreaking()); return List.of( new PlanExecutor(new IndexResolver(client, clusterService.getClusterName().value(), EsqlDataTypeRegistry.INSTANCE, Set::of)), new ExchangeService(clusterService.getSettings(), threadPool, EsqlPlugin.ESQL_THREAD_POOL_NAME, blockFactory),