Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce global-context index and related operations #65

Merged
merged 20 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ publishing {
publications {
pluginZip(MavenPublication) { publication ->
pom {
name = pluginName
description = pluginDescription
licenses {
license {
name = "The Apache License, Version 2.0"
url = "http://www.apache.org/licenses/LICENSE-2.0.txt"
name = pluginName
description = pluginDescription
licenses {
license {
name = "The Apache License, Version 2.0"
url = "http://www.apache.org/licenses/LICENSE-2.0.txt"
}
}
}
developers {
developer {
name = "OpenSearch AI Flow Framework Plugin"
url = "https://github.com/opensearch-project/opensearch-ai-flow-framework"
developers {
developer {
name = "OpenSearch AI Flow Framework Plugin"
url = "https://github.com/opensearch-project/opensearch-ai-flow-framework"
}
}
}
}
}
}
Expand Down Expand Up @@ -159,7 +159,7 @@ task updateVersion {
doLast {
ext.newVersion = System.getProperty('newVersion')
println "Setting version to ${newVersion}."
// String tokenization to support -SNAPSHOT
// String tokenization to support -SNAPSHOT
ant.replaceregexp(file:'build.gradle', match: '"opensearch.version", "\\d.*"', replace: '"opensearch.version", "' + newVersion.tokenize('-')[0] + '-SNAPSHOT"', flags:'g', byline:true)
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/demo/Demo.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -56,8 +57,9 @@ public static void main(String[] args) throws IOException {
logger.error("Failed to read JSON at path {}", path);
return;
}
ClusterService clusterService = new ClusterService(null, null, null);
Client client = new NodeClient(null, null);
WorkflowStepFactory factory = new WorkflowStepFactory(client);
WorkflowStepFactory factory = new WorkflowStepFactory(clusterService, client);

ThreadPool threadPool = new ThreadPool(Settings.EMPTY);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(factory, threadPool);
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/demo/TemplateParseDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -52,8 +53,10 @@ public static void main(String[] args) throws IOException {
logger.error("Failed to read JSON at path {}", path);
return;
}
ClusterService clusterService = new ClusterService(null, null, null);
Client client = new NodeClient(null, null);
WorkflowStepFactory factory = new WorkflowStepFactory(client);

WorkflowStepFactory factory = new WorkflowStepFactory(clusterService, client);
ThreadPool threadPool = new ThreadPool(Settings.EMPTY);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(factory, threadPool);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(client);
WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(clusterService, client);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool);

return ImmutableList.of(workflowStepFactory, workflowProcessSorter);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.common;

/**
* Representation of common values that are used across project
*/
public class CommonValue {

public static Integer NO_SCHEMA_VERSION = 0;
public static final String META = "_meta";
public static final String SCHEMA_VERSION_FIELD = "schema_version";
public static final String GLOBAL_CONTEXT_INDEX = ".plugins-ai-global-context";
public static final String GLOBAL_CONTEXT_INDEX_MAPPING = "mappings/global-context.json";
public static final Integer GLOBAL_CONTEXT_INDEX_VERSION = 1;
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.common;

/**
* A supplier that can throw checked exception
*
* @param <T> method parameter type
* @param <E> Exception type
*/
@FunctionalInterface
public interface ThrowingSupplier<T, E extends Exception> {
/**
* Gets a result or throws an exception if unable to produce a result.
*
* @return the result
* @throws E if unable to produce a result
*/
T get() throws E;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.common;

import java.util.function.Supplier;

/**
* Wrapper for throwing checked exception inside places that does not allow to do so
*/
public class ThrowingSupplierWrapper {

private ThrowingSupplierWrapper() {}

/**
* Utility method to use a method throwing checked exception inside a place
* that does not allow throwing the corresponding checked exception (e.g.,
* enum initialization).
* Convert the checked exception thrown by throwingConsumer to a RuntimeException
* so that the compiler won't complain.
* @param <T> the method's return type
* @param throwingSupplier the method reference that can throw checked exception
* @return converted method reference
*/
public static <T> Supplier<T> throwingSupplierWrapper(ThrowingSupplier<T, Exception> throwingSupplier) {

return () -> {
try {
return throwingSupplier.get();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.exception;

import org.opensearch.core.rest.RestStatus;

/**
* Representation of Flow Framework Exceptions
*/
public class FlowFrameworkException extends RuntimeException {
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
jackiehanyang marked this conversation as resolved.
Show resolved Hide resolved

private static final long serialVersionUID = 1L;

private final RestStatus restStatus;

/**
* Constructor with error message.
*
* @param message message of the exception
* @param restStatus HTTP status code of the response
*/
public FlowFrameworkException(String message, RestStatus restStatus) {
super(message);
this.restStatus = restStatus;
}

/**
* Constructor with specified cause.
* @param cause exception cause
* @param restStatus HTTP status code of the response
*/
public FlowFrameworkException(Throwable cause, RestStatus restStatus) {
super(cause);
this.restStatus = restStatus;
}

/**
* Constructor with specified error message adn cause.
* @param message error message
* @param cause exception cause
* @param restStatus HTTP status code of the response
*/
public FlowFrameworkException(String message, Throwable cause, RestStatus restStatus) {
super(message, cause);
this.restStatus = restStatus;
}

/**
* Getter for restStatus.
*
* @return the HTTP status code associated with the exception
*/
public RestStatus getRestStatus() {
return restStatus;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.indices;

import org.opensearch.flowframework.common.ThrowingSupplierWrapper;

import java.util.function.Supplier;

import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX_VERSION;

/**
* An enumeration of Flow Framework indices
*/
public enum FlowFrameworkIndex {
GLOBAL_CONTEXT(
GLOBAL_CONTEXT_INDEX,
ThrowingSupplierWrapper.throwingSupplierWrapper(GlobalContextHandler::getGlobalContextMappings),
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
GLOBAL_CONTEXT_INDEX_VERSION
);

private final String indexName;
private final String mapping;
private final Integer version;

FlowFrameworkIndex(String name, Supplier<String> mappingSupplier, Integer version) {
this.indexName = name;
this.mapping = mappingSupplier.get();
this.version = version;
}
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved

public String getIndexName() {
return indexName;
}

public String getMapping() {
return mapping;
}

public Integer getVersion() {
return version;
}
}
Loading
Loading