diff --git a/build.gradle b/build.gradle index c3b193b15..510788da5 100644 --- a/build.gradle +++ b/build.gradle @@ -119,8 +119,12 @@ dependencies { test { include '**/*Tests.class' + systemProperty 'tests.security.manager', 'false' } +def opensearch_tmp_dir = rootProject.file('build/private/opensearch_tmp').absoluteFile +opensearch_tmp_dir.mkdirs() + jacocoTestReport { dependsOn test reports { @@ -137,6 +141,29 @@ task integTest(type: RestIntegTestTask) { tasks.named("check").configure { dependsOn(integTest) } integTest { + dependsOn "bundlePlugin" + systemProperty 'tests.security.manager', 'false' + systemProperty 'java.io.tmpdir', opensearch_tmp_dir.absolutePath + systemProperty "https", System.getProperty("https") + systemProperty "user", System.getProperty("user") + systemProperty "password", System.getProperty("password") + +// // Only rest case can run with remote cluster +// if (System.getProperty("tests.rest.cluster") != null) { +// filter { +// includeTestsMatching "org.opensearch.flowframework.rest.*IT" +// } +// } +// +// if (System.getProperty("https") == null || System.getProperty("https") == "false") { +// filter { +// } +// } + + filter { + excludeTestsMatching "org.opensearch.flowframework.indices.*Tests" + } + // The --debug-jvm command-line option makes the cluster debuggable; this makes the tests debuggable if (System.getProperty("test.debug") != null) { jvmArgs '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005' diff --git a/src/main/java/org/opensearch/flowframework/constant/CommonName.java b/src/main/java/org/opensearch/flowframework/constant/CommonName.java new file mode 100644 index 000000000..701fa6d8a --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/constant/CommonName.java @@ -0,0 +1,17 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.constant; + +/** + * Representation of common names that used across the project + */ +public class CommonName { + public static final String GLOBAL_CONTEXT_INDEX_NAME = ".opensearch-flow-framework-global-context"; + +} diff --git a/src/main/java/org/opensearch/flowframework/constant/CommonValue.java b/src/main/java/org/opensearch/flowframework/constant/CommonValue.java new file mode 100644 index 000000000..c166833f9 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/constant/CommonValue.java @@ -0,0 +1,91 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.constant; + +/** + * Representation of common values that are used across project + */ +public class CommonValue { + + public static Integer NO_SCHEMA_VERSION = 0; + public static final String META = "_meta"; + public static final String SCHEMA_VERSION_FIELD = "schema_version"; + public static final Integer GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION = 1; + public static final String GLOBAL_CONTEXT_INDEX_MAPPING = "{\n " + + " \"dynamic\": false,\n" + + " \"_meta\": {\n" + + " \"schema_version\": " + + GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION + + "\n" + + " },\n" + + " \"properties\": {\n" + + " \"pipeline_id\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"name\": {\n" + + " \"type\": \"text\",\n" + + " \"fields\": {\n" + + " \"keyword\": {\n" + + " \"type\": \"keyword\",\n" + + " \"ignore_above\": 256\n" + + " }\n" + + " }\n" + + " },\n" + + " \"description\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"use_case\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"operations\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"version\": {\n" + + " \"type\": \"nested\",\n" + + " \"properties\": {\n" + + " \"template\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"compatibility\": {\n" + + " \"type\": \"integer\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"user_inputs\": {\n" + + " \"type\": \"nested\",\n" + + " \"properties\": {\n" + + " \"model_id\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"input_field\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"output_field\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"ingest_pipeline_name\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"index_name\": {\n" + + " \"type\": \"keyword\"\n" + + " }\n" + + " }\n" + + " },\n" + + " \"workflows\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"responses\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"resources_created\": {\n" + + " \"type\": \"text\"\n" + + " }\n" + + " }\n" + + "}"; +} diff --git a/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java b/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java new file mode 100644 index 000000000..899bbffc9 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.exception; + +/** + * Representation of Flow Framework Exceptions + */ +public class FlowFrameworkException extends RuntimeException { + /** + * Constructor with error message. + * + * @param message message of the exception + */ + public FlowFrameworkException(String message) { + super(message); + } + + /** + * Constructor with specified cause. + * @param cause exception cause + */ + public FlowFrameworkException(Throwable cause) { + super(cause); + } + + /** + * Constructor with specified error message adn cause. + * @param message error message + * @param cause exception cause + */ + public FlowFrameworkException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndex.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndex.java new file mode 100644 index 000000000..8dd9e1f30 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndex.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.indices; + +import static org.opensearch.flowframework.constant.CommonName.GLOBAL_CONTEXT_INDEX_NAME; +import static org.opensearch.flowframework.constant.CommonValue.GLOBAL_CONTEXT_INDEX_MAPPING; +import static org.opensearch.flowframework.constant.CommonValue.GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION; + +/** + * An enumeration of Flow Framework indices + */ +public enum FlowFrameworkIndex { + GLOBAL_CONTEXT(GLOBAL_CONTEXT_INDEX_NAME, GLOBAL_CONTEXT_INDEX_MAPPING, GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION); + + private final String indexName; + private final String mapping; + private final Integer version; + + FlowFrameworkIndex(String name, String mapping, Integer version) { + this.indexName = name; + this.mapping = mapping; + this.version = version; + } + + public String getIndexName() { + return indexName; + } + + public String getMapping() { + return mapping; + } + + public Integer getVersion() { + return version; + } +} diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java new file mode 100644 index 000000000..ca64700d7 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -0,0 +1,176 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.indices; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.action.ActionListener; +import org.opensearch.flowframework.exception.FlowFrameworkException; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.opensearch.flowframework.constant.CommonValue.META; +import static org.opensearch.flowframework.constant.CommonValue.NO_SCHEMA_VERSION; +import static org.opensearch.flowframework.constant.CommonValue.SCHEMA_VERSION_FIELD; + +/** + * Flow Framework Indices Handler + */ +public class FlowFrameworkIndicesHandler { + + private static final Logger logger = LogManager.getLogger(FlowFrameworkIndicesHandler.class); + private static final Map indexSettings = Map.of("index.auto_expand_replicas", "0-1"); + private static final Map indexMappingUpdated = new HashMap<>(); + + private ClusterService clusterService; + private Client client; + + /** + * Handler constructor + * @param clusterService cluster service + * @param client client + */ + public FlowFrameworkIndicesHandler(ClusterService clusterService, Client client) { + this.clusterService = clusterService; + this.client = client; + } + + static { + for (FlowFrameworkIndex flowFrameworkIndex : FlowFrameworkIndex.values()) { + indexMappingUpdated.put(flowFrameworkIndex.getIndexName(), new AtomicBoolean(false)); + } + } + + /** + * Initiate global context index if it's absent + * @param listener action listner + */ + public void initGlobalContextIndexIfAbsent(ActionListener listener) { + initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex.GLOBAL_CONTEXT, listener); + } + + /** + * General method for initiate flow framework indices or update index mapping if it's needed + * @param index flow framework index + * @param listener action listener + */ + public void initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex index, ActionListener listener) { + String indexName = index.getIndexName(); + String mapping = index.getMapping(); + + try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) { + ActionListener internalListener = ActionListener.runBefore(listener, () -> threadContext.restore()); + if (!clusterService.state().metadata().hasIndex(indexName)) { + ActionListener actionListener = ActionListener.wrap(r -> { + if (r.isAcknowledged()) { + logger.info("create index:{}", indexName); + internalListener.onResponse(true); + } else { + internalListener.onResponse(false); + } + }, e -> { + logger.error("Failed to create index " + indexName, e); + internalListener.onFailure(e); + }); + CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping).settings(indexSettings); + client.admin().indices().create(request, actionListener); + } else { + logger.debug("index:{} is already created", indexName); + if (indexMappingUpdated.containsKey(indexName) && !indexMappingUpdated.get(indexName).get()) { + shouldUpdateIndex(indexName, index.getVersion(), ActionListener.wrap(r -> { + if (r) { + // return true if update index is needed + client.admin() + .indices() + .putMapping( + new PutMappingRequest().indices(indexName).source(mapping, XContentType.JSON), + ActionListener.wrap(response -> { + if (response.isAcknowledged()) { + UpdateSettingsRequest updateSettingRequest = new UpdateSettingsRequest(); + updateSettingRequest.indices(indexName).settings(indexSettings); + client.admin() + .indices() + .updateSettings(updateSettingRequest, ActionListener.wrap(updateResponse -> { + if (response.isAcknowledged()) { + indexMappingUpdated.get(indexName).set(true); + internalListener.onResponse(true); + } else { + internalListener.onFailure( + new FlowFrameworkException("Failed to update index setting for: " + indexName) + ); + } + }, exception -> { + logger.error("Failed to update index setting for: " + indexName, exception); + internalListener.onFailure(exception); + })); + } else { + internalListener.onFailure(new FlowFrameworkException("Failed to update index: " + indexName)); + } + }, exception -> { + logger.error("Failed to update index " + indexName, exception); + internalListener.onFailure(exception); + }) + ); + } else { + // no need to update index if it does not exist or the version is already up-to-date. + indexMappingUpdated.get(indexName).set(true); + internalListener.onResponse(true); + } + }, e -> { + logger.error("Failed to update index mapping", e); + internalListener.onFailure(e); + })); + } else { + // No need to update index if it's already updated. + internalListener.onResponse(true); + } + } + } catch (Exception e) { + logger.error("Failed to init index " + indexName, e); + listener.onFailure(e); + } + } + + /** + * Check if we should update index based on schema version. + * @param indexName index name + * @param newVersion new index mapping version + * @param listener action listener, if update index is needed, will pass true to its onResponse method + */ + private void shouldUpdateIndex(String indexName, Integer newVersion, ActionListener listener) { + IndexMetadata indexMetaData = clusterService.state().getMetadata().indices().get(indexName); + if (indexMetaData == null) { + listener.onResponse(Boolean.FALSE); + return; + } + Integer oldVersion = NO_SCHEMA_VERSION; + Map indexMapping = indexMetaData.mapping().getSourceAsMap(); + Object meta = indexMapping.get(META); + if (meta != null && meta instanceof Map) { + @SuppressWarnings("unchecked") + Map metaMapping = (Map) meta; + Object schemaVersion = metaMapping.get(SCHEMA_VERSION_FIELD); + if (schemaVersion instanceof Integer) { + oldVersion = (Integer) schemaVersion; + } + } + listener.onResponse(newVersion > oldVersion); + } +}