Skip to content

Commit

Permalink
Added workflow step for ReIndex Step (opensearch-project#718)
Browse files Browse the repository at this point in the history
* Initial commit for reindex workflow step with extra params

Signed-off-by: owaiskazi19 <[email protected]>

* Addressed PR comments

Signed-off-by: owaiskazi19 <[email protected]>

* Changed request per second to Float

Signed-off-by: owaiskazi19 <[email protected]>

* Addressed string array for source indices and removed state index entry

Signed-off-by: owaiskazi19 <[email protected]>

* Minor comments

Signed-off-by: owaiskazi19 <[email protected]>

---------

Signed-off-by: owaiskazi19 <[email protected]>
Signed-off-by: martinpkr <[email protected]>
  • Loading branch information
owaiskazi19 authored and martinpkr committed Jun 1, 2024
1 parent 0cc74bf commit 421f1fd
Show file tree
Hide file tree
Showing 7 changed files with 393 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.14...2.x)
### Features
### Enhancements
- Add Workflow Step for Reindex from source index to destination ([#718](https://github.com/opensearch-project/flow-framework/pull/718))

### Bug Fixes
- Add user mapping to Workflow State index ([#705](https://github.com/opensearch-project/flow-framework/pull/705))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ private CommonValue() {}
public static final String DELAY_FIELD = "delay";
/** Model Interface Field */
public static final String INTERFACE_FIELD = "interface";

/** The source index field for reindex */
public static final String SOURCE_INDEX = "source_index";
/** The destination index field for reindex */
public static final String DESTINATION_INDEX = "destination_index";
/*
* Constants associated with resource provisioning / state
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.flowframework.workflow.RegisterLocalSparseEncodingModelStep;
import org.opensearch.flowframework.workflow.RegisterModelGroupStep;
import org.opensearch.flowframework.workflow.RegisterRemoteModelStep;
import org.opensearch.flowframework.workflow.ReindexStep;
import org.opensearch.flowframework.workflow.UndeployModelStep;

import java.util.Set;
Expand Down Expand Up @@ -58,6 +59,8 @@ public enum WorkflowResources {
CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step
/** Workflow steps for creating an index and associated created resource */
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME),
/** Workflow steps for reindex a source index to destination index and associated created resource */
REINDEX(ReindexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME),
/** Workflow steps for registering/deleting an agent and the associated created resource */
REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME);

Expand Down
176 changes: 176 additions & 0 deletions src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.common.Booleans;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.ReindexAction;
import org.opensearch.index.reindex.ReindexRequest;

import java.util.Map;
import java.util.Set;

import static org.opensearch.flowframework.common.CommonValue.DESTINATION_INDEX;
import static org.opensearch.flowframework.common.CommonValue.SOURCE_INDEX;

/**
* Step to reindex
*/
public class ReindexStep implements WorkflowStep {

private static final Logger logger = LogManager.getLogger(ReindexStep.class);
private final Client client;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;

/** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */
public static final String NAME = "reindex";
/** The refresh field for reindex */
private static final String REFRESH = "refresh";
/** The requests_per_second field for reindex */
private static final String REQUESTS_PER_SECOND = "requests_per_second";
/** The require_alias field for reindex */
private static final String REQUIRE_ALIAS = "require_alias";
/** The slices field for reindex */
private static final String SLICES = "slices";
/** The max_docs field for reindex */
private static final String MAX_DOCS = "max_docs";

/**
* Instantiate this class
*
* @param client Client to create an index
* @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices
*/
public ReindexStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) {
this.client = client;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}

@Override
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
) {

PlainActionFuture<WorkflowData> reIndexFuture = PlainActionFuture.newFuture();

Set<String> requiredKeys = Set.of(SOURCE_INDEX, DESTINATION_INDEX);

Set<String> optionalKeys = Set.of(REFRESH, REQUESTS_PER_SECOND, REQUIRE_ALIAS, SLICES, MAX_DOCS);

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
requiredKeys,
optionalKeys,
currentNodeInputs,
outputs,
previousNodeInputs,
params
);

String sourceIndices = (String) inputs.get(SOURCE_INDEX);
String destinationIndex = (String) inputs.get(DESTINATION_INDEX);
Boolean refresh = inputs.containsKey(REFRESH) ? Booleans.parseBoolean(inputs.get(REFRESH).toString()) : null;
Float requestsPerSecond = inputs.containsKey(REQUESTS_PER_SECOND)
? Float.parseFloat(inputs.get(REQUESTS_PER_SECOND).toString())
: null;
Boolean requireAlias = inputs.containsKey(REQUIRE_ALIAS) ? Booleans.parseBoolean(inputs.get(REQUIRE_ALIAS).toString()) : null;
Integer slices = (Integer) inputs.get(SLICES);
Integer maxDocs = (Integer) inputs.get(MAX_DOCS);

ReindexRequest reindexRequest = new ReindexRequest().setSourceIndices(Strings.splitStringByCommaToArray(sourceIndices))
.setDestIndex(destinationIndex);

if (refresh != null) {
reindexRequest.setRefresh(refresh);
}
if (requestsPerSecond != null) {
reindexRequest.setRequestsPerSecond(requestsPerSecond);
}
if (requireAlias != null) {
reindexRequest.setRequireAlias(requireAlias);
}
if (maxDocs != null) {
reindexRequest.setMaxDocs(maxDocs);
}
if (slices != null) {
reindexRequest.setSlices(slices);
}

ActionListener<BulkByScrollResponse> actionListener = new ActionListener<>() {

@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
logger.info("Reindex from source: {} to destination {}", sourceIndices, destinationIndex);
try {
if (bulkByScrollResponse.getBulkFailures().isEmpty() && bulkByScrollResponse.getSearchFailures().isEmpty()) {
reIndexFuture.onResponse(
new WorkflowData(
Map.of(
NAME,
Map.ofEntries(
Map.entry(DESTINATION_INDEX, destinationIndex),
Map.entry(SOURCE_INDEX, sourceIndices)
)
),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
} else {
String errorMessage = "Failed to get bulk response " + bulkByScrollResponse.getBulkFailures();
reIndexFuture.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
}
} catch (Exception e) {
String errorMessage = "Failed to parse and update new created resource";
logger.error(errorMessage, e);
reIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to reindex from source " + sourceIndices + " to " + destinationIndex;
logger.error(errorMessage, e);
reIndexFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
};

client.execute(ReindexAction.INSTANCE, reindexRequest, actionListener);

} catch (IllegalArgumentException iae) {
String error = "Failed to reindex " + iae.getMessage();
reIndexFuture.onFailure(new WorkflowStepException(error, RestStatus.BAD_REQUEST));
} catch (Exception e) {
reIndexFuture.onFailure(e);
}

return reIndexFuture;
}

@Override
public String getName() {
return NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.CommonValue.CREDENTIAL_FIELD;
import static org.opensearch.flowframework.common.CommonValue.DESCRIPTION_FIELD;
import static org.opensearch.flowframework.common.CommonValue.DESTINATION_INDEX;
import static org.opensearch.flowframework.common.CommonValue.EMBEDDING_DIMENSION;
import static org.opensearch.flowframework.common.CommonValue.FRAMEWORK_TYPE;
import static org.opensearch.flowframework.common.CommonValue.FUNCTION_NAME;
Expand All @@ -47,6 +48,7 @@
import static org.opensearch.flowframework.common.CommonValue.PIPELINE_ID;
import static org.opensearch.flowframework.common.CommonValue.PROTOCOL_FIELD;
import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS;
import static org.opensearch.flowframework.common.CommonValue.SOURCE_INDEX;
import static org.opensearch.flowframework.common.CommonValue.SUCCESS;
import static org.opensearch.flowframework.common.CommonValue.TOOLS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.TYPE;
Expand Down Expand Up @@ -84,6 +86,7 @@ public WorkflowStepFactory(
) {
stepMap.put(NoOpStep.NAME, NoOpStep::new);
stepMap.put(CreateIndexStep.NAME, () -> new CreateIndexStep(client, flowFrameworkIndicesHandler));
stepMap.put(ReindexStep.NAME, () -> new ReindexStep(client, flowFrameworkIndicesHandler));
stepMap.put(
RegisterLocalCustomModelStep.NAME,
() -> new RegisterLocalCustomModelStep(threadPool, mlClient, flowFrameworkIndicesHandler, flowFrameworkSettings)
Expand Down Expand Up @@ -125,6 +128,9 @@ public enum WorkflowSteps {
/** Create Index Step */
CREATE_INDEX(CreateIndexStep.NAME, List.of(INDEX_NAME, CONFIGURATIONS), List.of(INDEX_NAME), Collections.emptyList(), null),

/** Create ReIndex Step */
REINDEX(ReindexStep.NAME, List.of(SOURCE_INDEX, DESTINATION_INDEX), List.of(ReindexStep.NAME), Collections.emptyList(), null),

/** Create Connector Step */
CREATE_CONNECTOR(
CreateConnectorStep.NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void testParseWorkflowValidator() throws IOException {

WorkflowValidator validator = new WorkflowValidator(workflowStepValidators);

assertEquals(17, validator.getWorkflowStepValidators().size());
assertEquals(18, validator.getWorkflowStepValidators().size());

assertTrue(validator.getWorkflowStepValidators().keySet().contains("create_connector"));
assertEquals(7, validator.getWorkflowStepValidators().get("create_connector").getInputs().size());
Expand Down
Loading

0 comments on commit 421f1fd

Please sign in to comment.