From c0f07da37a303be230b73146cd6462191da4f07c Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Wed, 21 Jun 2023 16:52:31 -0700 Subject: [PATCH] [Extensions] REST API to initialize an extension and dynamically load it (#8029) * Implemented REST API for initializing extension Signed-off-by: Owais Kazi * Cleanup extensions.yml design Signed-off-by: Owais Kazi * Added tests for RestInitializeExtensionAction Signed-off-by: Owais Kazi * Pulled extensions REST request in extensions directory Signed-off-by: Owais Kazi * Removed forbidden APIs from rest action and modified tests Signed-off-by: Owais Kazi * Added entry in changelog Signed-off-by: Owais Kazi * Added test for parse Signed-off-by: Owais Kazi * Addressed PR comments Signed-off-by: Owais Kazi * Addressed PR comments Signed-off-by: Owais Kazi * Spotless Fixed Signed-off-by: Owais Kazi * Handled exceptions Signed-off-by: Owais Kazi * Handled test failure Signed-off-by: Owais Kazi --------- Signed-off-by: Owais Kazi Signed-off-by: Shivansh Arora --- CHANGELOG.md | 1 + .../org/opensearch/action/ActionModule.java | 14 +- .../org/opensearch/bootstrap/Security.java | 1 - .../java/org/opensearch/env/Environment.java | 4 - .../extensions/ExtensionDependency.java | 39 +++ .../extensions/ExtensionsManager.java | 234 ++++---------- .../extensions/NoopExtensionsManager.java | 8 +- .../rest/RestActionsRequestHandler.java | 1 - .../rest/RestInitializeExtensionAction.java | 141 +++++++++ .../rest}/RestSendToExtensionAction.java | 5 +- .../main/java/org/opensearch/node/Node.java | 6 +- .../rest/extensions/package-info.java | 10 - .../opensearch/action/ActionModuleTests.java | 9 +- .../action/DynamicActionRegistryTests.java | 2 +- .../extensions/ExtensionsManagerTests.java | 294 +++++++++--------- .../RestInitializeExtensionActionTests.java | 124 ++++++++ .../rest/RestSendToExtensionActionTests.java | 5 +- 17 files changed, 535 insertions(+), 363 deletions(-) create mode 100644 server/src/main/java/org/opensearch/extensions/rest/RestInitializeExtensionAction.java rename server/src/main/java/org/opensearch/{rest/extensions => extensions/rest}/RestSendToExtensionAction.java (98%) delete mode 100644 server/src/main/java/org/opensearch/rest/extensions/package-info.java create mode 100644 server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index cb67e56c7845f..75287db6b4f07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -82,6 +82,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add descending order search optimization through reverse segment read. ([#7967](https://github.com/opensearch-project/OpenSearch/pull/7967)) - Update components of segrep backpressure to support remote store. ([#8020](https://github.com/opensearch-project/OpenSearch/pull/8020)) - Make remote cluster connection setup in async ([#8038](https://github.com/opensearch-project/OpenSearch/pull/8038)) +- Add API to initialize extensions ([#8029]()https://github.com/opensearch-project/OpenSearch/pull/8029) ### Dependencies - Bump `com.azure:azure-storage-common` from 12.21.0 to 12.21.1 (#7566, #7814) diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 168fbae84fdf4..902ae7cc54e3f 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -294,8 +294,10 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.extensions.ExtensionsManager; import org.opensearch.extensions.action.ExtensionProxyAction; import org.opensearch.extensions.action.ExtensionProxyTransportAction; +import org.opensearch.extensions.rest.RestInitializeExtensionAction; import org.opensearch.index.seqno.RetentionLeaseActions; import org.opensearch.identity.IdentityService; import org.opensearch.indices.SystemIndices; @@ -453,7 +455,7 @@ import org.opensearch.rest.action.search.RestPutSearchPipelineAction; import org.opensearch.rest.action.search.RestSearchAction; import org.opensearch.rest.action.search.RestSearchScrollAction; -import org.opensearch.rest.extensions.RestSendToExtensionAction; +import org.opensearch.extensions.rest.RestSendToExtensionAction; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.usage.UsageService; @@ -508,6 +510,7 @@ public class ActionModule extends AbstractModule { private final RequestValidators mappingRequestValidators; private final RequestValidators indicesAliasesRequestRequestValidators; private final ThreadPool threadPool; + private final ExtensionsManager extensionsManager; public ActionModule( Settings settings, @@ -521,7 +524,8 @@ public ActionModule( CircuitBreakerService circuitBreakerService, UsageService usageService, SystemIndices systemIndices, - IdentityService identityService + IdentityService identityService, + ExtensionsManager extensionsManager ) { this.settings = settings; this.indexNameExpressionResolver = indexNameExpressionResolver; @@ -530,6 +534,7 @@ public ActionModule( this.settingsFilter = settingsFilter; this.actionPlugins = actionPlugins; this.threadPool = threadPool; + this.extensionsManager = extensionsManager; actions = setupActions(actionPlugins); actionFilters = setupActionFilters(actionPlugins); dynamicActionRegistry = new DynamicActionRegistry(); @@ -947,6 +952,11 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestDeleteSearchPipelineAction()); } + // Extensions API + if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { + registerHandler.accept(new RestInitializeExtensionAction(extensionsManager)); + } + for (ActionPlugin plugin : actionPlugins) { for (RestHandler handler : plugin.getRestHandlers( settings, diff --git a/server/src/main/java/org/opensearch/bootstrap/Security.java b/server/src/main/java/org/opensearch/bootstrap/Security.java index 37c7fd5c0a96c..749c146de4f16 100644 --- a/server/src/main/java/org/opensearch/bootstrap/Security.java +++ b/server/src/main/java/org/opensearch/bootstrap/Security.java @@ -316,7 +316,6 @@ static void addFilePermissions(Permissions policy, Environment environment) thro addDirectoryPath(policy, Environment.PATH_HOME_SETTING.getKey(), environment.libDir(), "read,readlink", false); addDirectoryPath(policy, Environment.PATH_HOME_SETTING.getKey(), environment.modulesDir(), "read,readlink", false); addDirectoryPath(policy, Environment.PATH_HOME_SETTING.getKey(), environment.pluginsDir(), "read,readlink", false); - addDirectoryPath(policy, Environment.PATH_HOME_SETTING.getKey(), environment.extensionDir(), "read,readlink", false); addDirectoryPath(policy, "path.conf'", environment.configDir(), "read,readlink", false); // read-write dirs addDirectoryPath(policy, "java.io.tmpdir", environment.tmpDir(), "read,readlink,write,delete", false); diff --git a/server/src/main/java/org/opensearch/env/Environment.java b/server/src/main/java/org/opensearch/env/Environment.java index 938bca58c7081..a1e467ad1ba48 100644 --- a/server/src/main/java/org/opensearch/env/Environment.java +++ b/server/src/main/java/org/opensearch/env/Environment.java @@ -311,10 +311,6 @@ public Path pluginsDir() { return pluginsDir; } - public Path extensionDir() { - return extensionsDir; - } - public Path binDir() { return binDir; } diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionDependency.java b/server/src/main/java/org/opensearch/extensions/ExtensionDependency.java index d2106cf8d399c..1423a30bbe307 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionDependency.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionDependency.java @@ -16,6 +16,10 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; +import org.opensearch.core.common.Strings; +import org.opensearch.core.xcontent.XContentParser; + +import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; /** * This class handles the dependent extensions information @@ -25,6 +29,8 @@ public class ExtensionDependency implements Writeable { private String uniqueId; private Version version; + private static final String UNIQUE_ID = "uniqueId"; + private static final String VERSION = "version"; public ExtensionDependency(String uniqueId, Version version) { this.uniqueId = uniqueId; @@ -54,6 +60,39 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVersion(version); } + public static ExtensionDependency parse(XContentParser parser) throws IOException { + String uniqueId = null; + Version version = null; + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + + switch (fieldName) { + case UNIQUE_ID: + uniqueId = parser.text(); + break; + case VERSION: + try { + version = Version.fromString(parser.text()); + } catch (IllegalArgumentException e) { + throw e; + } + break; + default: + parser.skipChildren(); + break; + } + } + if (Strings.isNullOrEmpty(uniqueId)) { + throw new IOException("Required field [uniqueId] is missing in the request for the dependent extension"); + } else if (version == null) { + throw new IOException("Required field [version] is missing in the request for the dependent extension"); + } + return new ExtensionDependency(uniqueId, version); + + } + /** * The uniqueId of the dependency extension * diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java index 5c6b9f3141aa0..9987497b5fac0 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java @@ -9,29 +9,18 @@ package org.opensearch.extensions; import java.io.IOException; -import java.io.InputStream; import java.net.InetAddress; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import java.util.Arrays; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.ActionModule; import org.opensearch.action.ActionModule.DynamicActionRegistry; @@ -40,7 +29,8 @@ import org.opensearch.cluster.ClusterSettingsResponse; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Setting; -import org.opensearch.core.util.FileSystemUtils; +import org.opensearch.core.common.Strings; +import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsModule; @@ -65,7 +55,6 @@ import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; -import org.yaml.snakeyaml.Yaml; import org.opensearch.env.EnvironmentSettingsResponse; /** @@ -100,7 +89,6 @@ public static enum OpenSearchRequestType { REQUEST_OPENSEARCH_NAMED_WRITEABLE_REGISTRY } - private final Path extensionsPath; private ExtensionTransportActionsHandler extensionTransportActionsHandler; private Map extensionSettingsMap; private Map initializedExtensions; @@ -117,13 +105,11 @@ public static enum OpenSearchRequestType { /** * Instantiate a new ExtensionsManager object to handle requests and responses from extensions. This is called during Node bootstrap. * - * @param extensionsPath Path to a directory containing extensions. * @param additionalSettings Additional settings to read in from extensions.yml * @throws IOException If the extensions discovery file is not properly retrieved. */ - public ExtensionsManager(Path extensionsPath, Set> additionalSettings) throws IOException { + public ExtensionsManager(Set> additionalSettings) throws IOException { logger.info("ExtensionsManager initialized"); - this.extensionsPath = extensionsPath; this.initializedExtensions = new HashMap(); this.extensionIdMap = new HashMap(); this.extensionSettingsMap = new HashMap(); @@ -137,12 +123,6 @@ public ExtensionsManager(Path extensionsPath, Set> additionalSettings } this.client = null; this.extensionTransportActionsHandler = null; - - /* - * Now Discover extensions - */ - discover(); - } /** @@ -307,60 +287,42 @@ private void registerRequestHandler(DynamicActionRegistry dynamicActionRegistry) ); } - /* - * Load and populate all extensions + /** + * Loads a single extension + * @param extension The extension to be loaded */ - protected void discover() throws IOException { - logger.info("Loading extensions : {}", extensionsPath); - if (!FileSystemUtils.isAccessibleDirectory(extensionsPath, logger)) { - return; - } + public void loadExtension(Extension extension) throws IOException { + validateExtension(extension); + DiscoveryExtensionNode discoveryExtensionNode = new DiscoveryExtensionNode( + extension.getName(), + extension.getUniqueId(), + new TransportAddress(InetAddress.getByName(extension.getHostAddress()), Integer.parseInt(extension.getPort())), + new HashMap(), + Version.fromString(extension.getOpensearchVersion()), + Version.fromString(extension.getMinimumCompatibleVersion()), + extension.getDependencies() + ); + extensionIdMap.put(extension.getUniqueId(), discoveryExtensionNode); + extensionSettingsMap.put(extension.getUniqueId(), extension); + logger.info("Loaded extension with uniqueId " + extension.getUniqueId() + ": " + extension); + } - List extensions = new ArrayList(); - if (Files.exists(extensionsPath.resolve("extensions.yml"))) { - try { - extensions = readFromExtensionsYml(extensionsPath.resolve("extensions.yml")).getExtensions(); - } catch (IOException e) { - throw new IOException("Could not read from extensions.yml", e); - } - for (Extension extension : extensions) { - loadExtension(extension); - } - if (!extensionIdMap.isEmpty()) { - logger.info("Loaded all extensions"); - } - } else { - logger.warn("Extensions.yml file is not present. No extensions will be loaded."); + private void validateField(String fieldName, String value) throws IOException { + if (Strings.isNullOrEmpty(value)) { + throw new IOException("Required field [" + fieldName + "] is missing in the request"); } } - /** - * Loads a single extension - * @param extension The extension to be loaded - */ - private void loadExtension(Extension extension) throws IOException { + private void validateExtension(Extension extension) throws IOException { + validateField("extension name", extension.getName()); + validateField("extension uniqueId", extension.getUniqueId()); + validateField("extension host address", extension.getHostAddress()); + validateField("extension port", extension.getPort()); + validateField("extension version", extension.getVersion()); + validateField("opensearch version", extension.getOpensearchVersion()); + validateField("minimum opensearch version", extension.getMinimumCompatibleVersion()); if (extensionIdMap.containsKey(extension.getUniqueId())) { - logger.info("Duplicate uniqueId " + extension.getUniqueId() + ". Did not load extension: " + extension); - } else { - try { - DiscoveryExtensionNode discoveryExtensionNode = new DiscoveryExtensionNode( - extension.getName(), - extension.getUniqueId(), - new TransportAddress(InetAddress.getByName(extension.getHostAddress()), Integer.parseInt(extension.getPort())), - new HashMap(), - Version.fromString(extension.getOpensearchVersion()), - Version.fromString(extension.getMinimumCompatibleVersion()), - extension.getDependencies() - ); - - extensionIdMap.put(extension.getUniqueId(), discoveryExtensionNode); - extensionSettingsMap.put(extension.getUniqueId(), extension); - logger.info("Loaded extension with uniqueId " + extension.getUniqueId() + ": " + extension); - } catch (OpenSearchException e) { - logger.error("Could not load extension with uniqueId " + extension.getUniqueId() + " due to " + e); - } catch (IllegalArgumentException e) { - throw e; - } + throw new IOException("Duplicate uniqueId [" + extension.getUniqueId() + "]. Did not load extension: " + extension); } } @@ -408,27 +370,35 @@ public String executor() { return ThreadPool.Names.GENERIC; } }; - try { - logger.info("Sending extension request type: " + REQUEST_EXTENSION_ACTION_NAME); - transportService.connectToExtensionNode(extension); - transportService.sendRequest( - extension, - REQUEST_EXTENSION_ACTION_NAME, - new InitializeExtensionRequest(transportService.getLocalNode(), extension), - initializeExtensionResponseHandler - ); - inProgressFuture.orTimeout(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS).join(); - } catch (CompletionException | ConnectTransportException e) { - if (e.getCause() instanceof TimeoutException || e instanceof ConnectTransportException) { - logger.info("No response from extension to request.", e); - } else if (e.getCause() instanceof RuntimeException) { - throw (RuntimeException) e.getCause(); - } else if (e.getCause() instanceof Error) { - throw (Error) e.getCause(); - } else { - throw new RuntimeException(e.getCause()); + + logger.info("Sending extension request type: " + REQUEST_EXTENSION_ACTION_NAME); + transportService.getThreadPool().generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + extensionIdMap.remove(extension.getId()); + if (e.getCause() instanceof ConnectTransportException) { + logger.info("No response from extension to request.", e); + throw (ConnectTransportException) e.getCause(); + } else if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else if (e.getCause() instanceof Error) { + throw (Error) e.getCause(); + } else { + throw new RuntimeException(e.getCause()); + } } - } + + @Override + protected void doRun() throws Exception { + transportService.connectToExtensionNode(extension); + transportService.sendRequest( + extension, + REQUEST_EXTENSION_ACTION_NAME, + new InitializeExtensionRequest(transportService.getLocalNode(), extension), + initializeExtensionResponseHandler + ); + } + }); } /** @@ -467,84 +437,6 @@ TransportResponse handleExtensionRequest(ExtensionRequest extensionRequest) thro } } - private ExtensionsSettings readFromExtensionsYml(Path filePath) throws IOException { - Yaml yaml = new Yaml(); - try (InputStream inputStream = Files.newInputStream(filePath)) { - Map obj = yaml.load(inputStream); - if (obj == null) { - inputStream.close(); - throw new IOException("extensions.yml is empty"); - } - List> unreadExtensions = new ArrayList<>((Collection>) obj.get("extensions")); - List readExtensions = new ArrayList(); - Set additionalSettingsKeys = additionalSettings.stream().map(s -> s.getKey()).collect(Collectors.toSet()); - for (HashMap extensionMap : unreadExtensions) { - try { - // checking to see whether any required fields are missing from extension.yml file or not - String[] requiredFields = { - "name", - "uniqueId", - "hostAddress", - "port", - "version", - "opensearchVersion", - "minimumCompatibleVersion" }; - List missingFields = Arrays.stream(requiredFields) - .filter(field -> !extensionMap.containsKey(field)) - .collect(Collectors.toList()); - if (!missingFields.isEmpty()) { - throw new IOException("Extension is missing these required fields : " + missingFields); - } - - // Parse extension dependencies - List extensionDependencyList = new ArrayList(); - if (extensionMap.get("dependencies") != null) { - List> extensionDependencies = new ArrayList<>( - (Collection>) extensionMap.get("dependencies") - ); - for (HashMap dependency : extensionDependencies) { - extensionDependencyList.add( - new ExtensionDependency( - dependency.get("uniqueId").toString(), - Version.fromString(dependency.get("version").toString()) - ) - ); - } - } - - ExtensionScopedSettings extAdditionalSettings = new ExtensionScopedSettings(additionalSettings); - Map additionalSettingsMap = extensionMap.entrySet() - .stream() - .filter(kv -> additionalSettingsKeys.contains(kv.getKey())) - .collect(Collectors.toMap(map -> map.getKey(), map -> map.getValue())); - - Settings.Builder output = Settings.builder(); - output.loadFromMap(additionalSettingsMap); - extAdditionalSettings.applySettings(output.build()); - - // Create extension read from yml config - readExtensions.add( - new Extension( - extensionMap.get("name").toString(), - extensionMap.get("uniqueId").toString(), - extensionMap.get("hostAddress").toString(), - extensionMap.get("port").toString(), - extensionMap.get("version").toString(), - extensionMap.get("opensearchVersion").toString(), - extensionMap.get("minimumCompatibleVersion").toString(), - extensionDependencyList, - extAdditionalSettings - ) - ); - } catch (IOException e) { - logger.warn("loading extension has been failed because of exception : " + e.getMessage()); - } - } - inputStream.close(); - return new ExtensionsSettings(readExtensions); - } - } - static String getRequestExtensionActionName() { return REQUEST_EXTENSION_ACTION_NAME; } @@ -561,10 +453,6 @@ static Logger getLogger() { return logger; } - Path getExtensionsPath() { - return extensionsPath; - } - TransportService getTransportService() { return transportService; } diff --git a/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java index fb7160bc1bc67..d434074279041 100644 --- a/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java @@ -9,7 +9,6 @@ package org.opensearch.extensions; import java.io.IOException; -import java.nio.file.Path; import java.util.Optional; import java.util.Set; @@ -32,7 +31,7 @@ public class NoopExtensionsManager extends ExtensionsManager { public NoopExtensionsManager() throws IOException { - super(Path.of(""), Set.of()); + super(Set.of()); } @Override @@ -59,11 +58,6 @@ public ExtensionActionResponse handleTransportRequest(ExtensionActionRequest req return new ExtensionActionResponse(new byte[0]); } - @Override - protected void discover() throws IOException { - // no-op - } - @Override public void initialize() { // no-op diff --git a/server/src/main/java/org/opensearch/extensions/rest/RestActionsRequestHandler.java b/server/src/main/java/org/opensearch/extensions/rest/RestActionsRequestHandler.java index 37638f2a333d5..d890c1b85bb81 100644 --- a/server/src/main/java/org/opensearch/extensions/rest/RestActionsRequestHandler.java +++ b/server/src/main/java/org/opensearch/extensions/rest/RestActionsRequestHandler.java @@ -13,7 +13,6 @@ import org.opensearch.extensions.DiscoveryExtensionNode; import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; -import org.opensearch.rest.extensions.RestSendToExtensionAction; import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportService; diff --git a/server/src/main/java/org/opensearch/extensions/rest/RestInitializeExtensionAction.java b/server/src/main/java/org/opensearch/extensions/rest/RestInitializeExtensionAction.java new file mode 100644 index 0000000000000..e0806f8172278 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/rest/RestInitializeExtensionAction.java @@ -0,0 +1,141 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.rest; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.extensions.ExtensionDependency; +import org.opensearch.extensions.ExtensionScopedSettings; +import org.opensearch.extensions.ExtensionsManager; +import org.opensearch.extensions.ExtensionsSettings.Extension; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestStatus; +import org.opensearch.transport.ConnectTransportException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; + +import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.rest.RestRequest.Method.POST; + +/** + * An action that initializes an extension + */ +public class RestInitializeExtensionAction extends BaseRestHandler { + + private final ExtensionsManager extensionsManager; + + @Override + public String getName() { + return ExtensionsManager.REQUEST_EXTENSION_ACTION_NAME; + } + + @Override + public List routes() { + return List.of(new Route(POST, "/_extensions/initialize")); + } + + public RestInitializeExtensionAction(ExtensionsManager extensionsManager) { + this.extensionsManager = extensionsManager; + } + + @Override + public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String name = null; + String uniqueId = null; + String hostAddress = null; + String port = null; + String version = null; + String openSearchVersion = null; + String minimumCompatibleVersion = null; + List dependencies = new ArrayList<>(); + + try (XContentParser parser = request.contentParser()) { + parser.nextToken(); + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String currentFieldName = parser.currentName(); + parser.nextToken(); + if ("name".equals(currentFieldName)) { + name = parser.text(); + } else if ("uniqueId".equals(currentFieldName)) { + uniqueId = parser.text(); + } else if ("hostAddress".equals(currentFieldName)) { + hostAddress = parser.text(); + } else if ("port".equals(currentFieldName)) { + port = parser.text(); + } else if ("version".equals(currentFieldName)) { + version = parser.text(); + } else if ("opensearchVersion".equals(currentFieldName)) { + openSearchVersion = parser.text(); + } else if ("minimumCompatibleVersion".equals(currentFieldName)) { + minimumCompatibleVersion = parser.text(); + } else if ("dependencies".equals(currentFieldName)) { + ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + dependencies.add(ExtensionDependency.parse(parser)); + } + } + } + } catch (IOException e) { + throw new IOException("Missing attribute", e); + } + + Extension extension = new Extension( + name, + uniqueId, + hostAddress, + port, + version, + openSearchVersion, + minimumCompatibleVersion, + dependencies, + // TODO add this to the API (https://github.com/opensearch-project/OpenSearch/issues/8032) + new ExtensionScopedSettings(Collections.emptySet()) + ); + try { + extensionsManager.loadExtension(extension); + extensionsManager.initialize(); + } catch (CompletionException e) { + Throwable cause = e.getCause(); + if (cause instanceof TimeoutException) { + return channel -> channel.sendResponse( + new BytesRestResponse(RestStatus.REQUEST_TIMEOUT, "No response from extension to request.") + ); + } else if (cause instanceof ConnectTransportException || cause instanceof RuntimeException) { + return channel -> channel.sendResponse( + new BytesRestResponse(RestStatus.REQUEST_TIMEOUT, "Connection failed with the extension.") + ); + } + if (e.getCause() instanceof Error) { + throw (Error) e.getCause(); + } + } catch (Exception e) { + return channel -> channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + + } + + return channel -> { + try (XContentBuilder builder = channel.newBuilder()) { + builder.startObject(); + builder.field("success", "A request to initialize an extension has been sent."); + builder.endObject(); + channel.sendResponse(new BytesRestResponse(RestStatus.ACCEPTED, builder)); + } + }; + + } +} diff --git a/server/src/main/java/org/opensearch/rest/extensions/RestSendToExtensionAction.java b/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java similarity index 98% rename from server/src/main/java/org/opensearch/rest/extensions/RestSendToExtensionAction.java rename to server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java index 51ff74b1869a0..073b3f3f45818 100644 --- a/server/src/main/java/org/opensearch/rest/extensions/RestSendToExtensionAction.java +++ b/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.rest.extensions; +package org.opensearch.extensions.rest; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -17,9 +17,6 @@ import org.opensearch.common.xcontent.XContentType; import org.opensearch.extensions.DiscoveryExtensionNode; import org.opensearch.extensions.ExtensionsManager; -import org.opensearch.extensions.rest.ExtensionRestRequest; -import org.opensearch.extensions.rest.RegisterRestActionsRequest; -import org.opensearch.extensions.rest.RestExecuteOnExtensionResponse; import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.NamedRoute; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 808c054de4969..688f2d05b203b 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -475,7 +475,7 @@ protected Node( for (ExtensionAwarePlugin extAwarePlugin : extensionAwarePlugins) { additionalSettings.addAll(extAwarePlugin.getExtensionSettings()); } - this.extensionsManager = new ExtensionsManager(initialEnvironment.extensionDir(), additionalSettings); + this.extensionsManager = new ExtensionsManager(additionalSettings); } else { this.extensionsManager = new NoopExtensionsManager(); } @@ -803,7 +803,8 @@ protected Node( circuitBreakerService, usageService, systemIndices, - identityService + identityService, + extensionsManager ); modules.add(actionModule); @@ -1306,7 +1307,6 @@ public Node start() throws NodeValidationException { assert clusterService.localNode().equals(localNodeFactory.getNode()) : "clusterService has a different local node than the factory provided"; transportService.acceptIncomingRequests(); - extensionsManager.initialize(); discovery.startInitialJoin(); final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings()); configureNodeAndClusterIdStateListener(clusterService); diff --git a/server/src/main/java/org/opensearch/rest/extensions/package-info.java b/server/src/main/java/org/opensearch/rest/extensions/package-info.java deleted file mode 100644 index 64b92e8b5c149..0000000000000 --- a/server/src/main/java/org/opensearch/rest/extensions/package-info.java +++ /dev/null @@ -1,10 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/** REST classes for the extensions package. OpenSearch extensions provide extensibility to OpenSearch.*/ -package org.opensearch.rest.extensions; diff --git a/server/src/test/java/org/opensearch/action/ActionModuleTests.java b/server/src/test/java/org/opensearch/action/ActionModuleTests.java index 66af9ebfd814f..109c60aa1e4f1 100644 --- a/server/src/test/java/org/opensearch/action/ActionModuleTests.java +++ b/server/src/test/java/org/opensearch/action/ActionModuleTests.java @@ -46,6 +46,7 @@ import org.opensearch.common.settings.SettingsFilter; import org.opensearch.common.settings.SettingsModule; import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.extensions.ExtensionsManager; import org.opensearch.identity.IdentityService; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.ActionPlugin.ActionHandler; @@ -65,6 +66,7 @@ import java.io.IOException; import java.util.List; +import java.util.Set; import java.util.function.Supplier; import static java.util.Collections.emptyList; @@ -124,7 +126,7 @@ protected FakeAction() { ); } - public void testSetupRestHandlerContainsKnownBuiltin() { + public void testSetupRestHandlerContainsKnownBuiltin() throws IOException { SettingsModule settings = new SettingsModule(Settings.EMPTY); UsageService usageService = new UsageService(); ActionModule actionModule = new ActionModule( @@ -139,7 +141,8 @@ public void testSetupRestHandlerContainsKnownBuiltin() { null, usageService, null, - new IdentityService(Settings.EMPTY, new ArrayList<>()) + new IdentityService(Settings.EMPTY, new ArrayList<>()), + new ExtensionsManager(Set.of()) ); actionModule.initRestHandlers(null); // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail @@ -196,6 +199,7 @@ public String getName() { null, usageService, null, + null, null ); Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null)); @@ -246,6 +250,7 @@ public List getRestHandlers( null, usageService, null, + null, null ); actionModule.initRestHandlers(null); diff --git a/server/src/test/java/org/opensearch/action/DynamicActionRegistryTests.java b/server/src/test/java/org/opensearch/action/DynamicActionRegistryTests.java index a5b4f91ff1ed5..963d47df3baff 100644 --- a/server/src/test/java/org/opensearch/action/DynamicActionRegistryTests.java +++ b/server/src/test/java/org/opensearch/action/DynamicActionRegistryTests.java @@ -18,7 +18,7 @@ import org.opensearch.extensions.action.ExtensionTransportAction; import org.opensearch.rest.NamedRoute; import org.opensearch.rest.RestRequest; -import org.opensearch.rest.extensions.RestSendToExtensionAction; +import org.opensearch.extensions.rest.RestSendToExtensionAction; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; import org.opensearch.test.OpenSearchTestCase; diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java index 75a1d9ec62c82..f8ec138d8eff2 100644 --- a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java +++ b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java @@ -23,10 +23,7 @@ import java.io.IOException; import java.net.InetAddress; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; import java.nio.file.Path; -import java.security.AccessControlException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -40,6 +37,7 @@ import org.apache.logging.log4j.LogManager; import org.junit.After; import org.junit.Before; +import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.ActionModule; import org.opensearch.action.ActionModule.DynamicActionRegistry; @@ -47,12 +45,13 @@ import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterSettingsResponse; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.xcontent.XContentParser; import org.opensearch.env.EnvironmentSettingsResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.io.PathUtils; import org.opensearch.common.io.stream.BytesStreamInput; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.NamedWriteableRegistry; @@ -69,6 +68,7 @@ import org.opensearch.extensions.proto.ExtensionRequestProto; import org.opensearch.extensions.rest.RegisterRestActionsRequest; import org.opensearch.extensions.settings.RegisterCustomSettingsRequest; +import org.opensearch.extensions.ExtensionsSettings.Extension; import org.opensearch.identity.IdentityService; import org.opensearch.indices.breaker.NoneCircuitBreakerService; import org.opensearch.plugins.ExtensionAwarePlugin; @@ -203,10 +203,37 @@ public void tearDown() throws Exception { ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } - public void testDiscover() throws Exception { - Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); + public void testLoadExtensions() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); + Set> additionalSettings = extAwarePlugin.getExtensionSettings().stream().collect(Collectors.toSet()); + ExtensionScopedSettings extensionScopedSettings = new ExtensionScopedSettings(additionalSettings); + ExtensionsManager extensionsManager = new ExtensionsManager(additionalSettings); + ExtensionDependency dependentExtension = new ExtensionDependency("uniqueid0", Version.fromString("2.0.0")); + + Extension firstExtension = new Extension( + "firstExtension", + "uniqueid1", + "127.0.0.1", + "9300", + "0.0.7", + "3.0.0", + "3.0.0", + Collections.emptyList(), + extensionScopedSettings + ); + Extension secondExtension = new Extension( + "secondExtension", + "uniqueid2", + "127.0.0.1", + "9301", + "0.0.7", + "2.0.0", + "2.0.0", + List.of(dependentExtension), + extensionScopedSettings + ); + extensionsManager.loadExtension(firstExtension); + extensionsManager.loadExtension(secondExtension); List expectedExtensions = new ArrayList(); @@ -218,7 +245,7 @@ public void testDiscover() throws Exception { new DiscoveryExtensionNode( "firstExtension", "uniqueid1", - new TransportAddress(InetAddress.getByName("127.0.0.0"), 9300), + new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300), new HashMap(), Version.fromString("3.0.0"), Version.fromString("3.0.0"), @@ -252,14 +279,37 @@ public void testDiscover() throws Exception { } } - public void testNonUniqueExtensionsDiscovery() throws Exception { - Path emptyExtensionDir = createTempDir(); - List nonUniqueYmlLines = extensionsYmlLines.stream() - .map(s -> s.replace("uniqueid2", "uniqueid1")) - .collect(Collectors.toList()); - Files.write(emptyExtensionDir.resolve("extensions.yml"), nonUniqueYmlLines, StandardCharsets.UTF_8); + public void testNonUniqueLoadedExtensions() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(emptyExtensionDir, Set.of()); + Extension firstExtension = new Extension( + "firstExtension", + "uniqueid1", + "127.0.0.0", + "9300", + "0.0.7", + "3.0.0", + "3.0.0", + Collections.emptyList(), + null + ); + Extension secondExtension = new Extension( + "secondExtension", + "uniqueid1", + "127.0.0.0", + "9300", + "0.0.7", + "3.0.0", + "3.0.0", + null, + null + ); + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); + extensionsManager.loadExtension(firstExtension); + IOException exception = expectThrows(IOException.class, () -> extensionsManager.loadExtension(secondExtension)); + assertEquals( + "Duplicate uniqueId [uniqueid1]. Did not load extension: Extension [name=secondExtension, uniqueId=uniqueid1, hostAddress=127.0.0.0, port=9300, version=0.0.7, opensearchVersion=3.0.0, minimumCompatibleVersion=3.0.0]", + exception.getMessage() + ); List expectedExtensions = new ArrayList(); @@ -289,56 +339,15 @@ public void testNonUniqueExtensionsDiscovery() throws Exception { assertTrue(expectedExtensions.containsAll(emptyList())); } - public void testMissingRequiredFieldsInExtensionDiscovery() throws Exception { - Path emptyExtensionDir = createTempDir(); - ExtensionsManager extensionsManager; - List requiredFieldMissingYmlLines = extensionsYmlLines.stream() - .map(s -> s.replace(" minimumCompatibleVersion: '2.0.0'", "")) - .collect(Collectors.toList()); - Files.write(emptyExtensionDir.resolve("extensions.yml"), requiredFieldMissingYmlLines, StandardCharsets.UTF_8); + public void testMissingRequiredFieldsWhileLoadingExtension() throws Exception { - try (MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ExtensionsManager.class))) { + Extension firstExtension = new Extension("firstExtension", "uniqueid1", "127.0.0.0", "9300", "0.0.7", "3.0.0", "", null, null); + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); - mockLogAppender.addExpectation( - new MockLogAppender.SeenEventExpectation( - "Required field is missing in extensions.yml", - "org.opensearch.extensions.ExtensionsManager", - Level.WARN, - "loading extension has been failed because of exception : Extension is missing these required fields : [minimumCompatibleVersion]" - ) - ); - - extensionsManager = new ExtensionsManager(emptyExtensionDir, Set.of()); + IOException exception = expectThrows(IOException.class, () -> extensionsManager.loadExtension(firstExtension)); + assertEquals("Required field [minimum opensearch version] is missing in the request", exception.getMessage()); - mockLogAppender.assertAllExpectationsMatched(); - } - - List expectedExtensions = new ArrayList(); - - expectedExtensions.add( - new DiscoveryExtensionNode( - "firstExtension", - "uniqueid1", - new TransportAddress(InetAddress.getByName("127.0.0.0"), 9300), - new HashMap(), - Version.fromString("3.0.0"), - Version.fromString("3.0.0"), - Collections.emptyList() - ) - ); - assertEquals(expectedExtensions.size(), extensionsManager.getExtensionIdMap().values().size()); - for (DiscoveryExtensionNode extension : expectedExtensions) { - DiscoveryExtensionNode initializedExtension = extensionsManager.getExtensionIdMap().get(extension.getId()); - assertEquals(extension.getName(), initializedExtension.getName()); - assertEquals(extension.getId(), initializedExtension.getId()); - assertEquals(extension.getAddress(), initializedExtension.getAddress()); - assertEquals(extension.getAttributes(), initializedExtension.getAttributes()); - assertEquals(extension.getVersion(), initializedExtension.getVersion()); - assertEquals(extension.getMinimumCompatibleVersion(), initializedExtension.getMinimumCompatibleVersion()); - assertEquals(extension.getDependencies(), initializedExtension.getDependencies()); - } - assertTrue(expectedExtensions.containsAll(emptyList())); - assertTrue(expectedExtensions.containsAll(emptyList())); + assertEquals(0, extensionsManager.getExtensionIdMap().values().size()); } public void testDiscoveryExtension() throws Exception { @@ -389,49 +398,18 @@ public void testExtensionDependency() throws Exception { } } - public void testNonAccessibleDirectory() throws Exception { - AccessControlException e = expectThrows( + public void testParseExtensionDependency() throws Exception { + XContentParser parser = createParser(JsonXContent.jsonXContent, "{\"uniqueId\": \"test1\", \"version\": \"2.0.0\"}"); - AccessControlException.class, - () -> new ExtensionsManager(PathUtils.get(""), Set.of()) - ); - assertEquals("access denied (\"java.io.FilePermission\" \"\" \"read\")", e.getMessage()); - } - - public void testNoExtensionsFile() throws Exception { - Settings settings = Settings.builder().build(); - - try (MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ExtensionsManager.class))) { - - mockLogAppender.addExpectation( - new MockLogAppender.SeenEventExpectation( - "No Extensions File Present", - "org.opensearch.extensions.ExtensionsManager", - Level.WARN, - "Extensions.yml file is not present. No extensions will be loaded." - ) - ); + assertEquals(XContentParser.Token.START_OBJECT, parser.nextToken()); + ExtensionDependency dependency = ExtensionDependency.parse(parser); - new ExtensionsManager(extensionDir, Set.of()); - - mockLogAppender.assertAllExpectationsMatched(); - } - } - - public void testEmptyExtensionsFile() throws Exception { - Path emptyExtensionDir = createTempDir(); - - List emptyExtensionsYmlLines = Arrays.asList(); - Files.write(emptyExtensionDir.resolve("extensions.yml"), emptyExtensionsYmlLines, StandardCharsets.UTF_8); - - Settings settings = Settings.builder().build(); - - expectThrows(IOException.class, () -> new ExtensionsManager(emptyExtensionDir, Set.of())); + assertEquals("test1", dependency.getUniqueId()); + assertEquals(Version.fromString("2.0.0"), dependency.getVersion()); } public void testInitialize() throws Exception { - Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); initialize(extensionsManager); @@ -472,9 +450,8 @@ public void testInitialize() throws Exception { } public void testHandleRegisterRestActionsRequest() throws Exception { - Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; @@ -488,8 +465,7 @@ public void testHandleRegisterRestActionsRequest() throws Exception { } public void testHandleRegisterSettingsRequest() throws Exception { - Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; @@ -505,7 +481,7 @@ public void testHandleRegisterSettingsRequest() throws Exception { } public void testHandleRegisterRestActionsRequestWithInvalidMethod() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; @@ -520,7 +496,7 @@ public void testHandleRegisterRestActionsRequestWithInvalidMethod() throws Excep } public void testHandleRegisterRestActionsRequestWithInvalidDeprecatedMethod() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; @@ -535,7 +511,7 @@ public void testHandleRegisterRestActionsRequestWithInvalidDeprecatedMethod() th } public void testHandleRegisterRestActionsRequestWithInvalidUri() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; List actionsList = List.of("GET", "PUT /bar", "POST /baz"); @@ -549,7 +525,7 @@ public void testHandleRegisterRestActionsRequestWithInvalidUri() throws Exceptio } public void testHandleRegisterRestActionsRequestWithInvalidDeprecatedUri() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; List actionsList = List.of("GET /foo", "PUT /bar", "POST /baz"); @@ -563,7 +539,7 @@ public void testHandleRegisterRestActionsRequestWithInvalidDeprecatedUri() throw } public void testHandleExtensionRequest() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); initialize(extensionsManager); ExtensionRequest clusterStateRequest = new ExtensionRequest(ExtensionRequestProto.RequestType.REQUEST_EXTENSION_CLUSTER_STATE); @@ -717,9 +693,7 @@ public void testEnvironmentSettingsDefaultValue() throws Exception { } public void testAddSettingsUpdateConsumerRequest() throws Exception { - Path extensionDir = createTempDir(); - Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); initialize(extensionsManager); List> componentSettings = List.of( @@ -763,10 +737,7 @@ public void testAddSettingsUpdateConsumerRequest() throws Exception { } public void testHandleAddSettingsUpdateConsumerRequest() throws Exception { - - Path extensionDir = createTempDir(); - Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); initialize(extensionsManager); List> componentSettings = List.of( @@ -786,9 +757,7 @@ public void testHandleAddSettingsUpdateConsumerRequest() throws Exception { } public void testUpdateSettingsRequest() throws Exception { - Path extensionDir = createTempDir(); - Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); initialize(extensionsManager); Setting componentSetting = Setting.boolSetting("falseSetting", false, Property.Dynamic); @@ -817,7 +786,7 @@ public void testUpdateSettingsRequest() throws Exception { public void testRegisterHandler() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); TransportService mockTransportService = spy( new TransportService( @@ -842,43 +811,50 @@ public void testRegisterHandler() throws Exception { } - public void testIncompatibleExtensionRegistration() throws IOException, IllegalAccessException { - - try (MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ExtensionsManager.class))) { - - mockLogAppender.addExpectation( - new MockLogAppender.SeenEventExpectation( - "Could not load extension with uniqueId", - "org.opensearch.extensions.ExtensionsManager", - Level.ERROR, - "Could not load extension with uniqueId uniqueid1 due to OpenSearchException[Extension minimumCompatibleVersion: 3.99.0 is greater than current" - ) - ); - - List incompatibleExtension = Arrays.asList( - "extensions:", - " - name: firstExtension", - " uniqueId: uniqueid1", - " hostAddress: '127.0.0.0'", - " port: '9300'", - " version: '0.0.7'", - " opensearchVersion: '3.0.0'", - " minimumCompatibleVersion: '3.99.0'" - ); - - Files.write(extensionDir.resolve("extensions.yml"), incompatibleExtension, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); - assertEquals(0, extensionsManager.getExtensionIdMap().values().size()); - mockLogAppender.assertAllExpectationsMatched(); - } + public void testIncompatibleExtensionRegistration() throws IOException { + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); + Extension firstExtension = new Extension( + "firstExtension", + "uniqueid1", + "127.0.0.0", + "9300", + "0.0.7", + "3.0.0", + "3.99.0", + List.of(), + null + ); + expectThrows(OpenSearchException.class, () -> extensionsManager.loadExtension(firstExtension)); + assertEquals(0, extensionsManager.getExtensionIdMap().values().size()); } public void testAdditionalExtensionSettingsForExtensionWithCustomSettingSet() throws Exception { - Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); + Setting customSetting = Setting.simpleString("custom_extension_setting", "custom_setting", Property.ExtensionScope); + ExtensionAwarePlugin extAwarePlugin = new ExtensionAwarePlugin() { + @Override + public List> getExtensionSettings() { + List> settings = new ArrayList>(); + settings.add(customSetting); + return settings; + } + }; Set> additionalSettings = extAwarePlugin.getExtensionSettings().stream().collect(Collectors.toSet()); + ExtensionScopedSettings extensionScopedSettings = new ExtensionScopedSettings(additionalSettings); + Extension firstExtension = new Extension( + "firstExtension", + "uniqueid1", + "127.0.0.0", + "9300", + "0.0.7", + "3.0.0", + "3.0.0", + List.of(), + extensionScopedSettings + ); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, additionalSettings); + ExtensionsManager extensionsManager = new ExtensionsManager(additionalSettings); + extensionsManager.loadExtension(firstExtension); DiscoveryExtensionNode extension = new DiscoveryExtensionNode( "firstExtension", @@ -900,11 +876,23 @@ public void testAdditionalExtensionSettingsForExtensionWithCustomSettingSet() th } public void testAdditionalExtensionSettingsForExtensionWithoutCustomSettingSet() throws Exception { - Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); Set> additionalSettings = extAwarePlugin.getExtensionSettings().stream().collect(Collectors.toSet()); + ExtensionScopedSettings extensionScopedSettings = new ExtensionScopedSettings(additionalSettings); + Extension firstExtension = new Extension( + "secondExtension", + "uniqueid2", + "127.0.0.0", + "9301", + "0.0.7", + "2.0.0", + "2.0.0", + List.of(), + extensionScopedSettings + ); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, additionalSettings); + ExtensionsManager extensionsManager = new ExtensionsManager(additionalSettings); + extensionsManager.loadExtension(firstExtension); DiscoveryExtensionNode extension = new DiscoveryExtensionNode( "secondExtension", diff --git a/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java b/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java new file mode 100644 index 0000000000000..8d027b7fca9c2 --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java @@ -0,0 +1,124 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.rest; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.mockito.Mockito.mock; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.PageCacheRecycler; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.extensions.ExtensionsManager; +import org.opensearch.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestStatus; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.rest.FakeRestChannel; +import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; +import org.opensearch.transport.nio.MockNioTransport; + +public class RestInitializeExtensionActionTests extends OpenSearchTestCase { + + private TransportService transportService; + private MockNioTransport transport; + private final ThreadPool threadPool = new TestThreadPool(RestInitializeExtensionActionTests.class.getSimpleName()); + + @Before + public void setup() throws Exception { + Settings settings = Settings.builder().put("cluster.name", "test").build(); + transport = new MockNioTransport( + settings, + Version.CURRENT, + threadPool, + new NetworkService(Collections.emptyList()), + PageCacheRecycler.NON_RECYCLING_INSTANCE, + new NamedWriteableRegistry(Collections.emptyList()), + new NoneCircuitBreakerService() + ); + transportService = new MockTransportService( + settings, + transport, + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + (boundAddress) -> new DiscoveryNode( + "test_node", + "test_node", + boundAddress.publishAddress(), + emptyMap(), + emptySet(), + Version.CURRENT + ), + null, + Collections.emptySet() + ); + + } + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + transportService.close(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + } + + public void testRestInitializeExtensionActionResponse() throws Exception { + ExtensionsManager extensionsManager = mock(ExtensionsManager.class); + RestInitializeExtensionAction restInitializeExtensionAction = new RestInitializeExtensionAction(extensionsManager); + final String content = "{\"name\":\"ad-extension\",\"uniqueId\":\"ad-extension\",\"hostAddress\":\"127.0.0.1\"," + + "\"port\":\"4532\",\"version\":\"1.0\",\"opensearchVersion\":\"3.0.0\"," + + "\"minimumCompatibleVersion\":\"3.0.0\"}"; + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withContent(new BytesArray(content), XContentType.JSON) + .withMethod(RestRequest.Method.POST) + .build(); + + FakeRestChannel channel = new FakeRestChannel(request, false, 0); + restInitializeExtensionAction.handleRequest(request, channel, null); + + assertEquals(channel.capturedResponse().status(), RestStatus.ACCEPTED); + assertTrue(channel.capturedResponse().content().utf8ToString().contains("A request to initialize an extension has been sent.")); + } + + public void testRestInitializeExtensionActionFailure() throws Exception { + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); + RestInitializeExtensionAction restInitializeExtensionAction = new RestInitializeExtensionAction(extensionsManager); + + final String content = "{\"name\":\"ad-extension\",\"uniqueId\":\"\",\"hostAddress\":\"127.0.0.1\"," + + "\"port\":\"4532\",\"version\":\"1.0\",\"opensearchVersion\":\"3.0.0\"," + + "\"minimumCompatibleVersion\":\"3.0.0\"}"; + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withContent(new BytesArray(content), XContentType.JSON) + .withMethod(RestRequest.Method.POST) + .build(); + + FakeRestChannel channel = new FakeRestChannel(request, false, 0); + restInitializeExtensionAction.handleRequest(request, channel, null); + + assertEquals(1, channel.errors().get()); + assertTrue( + channel.capturedResponse().content().utf8ToString().contains("Required field [extension uniqueId] is missing in the request") + ); + } + +} diff --git a/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java b/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java index df047afb677d9..fe8792b36f048 100644 --- a/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java +++ b/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java @@ -42,6 +42,7 @@ import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.extensions.DiscoveryExtensionNode; +import org.opensearch.extensions.ExtensionsManager; import org.opensearch.extensions.action.ExtensionAction; import org.opensearch.extensions.action.ExtensionTransportAction; import org.opensearch.identity.IdentityService; @@ -49,7 +50,6 @@ import org.opensearch.rest.NamedRoute; import org.opensearch.rest.RestHandler.Route; import org.opensearch.rest.RestRequest.Method; -import org.opensearch.rest.extensions.RestSendToExtensionAction; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.MockTransportService; import org.opensearch.threadpool.TestThreadPool; @@ -118,7 +118,8 @@ public void setup() throws Exception { null, usageService, null, - new IdentityService(Settings.EMPTY, new ArrayList<>()) + new IdentityService(Settings.EMPTY, new ArrayList<>()), + new ExtensionsManager(Set.of()) ); dynamicActionRegistry = actionModule.getDynamicActionRegistry(); }