diff --git a/common/src/main/java/org/opensearch/sdk/SdkClient.java b/common/src/main/java/org/opensearch/sdk/SdkClient.java index 8b6e38eb9d..a946344fdf 100644 --- a/common/src/main/java/org/opensearch/sdk/SdkClient.java +++ b/common/src/main/java/org/opensearch/sdk/SdkClient.java @@ -19,18 +19,14 @@ import static org.opensearch.sdk.SdkClientUtils.unwrapAndConvertToException; -public class SdkClient implements SettingsChangeListener { +public class SdkClient { private final SdkClientDelegate delegate; - private volatile Boolean isMultiTenancyEnabled; + private final Boolean isMultiTenancyEnabled; - public SdkClient(SdkClientDelegate delegate) { + public SdkClient(SdkClientDelegate delegate, Boolean multiTenancy) { this.delegate = delegate; - } - - @Override - public void onMultiTenancyEnabledChanged(boolean isEnabled) { - this.isMultiTenancyEnabled = isEnabled; + this.isMultiTenancyEnabled = multiTenancy; } /** diff --git a/plugin/build.gradle b/plugin/build.gradle index 2024299221..913007e1b1 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -91,7 +91,7 @@ dependencies { implementation("software.amazon.awssdk:utils:2.25.40") // AWS OpenSearch Service dependency implementation("software.amazon.awssdk:apache-client:2.25.40") - + configurations.all { resolutionStrategy.force 'org.apache.httpcomponents.core5:httpcore5:5.2.4' resolutionStrategy.force 'org.apache.httpcomponents.core5:httpcore5-h2:5.2.4' @@ -104,7 +104,8 @@ dependencies { publishing { publications { - pluginZip(MavenPublication) { publication -> + pluginZip(MavenPublication) { + publication -> pom { name = opensearchplugin.name description = opensearchplugin.description @@ -173,7 +174,9 @@ task integTest(type: RestIntegTestTask) { testClassesDirs = sourceSets.test.output.classesDirs classpath = sourceSets.test.runtimeClasspath } -tasks.named("check").configure { dependsOn(integTest) } +tasks.named("check").configure { + dependsOn(integTest) +} integTest { dependsOn "bundlePlugin" @@ -246,6 +249,10 @@ testClusters.integTest { environment "AWS_SECRET_ACCESS_KEY", System.getenv("AWS_SECRET_ACCESS_KEY"); environment "AWS_SESSION_TOKEN", System.getenv("AWS_SESSION_TOKEN"); + if (System.getProperty("tests.rest.tenantaware") != null) { + environment "plugins.ml_commons.multi_tenancy_enabled", "true" + } + testDistribution = "ARCHIVE" // Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1 if (_numNodes > 1) numberOfNodes = _numNodes diff --git a/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java b/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java index 9f4e83b8c1..6f6f883d8d 100644 --- a/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java +++ b/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java @@ -613,8 +613,6 @@ public Collection createComponents( memoryFactoryMap, mlFeatureEnabledSetting.isMultiTenancyEnabled() ); - // Register the sdkClient as a listener - mlFeatureEnabledSetting.addListener(sdkClient); // Register the agentExecutor as a listener mlFeatureEnabledSetting.addListener(agentExecutor); diff --git a/plugin/src/main/java/org/opensearch/ml/sdkclient/DDBOpenSearchClient.java b/plugin/src/main/java/org/opensearch/ml/sdkclient/DDBOpenSearchClient.java index dd81a88179..c283fab450 100644 --- a/plugin/src/main/java/org/opensearch/ml/sdkclient/DDBOpenSearchClient.java +++ b/plugin/src/main/java/org/opensearch/ml/sdkclient/DDBOpenSearchClient.java @@ -357,7 +357,7 @@ public CompletionStage searchDataObjectAsync( private String getIndexName(String index) { // System index is not supported in remote index. Replacing '.' from index name. - return index.replaceAll("\\.", ""); + return (index.length() > 1 && index.charAt(0) == '.') ? index.substring(1) : index; } private XContentParser createParser(String json) throws IOException { diff --git a/plugin/src/main/java/org/opensearch/ml/sdkclient/RemoteClusterIndicesClient.java b/plugin/src/main/java/org/opensearch/ml/sdkclient/RemoteClusterIndicesClient.java index c0086f2585..438c682365 100644 --- a/plugin/src/main/java/org/opensearch/ml/sdkclient/RemoteClusterIndicesClient.java +++ b/plugin/src/main/java/org/opensearch/ml/sdkclient/RemoteClusterIndicesClient.java @@ -231,7 +231,7 @@ public CompletionStage searchDataObjectAsync( ) { return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction) () -> { try { - log.info("Searching {}", Arrays.toString(request.indices()), null); + log.info("Searching {}", Arrays.toString(request.indices())); // work around https://github.com/opensearch-project/opensearch-java/issues/1150 String json = SdkClientUtils .lowerCaseEnumValues( @@ -254,6 +254,8 @@ public CompletionStage searchDataObjectAsync( .filter(tenantIdFilterQuery.toQuery()) .build(); searchRequest = searchRequest.toBuilder().index(Arrays.asList(request.indices())).query(boolQuery.toQuery()).build(); + } else { + searchRequest = searchRequest.toBuilder().index(Arrays.asList(request.indices())).build(); } SearchResponse searchResponse = openSearchClient.search(searchRequest, MAP_DOCTYPE); log.info("Search returned {} hits", searchResponse.hits().total().value()); diff --git a/plugin/src/main/java/org/opensearch/ml/sdkclient/SdkClientFactory.java b/plugin/src/main/java/org/opensearch/ml/sdkclient/SdkClientFactory.java index 43f31515c5..cfaeadae12 100644 --- a/plugin/src/main/java/org/opensearch/ml/sdkclient/SdkClientFactory.java +++ b/plugin/src/main/java/org/opensearch/ml/sdkclient/SdkClientFactory.java @@ -8,6 +8,7 @@ */ package org.opensearch.ml.sdkclient; +import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_MULTI_TENANCY_ENABLED; import static org.opensearch.sdk.SdkClientSettings.AWS_DYNAMO_DB; import static org.opensearch.sdk.SdkClientSettings.AWS_OPENSEARCH_SERVICE; import static org.opensearch.sdk.SdkClientSettings.REMOTE_METADATA_ENDPOINT; @@ -84,6 +85,7 @@ public static SdkClient createSdkClient(Client client, NamedXContentRegistry xCo String remoteMetadataEndpoint = REMOTE_METADATA_ENDPOINT.get(settings); String region = REMOTE_METADATA_REGION.get(settings); String serviceName = REMOTE_METADATA_SERVICE_NAME.get(settings); + Boolean multiTenancy = ML_COMMONS_MULTI_TENANCY_ENABLED.get(settings); switch (remoteMetadataType) { case REMOTE_OPENSEARCH: @@ -91,12 +93,13 @@ public static SdkClient createSdkClient(Client client, NamedXContentRegistry xCo throw new OpenSearchException("Remote Opensearch client requires a metadata endpoint."); } log.info("Using remote opensearch cluster as metadata store"); - return new SdkClient(new RemoteClusterIndicesClient(createOpenSearchClient(remoteMetadataEndpoint))); + return new SdkClient(new RemoteClusterIndicesClient(createOpenSearchClient(remoteMetadataEndpoint)), multiTenancy); case AWS_OPENSEARCH_SERVICE: validateAwsParams(remoteMetadataType, remoteMetadataEndpoint, region, serviceName); log.info("Using remote AWS Opensearch Service cluster as metadata store"); return new SdkClient( - new RemoteClusterIndicesClient(createAwsOpenSearchServiceClient(remoteMetadataEndpoint, region, serviceName)) + new RemoteClusterIndicesClient(createAwsOpenSearchServiceClient(remoteMetadataEndpoint, region, serviceName)), + multiTenancy ); case AWS_DYNAMO_DB: validateAwsParams(remoteMetadataType, remoteMetadataEndpoint, region, serviceName); @@ -105,11 +108,12 @@ public static SdkClient createSdkClient(Client client, NamedXContentRegistry xCo new DDBOpenSearchClient( createDynamoDbClient(region), new RemoteClusterIndicesClient(createAwsOpenSearchServiceClient(remoteMetadataEndpoint, region, serviceName)) - ) + ), + multiTenancy ); default: log.info("Using local opensearch cluster as metadata store"); - return new SdkClient(new LocalClusterIndicesClient(client, xContentRegistry)); + return new SdkClient(new LocalClusterIndicesClient(client, xContentRegistry), multiTenancy); } } @@ -123,8 +127,8 @@ private static void validateAwsParams(String clientType, String remoteMetadataEn } // Package private for testing - static SdkClient wrapSdkClientDelegate(SdkClientDelegate delegate) { - return new SdkClient(delegate); + static SdkClient wrapSdkClientDelegate(SdkClientDelegate delegate, Boolean multiTenancy) { + return new SdkClient(delegate, multiTenancy); } private static DynamoDbClient createDynamoDbClient(String region) { diff --git a/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java b/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java index f588fded99..37088b514b 100644 --- a/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java +++ b/plugin/src/main/java/org/opensearch/ml/settings/MLCommonsSettings.java @@ -185,6 +185,18 @@ private MLCommonsSettings() {} public static final Setting ML_COMMONS_AGENT_FRAMEWORK_ENABLED = Setting .boolSetting("plugins.ml_commons.agent_framework_enabled", true, Setting.Property.NodeScope, Setting.Property.Dynamic); + // Whether multi-tenancy is enabled in ML Commons. + // This is a static setting which must be set before starting OpenSearch by (in priority order): + // 1. As a command-line argument using the -E flag (overrides other options): + // ./bin/opensearch -Eplugins.ml_commons.multi_tenancy_enabled=true + // 2. As a system property using OPENSEARCH_JAVA_OPTS (overrides opensearch.yml): + // export OPENSEARCH_JAVA_OPTS="-Dplugins.ml_commons.multi_tenancy_enabled=true" + // ./bin/opensearch + // Or inline when starting OpenSearch: + // OPENSEARCH_JAVA_OPTS="-Dplugins.ml_commons.multi_tenancy_enabled=true" ./bin/opensearch + // 3. In the opensearch.yml configuration file: + // plugins.ml_commons.multi_tenancy_enabled: true + // After setting it, a full cluster restart is required for the changes to take effect. public static final Setting ML_COMMONS_MULTI_TENANCY_ENABLED = Setting - .boolSetting("plugins.ml_commons.multi_tenancy_enabled", false, Setting.Property.NodeScope, Setting.Property.Dynamic); + .boolSetting("plugins.ml_commons.multi_tenancy_enabled", false, Setting.Property.NodeScope); } diff --git a/plugin/src/main/java/org/opensearch/ml/settings/MLFeatureEnabledSetting.java b/plugin/src/main/java/org/opensearch/ml/settings/MLFeatureEnabledSetting.java index c23696f499..8a96db3adc 100644 --- a/plugin/src/main/java/org/opensearch/ml/settings/MLFeatureEnabledSetting.java +++ b/plugin/src/main/java/org/opensearch/ml/settings/MLFeatureEnabledSetting.java @@ -46,10 +46,6 @@ public MLFeatureEnabledSetting(ClusterService clusterService, Settings settings) .getClusterSettings() .addSettingsUpdateConsumer(ML_COMMONS_AGENT_FRAMEWORK_ENABLED, it -> isAgentFrameworkEnabled = it); clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_LOCAL_MODEL_ENABLED, it -> isLocalModelEnabled = it); - clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_MULTI_TENANCY_ENABLED, it -> { - isMultiTenancyEnabled = it; - notifyMultiTenancyListeners(it); - }); } /** diff --git a/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java b/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java index d680a0e7d4..5ea1770908 100644 --- a/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java +++ b/plugin/src/test/java/org/opensearch/ml/rest/MLCommonsRestTestCase.java @@ -176,16 +176,6 @@ public void setupSettings() throws IOException { response = TestHelper .makeRequest(client(), "PUT", "_cluster/settings", ImmutableMap.of(), TestHelper.toHttpEntity(jsonEntity), null); assertEquals(200, response.getStatusLine().getStatusCode()); - - String multiTenancyEntity = "{\n" - + " \"persistent\" : {\n" - + " \"plugins.ml_commons.multi_tenancy_enabled\" : false \n" - + " }\n" - + "}"; - - response = TestHelper - .makeRequest(client(), "PUT", "_cluster/settings", ImmutableMap.of(), TestHelper.toHttpEntity(multiTenancyEntity), null); - assertEquals(200, response.getStatusLine().getStatusCode()); } @Override diff --git a/plugin/src/test/java/org/opensearch/ml/sdkclient/DDBOpenSearchClientTests.java b/plugin/src/test/java/org/opensearch/ml/sdkclient/DDBOpenSearchClientTests.java index 0b9e97f25f..22cf432597 100644 --- a/plugin/src/test/java/org/opensearch/ml/sdkclient/DDBOpenSearchClientTests.java +++ b/plugin/src/test/java/org/opensearch/ml/sdkclient/DDBOpenSearchClientTests.java @@ -129,8 +129,7 @@ public static void cleanup() { public void setup() { MockitoAnnotations.openMocks(this); - sdkClient = SdkClientFactory.wrapSdkClientDelegate(new DDBOpenSearchClient(dynamoDbClient, remoteClusterIndicesClient)); - sdkClient.onMultiTenancyEnabledChanged(true); + sdkClient = SdkClientFactory.wrapSdkClientDelegate(new DDBOpenSearchClient(dynamoDbClient, remoteClusterIndicesClient), true); testDataObject = new TestDataObject("foo"); } diff --git a/plugin/src/test/java/org/opensearch/ml/sdkclient/RemoteClusterIndicesClientTests.java b/plugin/src/test/java/org/opensearch/ml/sdkclient/RemoteClusterIndicesClientTests.java index f5c1faf82c..d73a741f7e 100644 --- a/plugin/src/test/java/org/opensearch/ml/sdkclient/RemoteClusterIndicesClientTests.java +++ b/plugin/src/test/java/org/opensearch/ml/sdkclient/RemoteClusterIndicesClientTests.java @@ -122,8 +122,7 @@ public void setup() { .setSerializationInclusion(JsonInclude.Include.NON_NULL) ) ); - sdkClient = SdkClientFactory.wrapSdkClientDelegate(new RemoteClusterIndicesClient(mockedOpenSearchClient)); - sdkClient.onMultiTenancyEnabledChanged(true); + sdkClient = SdkClientFactory.wrapSdkClientDelegate(new RemoteClusterIndicesClient(mockedOpenSearchClient), true); testDataObject = new TestDataObject("foo"); } @@ -592,8 +591,6 @@ public void testSearchDataObject_Exception() throws IOException { public void testSearchDataObject_NullTenant() throws IOException { // Tests exception if multitenancy enabled - sdkClient.onMultiTenancyEnabledChanged(true); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); SearchDataObjectRequest searchRequest = SearchDataObjectRequest .builder() @@ -612,4 +609,27 @@ public void testSearchDataObject_NullTenant() throws IOException { assertEquals(OpenSearchStatusException.class, cause.getClass()); assertEquals("Tenant ID is required when multitenancy is enabled.", cause.getMessage()); } + + public void testSearchDataObject_NullTenantNoMultitenancy() throws IOException { + // Tests no status exception if multitenancy not enabled + SdkClient sdkClientNoTenant = SdkClientFactory.wrapSdkClientDelegate(new RemoteClusterIndicesClient(mockedOpenSearchClient), false); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + SearchDataObjectRequest searchRequest = SearchDataObjectRequest + .builder() + .indices(TEST_INDEX) + // null tenant Id + .searchSourceBuilder(searchSourceBuilder) + .build(); + + when(mockedOpenSearchClient.search(any(SearchRequest.class), any())).thenThrow(new UnsupportedOperationException("test")); + CompletableFuture future = sdkClientNoTenant + .searchDataObjectAsync(searchRequest, testThreadPool.executor(GENERAL_THREAD_POOL)) + .toCompletableFuture(); + + CompletionException ce = assertThrows(CompletionException.class, () -> future.join()); + Throwable cause = ce.getCause(); + assertEquals(UnsupportedOperationException.class, cause.getClass()); + assertEquals("test", cause.getMessage()); + } }