Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add global context index and indices handler #43

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ repositories {
dependencies {
implementation "org.opensearch:opensearch:${opensearch_version}"
implementation 'org.junit.jupiter:junit-jupiter:5.10.0'
implementation 'org.projectlombok:lombok:1.18.22'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have a separate discussion/issue/RFC on whether we want to use a code generation tool like Lombok, and figure out what modifications we need to make to our CI to support it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed lombok dependency for now. It's not needed in this pr. I will open a discuss and figure out the XContent question before adding this dependency

compileOnly "com.google.guava:guava:32.1.2-jre"
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.constant;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious about this choice of package name. Given the class names, seems common might be better?


public class CommonName {
public static final String GLOBAL_CONTEXT_INDEX_NAME = ".opensearch-flow-framework-global-context";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.constant;

public class CommonValue {

public static Integer NO_SCHEMA_VERSION = 0;
public static final String META = "_meta";
public static final String SCHEMA_VERSION_FIELD = "schema_version";
public static final Integer GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION = 1;
public static final String GLOBAL_CONTEXT_INDEX_MAPPING =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put this in src/main/resources and read it into this constant off the class path? See here.

Copy link
Collaborator Author

@jackiehanyang jackiehanyang Sep 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with putting this index mapping in either path; I believe it's just a matter of coding style. However, I do prefer to put it in under src/main/java cause it can save us from setting up methods to read from src/main/resources, something like getMapping() ML-Commons is using this approach as well, see here

"{\n " +
" \"dynamic\": false,\n" +
" \"_meta\": {\n" +
" \"schema_version\": 1\n" +
" },\n" +
" \"properties\": {\n" +
" \"pipeline_id\": {\n" +
" \"type\": \"keyword\"\n"+
" },\n" +
" \"name\": {\n" +
" \"type\": \"text\",\n" +
" \"fields\": {\n" +
" \"keyword\": {\n" +
" \"type\": \"keyword\",\n" +
" \"ignore_above\": 256\n" +
" }\n" +
" }\n" +
" },\n" +
" \"description\": {\n" +
" \"type\": \"text\"\n" +
" },\n" +
" \"use_case\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"operations\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"version\": {\n" +
" \"type\": \"nested\",\n" +
" \"properties\": {\n" +
" \"template\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"compatibility\": {\n" +
" \"type\": \"integer\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"user_inputs\": {\n" +
" \"type\": \"nested\",\n" +
" \"properties\": {\n" +
" \"model_id\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"input_field\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"output_field\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"ingest_pipeline_name\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"index_name\": {\n" +
" \"type\": \"keyword\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"workflows\": {\n" +
" \"type\": \"text\"\n" +
" },\n" +
" \"responses\": {\n" +
" \"type\": \"text\"\n" +
" }\n" +
" \"resources_created\": {\n" +
" \"type\": \"text\"\n" +
" }\n" +
" }\n" +
"}";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.exception;

public class FlowFrameworkException extends RuntimeException {
/**
* Constructor with error message.
*
* @param message message of the exception
*/
public FlowFrameworkException(String message) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of having our own custom exceptions. Given that these will eventually need to be processed in a RestResponse, can we consider including the appropriate return code as part of constructing this exception? For example, "index not found" should return RestStatus.NOT_FOUND etc.

super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.indices;

import static org.opensearch.flowframework.constant.CommonName.GLOBAL_CONTEXT_INDEX_NAME;
import static org.opensearch.flowframework.constant.CommonValue.GLOBAL_CONTEXT_INDEX_MAPPING;
import static org.opensearch.flowframework.constant.CommonValue.GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION;

public enum FlowFrameworkIndex {
GLOBAL_CONTEXT(
GLOBAL_CONTEXT_INDEX_NAME,
GLOBAL_CONTEXT_INDEX_MAPPING,
GLOBAL_CONTEXT_INDEX_SCHEMA_VERSION
);

private final String indexName;
private final String mapping;
private final Integer version;

FlowFrameworkIndex(String name, String mapping, Integer version) {
this.indexName = name;
this.mapping = mapping;
this.version = version;
}

public String getIndexName() {
return indexName;
}

public String getMapping() {
return mapping;
}

public Integer getVersion() {
return version;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.indices;

import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.experimental.FieldDefaults;
import lombok.extern.log4j.Log4j2;

import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.FlowFrameworkException;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.flowframework.constant.CommonValue.*;

@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
@RequiredArgsConstructor
@Log4j2
public class FlowFrameworkIndicesHandler {

ClusterService clusterService;
Client client;
private static final Map<String, Object> indexSettings = Map.of("index.auto_expand_replicas", "0-1");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the justification for this auto_expand_replicas setting, just trying to understand why this is necessary

private static final Map<String, AtomicBoolean> indexMappingUpdated = new HashMap<>();

static {
for (FlowFrameworkIndex flowFrameworkIndex : FlowFrameworkIndex.values()) {
indexMappingUpdated.put(flowFrameworkIndex.getIndexName(), new AtomicBoolean(false));
}
}

public void initGlobalContextIndexIfAbsent(ActionListener<Boolean> listener) {
initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex.GLOBAL_CONTEXT, listener);
}

public void initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex index, ActionListener<Boolean> listener) {
String indexName = index.getIndexName();
String mapping = index.getMapping();

try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I understand about security, we only need to stash the thread context if this particular index is managed by the security plugin's list of protected indices. This allows the plugin to do CRUD operations on this protected index. Is the plan to add all flow framework indices to the security plugin's list of protected indices, and if not, why stash the thread context?

ActionListener<Boolean> internalListener = ActionListener.runBefore(listener, () -> threadContext.restore());
if (!clusterService.state().metadata().hasIndex(indexName)) {
ActionListener<CreateIndexResponse> actionListener = ActionListener.wrap(r -> {
if (r.isAcknowledged()) {
log.info("create index:{}", indexName);
internalListener.onResponse(true);
} else {
internalListener.onResponse(false);
}
}, e -> {
log.error("Failed to create index " + indexName, e);
internalListener.onFailure(e);
});
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping).settings(indexSettings);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the mapping here require a mediatype? Looking at the javadoc it seems it expects JSON but needs a special format with a _doc key. We shold be consistent between our various implementations so all our mapping files look consistent.

client.admin().indices().create(request, actionListener);
} else {
log.debug("index:{} is already created", indexName);
if (indexMappingUpdated.containsKey(indexName) && !indexMappingUpdated.get(indexName).get()) {
shouldUpdateIndex(indexName, index.getVersion(), ActionListener.wrap(r -> {
if (r) {
// return true if update index is needed
client
.admin()
.indices()
.putMapping(
new PutMappingRequest().indices(indexName).source(mapping, XContentType.JSON),
ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
UpdateSettingsRequest updateSettingRequest = new UpdateSettingsRequest();
updateSettingRequest.indices(indexName).settings(indexSettings);
client
.admin()
.indices()
.updateSettings(updateSettingRequest, ActionListener.wrap(updateResponse -> {
if (response.isAcknowledged()) {
indexMappingUpdated.get(indexName).set(true);
internalListener.onResponse(true);
} else {
internalListener
.onFailure(new FlowFrameworkException("Failed to update index setting for: " + indexName));
}
}, exception -> {
log.error("Failed to update index setting for: " + indexName, exception);
internalListener.onFailure(exception);
}));
} else {
internalListener.onFailure(new FlowFrameworkException("Failed to update index: " + indexName));
}
}, exception -> {
log.error("Failed to update index " + indexName, exception);
internalListener.onFailure(exception);
})
);
} else {
// no need to update index if it does not exist or the version is already up-to-date.
indexMappingUpdated.get(indexName).set(true);
internalListener.onResponse(true);
}
}, e -> {
log.error("Failed to update index mapping", e);
internalListener.onFailure(e);
}));
} else {
// No need to update index if it's already updated.
internalListener.onResponse(true);
}
}
} catch (Exception e) {
log.error("Failed to init index " + indexName, e);
listener.onFailure(e);
}
}

/**
* Check if we should update index based on schema version.
* @param indexName index name
* @param newVersion new index mapping version
* @param listener action listener, if update index is needed, will pass true to its onResponse method
*/
public void shouldUpdateIndex(String indexName, Integer newVersion, ActionListener<Boolean> listener) {
IndexMetadata indexMetaData = clusterService.state().getMetadata().indices().get(indexName);
if (indexMetaData == null) {
listener.onResponse(Boolean.FALSE);
return;
}
Integer oldVersion = NO_SCHEMA_VERSION;
Map<String, Object> indexMapping = indexMetaData.mapping().getSourceAsMap();
Object meta = indexMapping.get(META);
if (meta != null && meta instanceof Map) {
@SuppressWarnings("unchecked")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have suppress warnings here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect because all we know is that meta is an Object that an instance of Map but we don't know the types of its key and value when they are cast.

Map<String, Object> metaMapping = (Map<String, Object>) meta;
Object schemaVersion = metaMapping.get(SCHEMA_VERSION_FIELD);
if (schemaVersion instanceof Integer) {
oldVersion = (Integer) schemaVersion;
}
}
listener.onResponse(newVersion > oldVersion);
}
}
Loading