Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WorkflowStep Factory and implement XContent-based Template Parsing #47

Merged
merged 27 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
24b3542
Add WorkflowStepFactory class
dbwiddis Sep 19, 2023
e70b9e8
Add XContent classes representing Template JSON
dbwiddis Sep 20, 2023
677cc24
Add parse methods for the Template XContent
dbwiddis Sep 21, 2023
57fa740
Cleanup parsing, javadocs, and demo output
dbwiddis Sep 21, 2023
1214ebc
Refactor to use field name constants, get tests working again
dbwiddis Sep 21, 2023
fd72b2d
Separate WorkflowNode and ProcessNode functionality
dbwiddis Sep 22, 2023
f2441b3
Fix demos to align with template field names
dbwiddis Sep 22, 2023
c7a2371
Add workflow, node, and edge tests
dbwiddis Sep 22, 2023
388741c
Add Template tests
dbwiddis Sep 22, 2023
df4a0d8
Refactor TemplateParser to WorkflowProcessSorter
dbwiddis Sep 22, 2023
8589cfa
Test exceptional cases
dbwiddis Sep 23, 2023
309d059
Finish up exceptional cases
dbwiddis Sep 23, 2023
2804a52
Fix a template field name bug in demo
dbwiddis Sep 25, 2023
d846cb9
Rebase with #34
dbwiddis Sep 25, 2023
078059b
Rebase changes from #54
dbwiddis Sep 25, 2023
90e95e3
Integrate thread pool executor service
dbwiddis Sep 26, 2023
3fa4d0a
Fix flaky ProcessNodeTests by removing orTimeout
dbwiddis Sep 26, 2023
62fa53c
Rebase and refactor with #44
dbwiddis Sep 26, 2023
6c86dd1
Fix demos and remove DataDemo
dbwiddis Sep 26, 2023
c11a5bf
Use non-deprecated mapping method for CreateIndexStep
dbwiddis Sep 26, 2023
adc349a
Eliminate casting and deprecation warnings on test classes
dbwiddis Sep 26, 2023
21b530b
Remove unused/leftover demo class
dbwiddis Sep 26, 2023
8d28308
Typo
dbwiddis Sep 26, 2023
555d88b
Don't offer steps as an alternative to nodes
dbwiddis Sep 26, 2023
5c78046
Move Workflow into package with all the other parsing classes
dbwiddis Sep 26, 2023
87685ff
Move process sequencing classes into workflow package
dbwiddis Sep 26, 2023
2fca71b
Add PipelineProcessor class and XContent parsing, rename package
dbwiddis Sep 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ repositories {
dependencies {
implementation "org.opensearch:opensearch:${opensearch_version}"
implementation 'org.junit.jupiter:junit-jupiter:5.10.0'
implementation "com.google.code.gson:gson:2.10.1"
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
implementation "com.google.guava:guava:32.1.2-jre"
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"

Expand Down
78 changes: 0 additions & 78 deletions src/main/java/demo/CreateIndexWorkflowStep.java

This file was deleted.

85 changes: 0 additions & 85 deletions src/main/java/demo/DataDemo.java

This file was deleted.

41 changes: 19 additions & 22 deletions src/main/java/demo/Demo.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,48 +10,41 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.flowframework.template.ProcessNode;
import org.opensearch.flowframework.template.TemplateParser;
import org.opensearch.flowframework.workflow.WorkflowStep;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
* Demo class exercising {@link TemplateParser}. This will be moved to a unit test.
* Demo class exercising {@link WorkflowProcessSorter}. This will be moved to a unit test.
*/
public class Demo {

private static final Logger logger = LogManager.getLogger(Demo.class);

// This is temporary. We need a factory class to generate these workflow steps
// based on a field in the JSON.
private static Map<String, WorkflowStep> workflowMap = new HashMap<>();
static {
workflowMap.put("fetch_model", new DemoWorkflowStep(3000));
workflowMap.put("create_ingest_pipeline", new DemoWorkflowStep(3000));
workflowMap.put("create_search_pipeline", new DemoWorkflowStep(5000));
workflowMap.put("create_neural_search_index", new DemoWorkflowStep(2000));
}

/**
* Demonstrate parsing a JSON graph.
*
* @param args unused
* @throws IOException on a failure
*/
@SuppressForbidden(reason = "just a demo class that will be deleted")
public static void main(String[] args) {
public static void main(String[] args) throws IOException {
String path = "src/test/resources/template/demo.json";
String json;
try {
Expand All @@ -60,13 +53,18 @@ public static void main(String[] args) {
logger.error("Failed to read JSON at path {}", path);
return;
}
Client client = new NodeClient(null, null);
WorkflowStepFactory factory = WorkflowStepFactory.create(client);
ExecutorService executor = Executors.newFixedThreadPool(10);
WorkflowProcessSorter.create(factory, executor);

logger.info("Parsing graph to sequence...");
List<ProcessNode> processSequence = TemplateParser.parseJsonGraphToSequence(json, workflowMap);
Template t = Template.parse(json);
List<ProcessNode> processSequence = WorkflowProcessSorter.get().sortProcessNodes(t.workflows().get("demo"));
List<CompletableFuture<?>> futureList = new ArrayList<>();

for (ProcessNode n : processSequence) {
Set<ProcessNode> predecessors = n.getPredecessors();
List<ProcessNode> predecessors = n.predecessors();
logger.info(
"Queueing process [{}].{}",
n.id(),
Expand All @@ -78,11 +76,10 @@ public static void main(String[] args) {
predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", "))
)
);
// TODO need to handle this better, passing an argument when we start them all at the beginning is silly
futureList.add(n.execute());
}
futureList.forEach(CompletableFuture::join);
logger.info("All done!");
executor.shutdown();
}

}
65 changes: 65 additions & 0 deletions src/main/java/demo/TemplateParseDemo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package demo;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Map.Entry;
import java.util.concurrent.Executors;

/**
* Demo class exercising {@link WorkflowProcessSorter}. This will be moved to a unit test.
*/
public class TemplateParseDemo {

private static final Logger logger = LogManager.getLogger(TemplateParseDemo.class);

/**
* Demonstrate parsing a JSON graph.
*
* @param args unused
* @throws IOException on error.
*/
@SuppressForbidden(reason = "just a demo class that will be deleted")
public static void main(String[] args) throws IOException {
String path = "src/test/resources/template/finaltemplate.json";
String json;
try {
json = new String(Files.readAllBytes(PathUtils.get(path)), StandardCharsets.UTF_8);
} catch (IOException e) {
logger.error("Failed to read JSON at path {}", path);
return;
}
Client client = new NodeClient(null, null);
WorkflowStepFactory factory = WorkflowStepFactory.create(client);
WorkflowProcessSorter.create(factory, Executors.newFixedThreadPool(10));

Template t = Template.parse(json);

System.out.println(t.toJson());
System.out.println(t.toYaml());

for (Entry<String, Workflow> e : t.workflows().entrySet()) {
logger.info("Parsing {} workflow.", e.getKey());
WorkflowProcessSorter.get().sortProcessNodes(e.getValue());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.flowframework.workflow.CreateIndex.CreateIndexStep;
import org.opensearch.flowframework.workflow.CreateIngestPipelineStep;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
Expand All @@ -32,8 +32,6 @@
*/
public class FlowFrameworkPlugin extends Plugin {

private Client client;

@Override
public Collection<Object> createComponents(
Client client,
Expand All @@ -48,9 +46,9 @@
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.client = client;
CreateIngestPipelineStep createIngestPipelineStep = new CreateIngestPipelineStep(client);
CreateIndexStep createIndexStep = new CreateIndexStep(client);
return ImmutableList.of(createIngestPipelineStep, createIndexStep);
WorkflowStepFactory workflowStepFactory = WorkflowStepFactory.create(client);
WorkflowProcessSorter workflowProcessSorter = WorkflowProcessSorter.create(workflowStepFactory, threadPool.generic());

Check warning on line 50 in src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java#L49-L50

Added lines #L49 - L50 were not covered by tests
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved

return ImmutableList.of(workflowStepFactory, workflowProcessSorter);

Check warning on line 52 in src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java#L52

Added line #L52 was not covered by tests
}
}
Loading
Loading