From ed748a632cf0dc653322a7e36cd6a9fb095adf12 Mon Sep 17 00:00:00 2001 From: Ruirui Zhang Date: Thu, 10 Oct 2024 13:12:22 -0700 Subject: [PATCH] add workload management IT --- .../cancellation/WorkloadManagementIT.java | 124 ++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 server/src/test/java/org/opensearch/wlm/cancellation/WorkloadManagementIT.java diff --git a/server/src/test/java/org/opensearch/wlm/cancellation/WorkloadManagementIT.java b/server/src/test/java/org/opensearch/wlm/cancellation/WorkloadManagementIT.java new file mode 100644 index 0000000000000..ea147313f99dc --- /dev/null +++ b/server/src/test/java/org/opensearch/wlm/cancellation/WorkloadManagementIT.java @@ -0,0 +1,124 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm.cancellation; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.junit.After; +import org.junit.Before; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.common.settings.Settings; +import org.opensearch.plugin.wlm.WorkloadManagementPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.search.backpressure.settings.NodeDuressSettings; +import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; +import org.opensearch.search.backpressure.settings.SearchTaskSettings; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; +import org.opensearch.test.rest.OpenSearchRestTestCase; +import org.opensearch.wlm.WorkloadManagementSettings; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +public class WorkloadManagementIT extends OpenSearchRestTestCase { + + public WorkloadManagementIT(Settings nodeSettings) { + super(nodeSettings); + } + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() } + ); + } + + @Override + protected Collection> nodePlugins() { + final List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(WorkloadManagementPlugin.class); + return plugins; + } + + @Before + public final void setupNodeSettings() { + Settings request = Settings.builder() + .put(WorkloadManagementSettings.NODE_LEVEL_MEMORY_REJECTION_THRESHOLD.getKey(), 0.8) + .put(WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD.getKey(), 0.9) + .put(WorkloadManagementSettings.NODE_LEVEL_CPU_REJECTION_THRESHOLD.getKey(), 0.8) + .put(WorkloadManagementSettings.NODE_LEVEL_CPU_CANCELLATION_THRESHOLD.getKey(), 0.9) + .build(); + assertAcked(adminClient().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); + } + + @After + public final void cleanupNodeSettings() { + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().putNull("*")) + .setTransientSettings(Settings.builder().putNull("*")) + ); + } + + public void test() { + Response response = performOperation("PUT", "_wlm/query_group", getCreateJson("analytics", "enforced", 0.4, 0.2)); + assertEquals(response.getStatusLine().getStatusCode(), 200); + } + + static String getCreateJson(String name, String resiliencyMode, double cpu, double memory) { + return "{\n" + + " \"name\": \"" + + name + + "\",\n" + + " \"resiliency_mode\": \"" + + resiliencyMode + + "\",\n" + + " \"resource_limits\": {\n" + + " \"cpu\" : " + + cpu + + ",\n" + + " \"memory\" : " + + memory + + "\n" + + " }\n" + + "}"; + } + + static String getUpdateJson(String resiliencyMode, double cpu, double memory) { + return "{\n" + + " \"resiliency_mode\": \"" + + resiliencyMode + + "\",\n" + + " \"resource_limits\": {\n" + + " \"cpu\" : " + + cpu + + ",\n" + + " \"memory\" : " + + memory + + "\n" + + " }\n" + + "}"; + } + + Response performOperation(String method, String uriPath, String json) throws IOException { + Request request = new Request(method, uriPath); + if (json != null) { + request.setJsonEntity(json); + } + return client().performRequest(request); + } +}