From 718704da0e48072fe27aae763bc5cdbce15c130b Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Mon, 25 Sep 2023 08:13:17 -0700 Subject: [PATCH 1/3] Change WorkflowData from interface to class Signed-off-by: Daniel Widdis --- .../java/demo/CreateIndexWorkflowStep.java | 7 +-- .../template/TemplateParser.java | 19 ++------ .../flowframework/workflow/WorkflowData.java | 48 ++++++++++++++----- .../workflow/WorkflowDataTests.java | 20 ++++++-- 4 files changed, 56 insertions(+), 38 deletions(-) diff --git a/src/main/java/demo/CreateIndexWorkflowStep.java b/src/main/java/demo/CreateIndexWorkflowStep.java index c1a79188b..6b2ab0a7b 100644 --- a/src/main/java/demo/CreateIndexWorkflowStep.java +++ b/src/main/java/demo/CreateIndexWorkflowStep.java @@ -65,12 +65,7 @@ public CompletableFuture execute(List data) { } catch (InterruptedException e) {} // Simulate response of created index CreateIndexResponse response = new CreateIndexResponse(true, true, inputIndex); - future.complete(new WorkflowData() { - @Override - public Map getContent() { - return Map.of("index", response.index()); - } - }); + future.complete(new WorkflowData(Map.of("index", response.index()))); }); return future; diff --git a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java index bce07c616..56635f1b4 100644 --- a/src/main/java/org/opensearch/flowframework/template/TemplateParser.java +++ b/src/main/java/org/opensearch/flowframework/template/TemplateParser.java @@ -76,21 +76,10 @@ public static List parseJsonGraphToSequence(String json, Map getContent() { - // See CreateIndexRequest ParseFields for source of content keys needed - return Map.of("mappings", request.mappings(), "settings", request.settings(), "aliases", request.aliases()); - } - - @Override - public Map getParams() { - // See RestCreateIndexAction for source of param keys needed - return Map.of("index", request.index()); - } - - }; + inputData = new WorkflowData( + Map.of("mappings", request.mappings(), "settings", request.settings(), "aliases", request.aliases()), + Map.of("index", request.index()) + ); } nodes.add(new ProcessNode(nodeId, workflowStep, inputData)); } diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java index 09eb041fc..fbe4a5708 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java @@ -12,29 +12,53 @@ import java.util.Map; /** - * Interface representing data provided as input to, and produced as output from, {@link WorkflowStep}s. + * Class encapsulating data provided as input to, and produced as output from, {@link WorkflowStep}s. */ -public interface WorkflowData { +public class WorkflowData { /** * An object representing no data, useful when a workflow step has no required input or output. */ - WorkflowData EMPTY = new WorkflowData() { - }; + public static WorkflowData EMPTY = new WorkflowData(); + + private final Map content; + private final Map params; + + private WorkflowData() { + this(Collections.emptyMap(), Collections.emptyMap()); + } + + /** + * Instantiate this object with content and empty params. + * @param content The content map + */ + public WorkflowData(Map content) { + this(content, Collections.emptyMap()); + } + + /** + * Instantiate this object with content and params. + * @param content The content map + * @param params The params map + */ + public WorkflowData(Map content, Map params) { + this.content = Map.copyOf(content); + this.params = Map.copyOf(params); + } /** - * Accesses a map containing the content of the workflow step. This represents the data associated with a Rest API request. - * @return the content of this step. + * Returns a map which represents the content associated with a Rest API request or response. + * @return the content of this data. */ - default Map getContent() { - return Collections.emptyMap(); + public Map getContent() { + return this.content; }; /** - * Accesses a map containing the params of this workflow step. This represents the params associated with a Rest API request, parsed from the URI. - * @return the params of this step. + * Returns a map represents the params associated with a Rest API request, parsed from the URI. + * @return the params of this data. */ - default Map getParams() { - return Collections.emptyMap(); + public Map getParams() { + return this.params; }; } diff --git a/src/test/java/org/opensearch/flowframework/workflow/WorkflowDataTests.java b/src/test/java/org/opensearch/flowframework/workflow/WorkflowDataTests.java index 42a1a1a03..e2464dace 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/WorkflowDataTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/WorkflowDataTests.java @@ -10,7 +10,7 @@ import org.opensearch.test.OpenSearchTestCase; -import java.util.Collections; +import java.util.Map; public class WorkflowDataTests extends OpenSearchTestCase { @@ -20,9 +20,19 @@ public void setUp() throws Exception { } public void testWorkflowData() { - WorkflowData data = new WorkflowData() { - }; - assertEquals(Collections.emptyMap(), data.getParams()); - assertEquals(Collections.emptyMap(), data.getContent()); + + WorkflowData empty = WorkflowData.EMPTY; + assertTrue(empty.getParams().isEmpty()); + assertTrue(empty.getContent().isEmpty()); + + Map expectedContent = Map.of("baz", new String[] { "qux", "quxx" }); + WorkflowData contentOnly = new WorkflowData(expectedContent); + assertTrue(contentOnly.getParams().isEmpty()); + assertEquals(expectedContent, contentOnly.getContent()); + + Map expectedParams = Map.of("foo", "bar"); + WorkflowData contentAndParams = new WorkflowData(expectedContent, expectedParams); + assertEquals(expectedParams, contentAndParams.getParams()); + assertEquals(expectedContent, contentAndParams.getContent()); } } From e4a921d7da1d9143aa5fc0a87229f5ec2dbf621d Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Mon, 25 Sep 2023 08:36:33 -0700 Subject: [PATCH 2/3] Disable flaky test Signed-off-by: Daniel Widdis --- .../flowframework/template/ProcessNodeTests.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/test/java/org/opensearch/flowframework/template/ProcessNodeTests.java b/src/test/java/org/opensearch/flowframework/template/ProcessNodeTests.java index 3feab9f3b..d9f365708 100644 --- a/src/test/java/org/opensearch/flowframework/template/ProcessNodeTests.java +++ b/src/test/java/org/opensearch/flowframework/template/ProcessNodeTests.java @@ -19,7 +19,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; @ThreadLeakScope(Scope.NONE) public class ProcessNodeTests extends OpenSearchTestCase { @@ -49,12 +48,13 @@ public String getName() { assertEquals(Collections.emptySet(), nodeA.getPredecessors()); assertEquals("A", nodeA.toString()); - // TODO: Once we can get OpenSearch Thread Pool for this execute method, create an IT and don't test execute here - CompletableFuture f = nodeA.execute(); - assertEquals(f, nodeA.getFuture()); - f.orTimeout(5, TimeUnit.SECONDS); - assertTrue(f.isDone()); - assertEquals(WorkflowData.EMPTY, f.get()); + // TODO: This test is flaky on Windows. Disabling until thread pool is integrated + // https://github.com/opensearch-project/opensearch-ai-flow-framework/issues/42 + // CompletableFuture f = nodeA.execute(); + // assertEquals(f, nodeA.future()); + // f.orTimeout(5, TimeUnit.SECONDS); + // assertTrue(f.isDone()); + // assertEquals(WorkflowData.EMPTY, f.get()); ProcessNode nodeB = new ProcessNode("B", null); assertNotEquals(nodeA, nodeB); From ee99427c39df689bca10ea027039d7f5d8fa8aa4 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Mon, 25 Sep 2023 12:00:11 -0700 Subject: [PATCH 3/3] Rebase with changes from #38 Signed-off-by: Daniel Widdis --- .../workflow/CreateIndex/CreateIndexStep.java | 7 +------ .../workflow/CreateIndex/CreateIndexStepTests.java | 10 +--------- 2 files changed, 2 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java index ebef2cae8..7f92b8057 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java @@ -52,12 +52,7 @@ public CompletableFuture execute(List data) { @Override public void onResponse(CreateIndexResponse createIndexResponse) { logger.info("created index: {}", createIndexResponse.index()); - future.complete(new WorkflowData() { - @Override - public Map getContent() { - return Map.of("index-name", createIndexResponse.index()); - } - }); + future.complete(new WorkflowData(Map.of("index-name", createIndexResponse.index()))); } @Override diff --git a/src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java index 638dea251..c5d680a94 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStepTests.java @@ -45,15 +45,7 @@ public class CreateIndexStepTests extends OpenSearchTestCase { public void setUp() throws Exception { super.setUp(); - inputData = new WorkflowData() { - - @Override - public Map getContent() { - return Map.ofEntries(Map.entry("index-name", "demo"), Map.entry("type", "knn")); - } - - }; - + inputData = new WorkflowData(Map.ofEntries(Map.entry("index-name", "demo"), Map.entry("type", "knn"))); client = mock(Client.class); adminClient = mock(AdminClient.class); indicesAdminClient = mock(IndicesAdminClient.class);