Skip to content

Commit

Permalink
Rebase and refactor with #44
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Sep 26, 2023
1 parent 3fa4d0a commit 62fa53c
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,8 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
<<<<<<< HEAD
import org.opensearch.flowframework.workflow.CreateIndex.CreateIndexStep;
import org.opensearch.flowframework.workflow.CreateIngestPipelineStep;
=======
import org.opensearch.flowframework.template.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
>>>>>>> abffd2d (Integrate thread pool executor service)
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow.CreateIndex;
package org.opensearch.flowframework.workflow;

import com.google.common.base.Charsets;
import com.google.common.io.Resources;
Expand All @@ -18,9 +18,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.flowframework.workflow.WorkflowStep;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;

import java.io.IOException;
import java.net.URL;
Expand All @@ -37,7 +34,7 @@ public class CreateIndexStep implements WorkflowStep {
private Client client;

/** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */
public static final String NAME = "create_index_step";
static final String NAME = "create_index";

/**
* Instantiate this class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
public class CreateIngestPipelineStep implements WorkflowStep {

private static final Logger logger = LogManager.getLogger(CreateIngestPipelineStep.class);
private static final String NAME = "create_ingest_pipeline_step";

/** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */
static final String NAME = "create_ingest_pipeline";

// Common pipeline configuration fields
private static final String PIPELINE_ID_FIELD = "id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.flowframework.workflow;

import org.opensearch.client.Client;
import org.opensearch.flowframework.workflow.CreateIndex.CreateIndexStep;

import java.util.HashMap;
import java.util.List;
Expand All @@ -25,7 +24,6 @@ public class WorkflowStepFactory {

private static WorkflowStepFactory instance = null;

private Client client;
private final Map<String, WorkflowStep> stepMap = new HashMap<>();

/**
Expand Down Expand Up @@ -55,23 +53,18 @@ public static synchronized WorkflowStepFactory get() {
}

private WorkflowStepFactory(Client client) {
this.client = client;
populateMap();
populateMap(client);
}

private void populateMap() {
private void populateMap(Client client) {
stepMap.put(CreateIndexStep.NAME, new CreateIndexStep(client));
stepMap.put(CreateIngestPipelineStep.NAME, new CreateIngestPipelineStep(client));

// TODO: These are from the demo class as placeholders
// Replace with actual implementations such as
// https://github.com/opensearch-project/opensearch-ai-flow-framework/pull/38
// https://github.com/opensearch-project/opensearch-ai-flow-framework/pull/44
stepMap.put("fetch_model", new DemoWorkflowStep(3000));
stepMap.put("create_ingest_pipeline", new DemoWorkflowStep(3000));
stepMap.put("create_search_pipeline", new DemoWorkflowStep(5000));
stepMap.put("create_neural_search_index", new DemoWorkflowStep(2000));
// TODO: These are from the demo class as placeholders, remove when demos are deleted
stepMap.put("demo_delay_3", new DemoWorkflowStep(3000));
stepMap.put("demo_delay_5", new DemoWorkflowStep(3000));

// Use until all the actual implementations are ready
// Use as a default until all the actual implementations are ready
stepMap.put("placeholder", new WorkflowStep() {
@Override
public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
*/
package org.opensearch.flowframework.template;

import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.workflow.Workflow;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
Expand All @@ -25,6 +27,8 @@
import static org.opensearch.flowframework.template.TemplateTestJsonUtil.edge;
import static org.opensearch.flowframework.template.TemplateTestJsonUtil.node;
import static org.opensearch.flowframework.template.TemplateTestJsonUtil.workflow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class WorkflowProcessSorterTests extends OpenSearchTestCase {

Expand All @@ -44,8 +48,12 @@ private static List<String> parse(String json) throws IOException {

@BeforeClass
public static void setup() {
AdminClient adminClient = mock(AdminClient.class);
Client client = mock(Client.class);
when(client.admin()).thenReturn(adminClient);

executor = Executors.newFixedThreadPool(10);
WorkflowStepFactory factory = WorkflowStepFactory.create(null);
WorkflowStepFactory factory = WorkflowStepFactory.create(client);
workflowProcessSorter = WorkflowProcessSorter.create(factory, executor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow.CreateIndex;
package org.opensearch.flowframework.workflow;

import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.IndicesAdminClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,14 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow.CreateIngestPipeline;
package org.opensearch.flowframework.workflow;

import org.opensearch.action.ingest.PutPipelineRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.workflow.CreateIngestPipelineStep;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.test.OpenSearchTestCase;

import java.util.List;
Expand Down
8 changes: 4 additions & 4 deletions src/test/resources/template/demo.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@
"nodes": [
{
"id": "fetch_model",
"type": "fetch_model"
"type": "demo_delay_3"
},
{
"id": "create_ingest_pipeline",
"type": "create_ingest_pipeline"
"type": "demo_delay_3"
},
{
"id": "create_search_pipeline",
"type": "create_search_pipeline"
"type": "demo_delay_5"
},
{
"id": "create_neural_search_index",
"type": "create_neural_search_index"
"type": "demo_delay_3"
}
],
"edges": [
Expand Down

0 comments on commit 62fa53c

Please sign in to comment.