Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change max retries to retry duration, refactor settings for consistency #381

Merged
merged 5 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@
import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;

/**
Expand All @@ -84,8 +84,6 @@ public class FlowFrameworkPlugin extends Plugin implements ActionPlugin {

private FlowFrameworkSettings flowFrameworkSettings;

private ClusterService clusterService;

/**
* Instantiate this plugin.
*/
Expand All @@ -106,7 +104,6 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
Settings settings = environment.settings();
this.clusterService = clusterService;
flowFrameworkSettings = new FlowFrameworkSettings(clusterService, settings);
MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client);
Expand All @@ -127,7 +124,7 @@ public Collection<Object> createComponents(
flowFrameworkSettings
);

return List.of(workflowStepFactory, workflowProcessSorter, encryptorUtils, flowFrameworkIndicesHandler);
return List.of(workflowStepFactory, workflowProcessSorter, encryptorUtils, flowFrameworkIndicesHandler, flowFrameworkSettings);
}

@Override
Expand All @@ -141,7 +138,7 @@ public List<RestHandler> getRestHandlers(
Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of(
new RestCreateWorkflowAction(flowFrameworkSettings, settings, clusterService),
new RestCreateWorkflowAction(flowFrameworkSettings),
new RestDeleteWorkflowAction(flowFrameworkSettings),
new RestProvisionWorkflowAction(flowFrameworkSettings),
new RestDeprovisionWorkflowAction(flowFrameworkSettings),
Expand All @@ -168,7 +165,7 @@ public List<RestHandler> getRestHandlers(

@Override
public List<Setting<?>> getSettings() {
return List.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, MAX_GET_TASK_REQUEST_RETRY);
return List.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, MAX_WORKFLOW_STEPS, WORKFLOW_REQUEST_TIMEOUT, TASK_REQUEST_RETRY_DURATION);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@
public class FlowFrameworkSettings {

private volatile Boolean isFlowFrameworkEnabled;
/** The maximum number of transport request retries */
private volatile Integer maxRetry;
/** Max workflow steps that can be created*/
/** The duration between request retries */
private volatile TimeValue retryDuration;
/** Max workflow steps that can be created */
private volatile Integer maxWorkflowSteps;
/** Max workflows that can be created*/
protected volatile Integer maxWorkflows;
/** Timeout for internal requests*/
protected volatile TimeValue requestTimeout;

/** The upper limit of max workflows that can be created */
public static final int MAX_WORKFLOWS_LIMIT = 10000;
/** The upper limit of max workflow steps that can be in a single workflow */
public static final int MAX_WORKFLOW_STEPS_LIMIT = 500;

/** This setting sets max workflows that can be created */
/** This setting sets max workflows that can be created */
public static final Setting<Integer> MAX_WORKFLOWS = Setting.intSetting(
"plugins.flow_framework.max_workflows",
1000,
Expand All @@ -37,7 +41,7 @@ public class FlowFrameworkSettings {
Setting.Property.Dynamic
);

/** This setting sets max workflows that can be created */
/** This setting sets max workflows that can be created */
public static final Setting<Integer> MAX_WORKFLOW_STEPS = Setting.intSetting(
"plugins.flow_framework.max_workflow_steps",
50,
Expand All @@ -47,7 +51,7 @@ public class FlowFrameworkSettings {
Setting.Property.Dynamic
);

/** This setting sets the timeout for the request */
/** This setting sets the timeout for the request */
public static final Setting<TimeValue> WORKFLOW_REQUEST_TIMEOUT = Setting.positiveTimeSetting(
"plugins.flow_framework.request_timeout",
TimeValue.timeValueSeconds(10),
Expand All @@ -63,11 +67,10 @@ public class FlowFrameworkSettings {
Setting.Property.Dynamic
);

/** This setting sets the maximum number of get task request retries */
public static final Setting<Integer> MAX_GET_TASK_REQUEST_RETRY = Setting.intSetting(
"plugins.flow_framework.max_get_task_request_retry",
5,
0,
/** This setting sets the time between task request retries */
public static final Setting<TimeValue> TASK_REQUEST_RETRY_DURATION = Setting.positiveTimeSetting(
"plugins.flow_framework.task_request_retry_duration",
TimeValue.timeValueSeconds(5),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand All @@ -82,27 +85,31 @@ public FlowFrameworkSettings(ClusterService clusterService, Settings settings) {
// Currently this is just an on/off switch for the entire plugin's API.
// If desired more fine-tuned feature settings can be added below.
this.isFlowFrameworkEnabled = FLOW_FRAMEWORK_ENABLED.get(settings);
this.maxRetry = MAX_GET_TASK_REQUEST_RETRY.get(settings);
this.retryDuration = TASK_REQUEST_RETRY_DURATION.get(settings);
this.maxWorkflowSteps = MAX_WORKFLOW_STEPS.get(settings);
this.maxWorkflows = MAX_WORKFLOWS.get(settings);
this.requestTimeout = WORKFLOW_REQUEST_TIMEOUT.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(FLOW_FRAMEWORK_ENABLED, it -> isFlowFrameworkEnabled = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_GET_TASK_REQUEST_RETRY, it -> maxRetry = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(TASK_REQUEST_RETRY_DURATION, it -> retryDuration = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOW_STEPS, it -> maxWorkflowSteps = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOWS, it -> maxWorkflows = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(WORKFLOW_REQUEST_TIMEOUT, it -> requestTimeout = it);
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Whether the flow framework feature is enabled. If disabled, no REST APIs will be availble.
* Whether the flow framework feature is enabled. If disabled, no REST APIs will be available.
* @return whether Flow Framework is enabled.
*/
public boolean isFlowFrameworkEnabled() {
return isFlowFrameworkEnabled;
}

/**
* Getter for max retry count
* @return count of max retry
* Getter for retry duration
* @return retry duration
*/
public Integer getMaxRetry() {
return maxRetry;
public TimeValue getRetryDuration() {
return retryDuration;
}

/**
Expand All @@ -112,4 +119,20 @@ public Integer getMaxRetry() {
public Integer getMaxWorkflowSteps() {
return maxWorkflowSteps;
}

/**
* Getter for max workflows
* @return max workflows
*/
public Integer getMaxWorkflows() {
return maxWorkflows;
}

/**
* Getter for request timeout
* @return request timeout
*/
public TimeValue getRequestTimeout() {
return requestTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,33 +48,33 @@
/** Search action type*/
protected final ActionType<SearchResponse> actionType;
/** Settings to enable FlowFramework API*/
protected final FlowFrameworkSettings flowFrameworkFeatureEnabledSetting;
protected final FlowFrameworkSettings flowFrameworkSettings;

/**
* Instantiates a new AbstractSearchWorkflowAction
* @param urlPaths urlPaths to create routes
* @param index index the search should be done on
* @param clazz model class
* @param actionType from which action abstract class is called
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
* @param flowFrameworkSettings Whether this API is enabled
*/
public AbstractSearchWorkflowAction(
List<String> urlPaths,
String index,
Class<T> clazz,
ActionType<SearchResponse> actionType,
FlowFrameworkSettings flowFrameworkFeatureEnabledSetting
FlowFrameworkSettings flowFrameworkSettings
) {
this.urlPaths = urlPaths;
this.index = index;
this.clazz = clazz;
this.actionType = actionType;
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
this.flowFrameworkSettings = flowFrameworkSettings;
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
if (!flowFrameworkSettings.isFlowFrameworkEnabled()) {
FlowFrameworkException ffe = new FlowFrameworkException(
"This API is disabled. To enable it, update the setting [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
Expand All @@ -87,7 +87,8 @@
searchSourceBuilder.parseXContent(request.contentOrSourceParamParser());
searchSourceBuilder.fetchSource(getSourceContext(request, searchSourceBuilder));
searchSourceBuilder.seqNoAndPrimaryTerm(true).version(true);
searchSourceBuilder.timeout(flowFrameworkSettings.getRequestTimeout());
SearchRequest searchRequest = new SearchRequest().source(searchSourceBuilder).indices(index);

Check warning on line 91 in src/main/java/org/opensearch/flowframework/rest/AbstractSearchWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/AbstractSearchWorkflowAction.java#L90-L91

Added lines #L90 - L91 were not covered by tests
return channel -> client.execute(actionType, searchRequest, search(channel));
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
Expand All @@ -23,6 +21,7 @@
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;

Expand All @@ -40,26 +39,19 @@
/**
* Rest Action to facilitate requests to create and update a use case template
*/
public class RestCreateWorkflowAction extends AbstractWorkflowAction {
public class RestCreateWorkflowAction extends BaseRestHandler {

private static final Logger logger = LogManager.getLogger(RestCreateWorkflowAction.class);
private static final String CREATE_WORKFLOW_ACTION = "create_workflow_action";

private FlowFrameworkSettings flowFrameworkFeatureEnabledSetting;
private FlowFrameworkSettings flowFrameworkSettings;

/**
* Instantiates a new RestCreateWorkflowAction
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
* @param settings Environment settings
* @param clusterService clusterService
* @param flowFrameworkSettings The settings for the flow framework plugin
*/
public RestCreateWorkflowAction(
FlowFrameworkSettings flowFrameworkFeatureEnabledSetting,
Settings settings,
ClusterService clusterService
) {
super(settings, clusterService);
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
public RestCreateWorkflowAction(FlowFrameworkSettings flowFrameworkSettings) {
this.flowFrameworkSettings = flowFrameworkSettings;
}

@Override
Expand All @@ -80,7 +72,7 @@ public List<Route> routes() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String workflowId = request.param(WORKFLOW_ID);
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
if (!flowFrameworkSettings.isFlowFrameworkEnabled()) {
FlowFrameworkException ffe = new FlowFrameworkException(
"This API is disabled. To enable it, set [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
Expand All @@ -96,14 +88,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
String[] validation = request.paramAsStringArray(VALIDATION, new String[] { "all" });
boolean provision = request.paramAsBoolean(PROVISION_WORKFLOW, false);

WorkflowRequest workflowRequest = new WorkflowRequest(
workflowId,
template,
validation,
provision,
requestTimeout,
maxWorkflows
);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template, validation, provision);

return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
Expand Down
Loading
Loading