Skip to content

Commit

Permalink
[Offline Nodes] Adds new library for offline tasks (opensearch-projec…
Browse files Browse the repository at this point in the history
…t#13574)

---------

Signed-off-by: Varun Bansal <[email protected]>
Signed-off-by: Bukhtawar Khan <[email protected]>
Co-authored-by: Bukhtawar Khan <[email protected]>
  • Loading branch information
2 people authored and wangdongyu.danny committed Aug 22, 2024
1 parent c5f4f8f commit eb1ea3a
Show file tree
Hide file tree
Showing 26 changed files with 1,306 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- [Offline Nodes] Adds offline-tasks library containing various interfaces to be used for Offline Background Tasks. ([#13574](https://github.com/opensearch-project/OpenSearch/pull/13574))
- Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs (([#14972](https://github.com/opensearch-project/OpenSearch/pull/14972))
- [Workload Management] Add queryGroupId to Task ([14708](https://github.com/opensearch-project/OpenSearch/pull/14708))
- Add setting to ignore throttling nodes for allocation of unassigned primaries in remote restore ([#14991](https://github.com/opensearch-project/OpenSearch/pull/14991))
Expand Down
25 changes: 25 additions & 0 deletions libs/task-commons/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

dependencies {
api project(':libs:opensearch-common')

testImplementation "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
testImplementation "junit:junit:${versions.junit}"
testImplementation "org.hamcrest:hamcrest:${versions.hamcrest}"
testImplementation(project(":test:framework")) {
exclude group: 'org.opensearch', module: 'opensearch-task-commons'
}
}

tasks.named('forbiddenApisMain').configure {
replaceSignatureFiles 'jdk-signatures'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.task.commons.clients;

import org.opensearch.task.commons.task.TaskStatus;
import org.opensearch.task.commons.task.TaskType;
import org.opensearch.task.commons.worker.WorkerNode;

/**
* Request object for listing tasks
*/
public class TaskListRequest {

/**
* Filters listTasks response by specific task status'
*/
private TaskStatus[] taskStatus;

/**
* Filter listTasks response by specific task types
*/
private TaskType[] taskTypes;

/**
* Filter listTasks response by specific worker node
*/
private WorkerNode workerNodes;

/**
* Depicts the start page number for the list call.
*
* @see TaskManagerClient#listTasks(TaskListRequest)
*/
private int startPageNumber;

/**
* Depicts the page size for the list call.
*
* @see TaskManagerClient#listTasks(TaskListRequest)
*/
private int pageSize;

/**
* Default constructor
*/
public TaskListRequest() {}

/**
* Update task types to filter with in the request
* @param taskTypes TaskType[]
* @return ListTaskRequest
*/
public TaskListRequest taskType(TaskType... taskTypes) {
this.taskTypes = taskTypes;
return this;
}

/**
* Update task status to filter with in the request
* @param taskStatus TaskStatus[]
* @return ListTaskRequest
*/
public TaskListRequest taskType(TaskStatus... taskStatus) {
this.taskStatus = taskStatus;
return this;
}

/**
* Update worker node to filter with in the request
* @param workerNode WorkerNode
* @return ListTaskRequest
*/
private TaskListRequest workerNode(WorkerNode workerNode) {
this.workerNodes = workerNode;
return this;
}

/**
* Update page number to start with when fetching the list of tasks
* @param startPageNumber startPageNumber
* @return ListTaskRequest
*/
public TaskListRequest startPageNumber(int startPageNumber) {
this.startPageNumber = startPageNumber;
return this;
}

/**
* Update page size for the list tasks response
* @param pageSize int
* @return ListTaskRequest
*/
public TaskListRequest pageSize(int pageSize) {
this.pageSize = pageSize;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.task.commons.clients;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.task.commons.task.Task;
import org.opensearch.task.commons.task.TaskId;
import org.opensearch.task.commons.worker.WorkerNode;

import java.util.List;

/**
* Client used to interact with Task Store/Queue.
*
* TODO: TaskManager can be something not running an opensearch process.
* We need to come up with a way to allow this interface to be used with in and out opensearch as well
*
* @opensearch.experimental
*/
@ExperimentalApi
public interface TaskManagerClient {

/**
* Get task from TaskStore/Queue
*
* @param taskId TaskId of the task to be retrieved
* @return Task corresponding to TaskId
*/
Task getTask(TaskId taskId);

/**
* Update task in TaskStore/Queue
*
* @param task Task to be updated
*/
void updateTask(Task task);

/**
* Mark task as cancelled.
* Ongoing Tasks can be cancelled as well if the corresponding worker supports cancellation
*
* @param taskId TaskId of the task to be cancelled
*/
void cancelTask(TaskId taskId);

/**
* List all tasks applying all the filters present in listTaskRequest
*
* @param taskListRequest TaskListRequest
* @return list of all the task matching the filters in listTaskRequest
*/
List<Task> listTasks(TaskListRequest taskListRequest);

/**
* Assign Task to a particular WorkerNode. This ensures no 2 worker Nodes work on the same task.
* This API can be used in both pull and push models of task assignment.
*
* @param taskId TaskId of the task to be assigned
* @param node WorkerNode task is being assigned to
* @return true if task is assigned successfully, false otherwise
*/
boolean assignTask(TaskId taskId, WorkerNode node);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.task.commons.clients;

import org.opensearch.task.commons.task.Task;

/**
* Producer interface used to submit new tasks for execution on worker nodes.
*/
public interface TaskProducerClient {

/**
* Submit a new task to TaskStore/Queue
*
* @param task Task to be submitted for execution on offline nodes
*/
void submitTask(Task task);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.task.commons.clients;

import org.opensearch.task.commons.task.Task;
import org.opensearch.task.commons.task.TaskId;

import java.util.List;

/**
* Consumer interface used to find new tasks assigned to a {@code WorkerNode} for execution.
*/
public interface TaskWorkerClient {

/**
* List all tasks assigned to a WorkerNode.
* Useful when the implementation uses a separate store for Task assignments to Worker nodes
*
* @param taskListRequest TaskListRequest
* @return list of all tasks assigned to a WorkerNode
*/
List<Task> getAssignedTasks(TaskListRequest taskListRequest);

/**
* Sends task heart beat to Task Store/Queue
*
* @param taskId TaskId of Task to send heartbeat for
* @param timestamp timestamp of heartbeat to be recorded in TaskStore/Queue
*/
void sendTaskHeartbeat(TaskId taskId, long timestamp);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Contains task client related classes
*/
package org.opensearch.task.commons.clients;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Contains offline tasks related classes
*/
package org.opensearch.task.commons;
Loading

0 comments on commit eb1ea3a

Please sign in to comment.