Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Change WorkflowData from interface to class #58

Merged
merged 1 commit into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions src/main/java/demo/CreateIndexWorkflowStep.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,7 @@ public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
} catch (InterruptedException e) {}
// Simulate response of created index
CreateIndexResponse response = new CreateIndexResponse(true, true, inputIndex);
future.complete(new WorkflowData() {
@Override
public Map<String, Object> getContent() {
return Map.of("index", response.index());
}
});
future.complete(new WorkflowData(Map.of("index", response.index())));
});

return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,10 @@ public static List<ProcessNode> parseJsonGraphToSequence(String json, Map<String
WorkflowData inputData = WorkflowData.EMPTY;
if (List.of("create_index", "create_another_index").contains(nodeId)) {
CreateIndexRequest request = new CreateIndexRequest(nodeObject.get("index_name").getAsString());
inputData = new WorkflowData() {

@Override
public Map<String, Object> getContent() {
// See CreateIndexRequest ParseFields for source of content keys needed
return Map.of("mappings", request.mappings(), "settings", request.settings(), "aliases", request.aliases());
}

@Override
public Map<String, String> getParams() {
// See RestCreateIndexAction for source of param keys needed
return Map.of("index", request.index());
}

};
inputData = new WorkflowData(
Map.of("mappings", request.mappings(), "settings", request.settings(), "aliases", request.aliases()),
Map.of("index", request.index())
);
}
nodes.add(new ProcessNode(nodeId, workflowStep, inputData));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,7 @@ public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
logger.info("created index: {}", createIndexResponse.index());
future.complete(new WorkflowData() {
@Override
public Map<String, Object> getContent() {
return Map.of("index-name", createIndexResponse.index());
}
});
future.complete(new WorkflowData(Map.of("index-name", createIndexResponse.index())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,53 @@
import java.util.Map;

/**
* Interface representing data provided as input to, and produced as output from, {@link WorkflowStep}s.
* Class encapsulating data provided as input to, and produced as output from, {@link WorkflowStep}s.
*/
public interface WorkflowData {
public class WorkflowData {

/**
* An object representing no data, useful when a workflow step has no required input or output.
*/
WorkflowData EMPTY = new WorkflowData() {
};
public static WorkflowData EMPTY = new WorkflowData();

private final Map<String, Object> content;
private final Map<String, String> params;

private WorkflowData() {
this(Collections.emptyMap(), Collections.emptyMap());
}

/**
* Instantiate this object with content and empty params.
* @param content The content map
*/
public WorkflowData(Map<String, Object> content) {
this(content, Collections.emptyMap());
}

/**
* Instantiate this object with content and params.
* @param content The content map
* @param params The params map
*/
public WorkflowData(Map<String, Object> content, Map<String, String> params) {
this.content = Map.copyOf(content);
this.params = Map.copyOf(params);
}

/**
* Accesses a map containing the content of the workflow step. This represents the data associated with a Rest API request.
* @return the content of this step.
* Returns a map which represents the content associated with a Rest API request or response.
* @return the content of this data.
*/
default Map<String, Object> getContent() {
return Collections.emptyMap();
public Map<String, Object> getContent() {
return this.content;
};

/**
* Accesses a map containing the params of this workflow step. This represents the params associated with a Rest API request, parsed from the URI.
* @return the params of this step.
* Returns a map represents the params associated with a Rest API request, parsed from the URI.
* @return the params of this data.
*/
default Map<String, String> getParams() {
return Collections.emptyMap();
public Map<String, String> getParams() {
return this.params;
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

@ThreadLeakScope(Scope.NONE)
public class ProcessNodeTests extends OpenSearchTestCase {
Expand Down Expand Up @@ -49,12 +48,13 @@ public String getName() {
assertEquals(Collections.emptySet(), nodeA.getPredecessors());
assertEquals("A", nodeA.toString());

// TODO: Once we can get OpenSearch Thread Pool for this execute method, create an IT and don't test execute here
CompletableFuture<WorkflowData> f = nodeA.execute();
assertEquals(f, nodeA.getFuture());
f.orTimeout(5, TimeUnit.SECONDS);
assertTrue(f.isDone());
assertEquals(WorkflowData.EMPTY, f.get());
// TODO: This test is flaky on Windows. Disabling until thread pool is integrated
// https://github.com/opensearch-project/opensearch-ai-flow-framework/issues/42
// CompletableFuture<WorkflowData> f = nodeA.execute();
// assertEquals(f, nodeA.future());
// f.orTimeout(5, TimeUnit.SECONDS);
// assertTrue(f.isDone());
// assertEquals(WorkflowData.EMPTY, f.get());

ProcessNode nodeB = new ProcessNode("B", null);
assertNotEquals(nodeA, nodeB);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,7 @@ public class CreateIndexStepTests extends OpenSearchTestCase {
public void setUp() throws Exception {
super.setUp();

inputData = new WorkflowData() {

@Override
public Map<String, Object> getContent() {
return Map.ofEntries(Map.entry("index-name", "demo"), Map.entry("type", "knn"));
}

};

inputData = new WorkflowData(Map.ofEntries(Map.entry("index-name", "demo"), Map.entry("type", "knn")));
client = mock(Client.class);
adminClient = mock(AdminClient.class);
indicesAdminClient = mock(IndicesAdminClient.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import org.opensearch.test.OpenSearchTestCase;

import java.util.Collections;
import java.util.Map;

public class WorkflowDataTests extends OpenSearchTestCase {

Expand All @@ -20,9 +20,19 @@ public void setUp() throws Exception {
}

public void testWorkflowData() {
WorkflowData data = new WorkflowData() {
};
assertEquals(Collections.emptyMap(), data.getParams());
assertEquals(Collections.emptyMap(), data.getContent());

WorkflowData empty = WorkflowData.EMPTY;
assertTrue(empty.getParams().isEmpty());
assertTrue(empty.getContent().isEmpty());

Map<String, Object> expectedContent = Map.of("baz", new String[] { "qux", "quxx" });
WorkflowData contentOnly = new WorkflowData(expectedContent);
assertTrue(contentOnly.getParams().isEmpty());
assertEquals(expectedContent, contentOnly.getContent());

Map<String, String> expectedParams = Map.of("foo", "bar");
WorkflowData contentAndParams = new WorkflowData(expectedContent, expectedParams);
assertEquals(expectedParams, contentAndParams.getParams());
assertEquals(expectedContent, contentAndParams.getContent());
}
}