Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Offline Nodes] Adds new library for offline tasks #13574

Merged
merged 35 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
bf65b92
[Offline Nodes] Adds new library offline tasks
linuxpi May 7, 2024
1742088
add documentation & TaskWorker interface
linuxpi May 7, 2024
2efd394
add docstring for offline-tasks library
linuxpi May 7, 2024
d0b4983
add TaskWorkerPlugin interface
linuxpi May 7, 2024
152e28e
fix missingjavadoc issue
linuxpi May 8, 2024
dbf97ca
move plugin interfaces to :server plugins namespace
linuxpi May 9, 2024
d6eadca
add changelog
linuxpi May 9, 2024
a2beb11
update changelog
linuxpi May 9, 2024
a304615
Merge branch 'main' into offline-tasks-library
linuxpi May 9, 2024
9b19227
add missing javaDocStrings
linuxpi May 9, 2024
804fcb1
Merge branch 'main' into offline-tasks-library
linuxpi May 22, 2024
9e48827
Add TaskStatus attribute of a Task and update CHANGELOG
linuxpi May 22, 2024
806edc8
Refactor list tasks : Add dedicated TaskListQueryParams
linuxpi May 22, 2024
25fa935
change claimTask signature to return boolean
linuxpi May 22, 2024
a330bfd
add new TaskStatus - UNASSIGNED, ASSIGNED, CANCELLED
linuxpi May 31, 2024
7c08662
add getTasks contract to accept TaskType as well
linuxpi May 31, 2024
34094a5
add builder for Task and add timestamp related attributes
linuxpi May 31, 2024
a0cbd97
add WorkerNode to Task to add assigned node information and provision…
linuxpi May 31, 2024
aceb74d
refactor TaskClient into TaskProducerClient and TaskManagerClient
linuxpi May 31, 2024
f50b167
refactor list tasks api
linuxpi May 31, 2024
8bfcb40
rename all occurences of claim to assign
linuxpi Jun 13, 2024
42f9e73
add support to list task assignments from a separate store other than…
linuxpi Jun 13, 2024
bd2db78
Rename to task commmons
Bukhtawar Jun 13, 2024
8e3385c
fix spotless findings
linuxpi Jun 13, 2024
1bd0b3b
separate out Task Clients based on different actors
linuxpi Jun 13, 2024
cc92cca
add missing docstring
linuxpi Jun 13, 2024
09d6965
Merge branch 'main' into offline-tasks-library
linuxpi Jul 31, 2024
37d81b1
add Tests for Task
linuxpi Aug 1, 2024
ec76335
Merge branch 'main' into offline-tasks-library
linuxpi Aug 1, 2024
6470881
Merge branch 'main' into offline-tasks-library
linuxpi Aug 4, 2024
3a84e95
support to update task status via Builder
linuxpi Aug 5, 2024
a1e3bc3
add tests for WorkerNode and TaskId
linuxpi Aug 6, 2024
21abf2a
reword to remove offline keyword
linuxpi Aug 7, 2024
cf26f01
gaurd plugin registration behind feature flag
linuxpi Aug 9, 2024
9a87245
Merge branch 'main' into offline-tasks-library
linuxpi Aug 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- [Offline Nodes] Adds offline-tasks library containing various interfaces to be used for Offline Background Tasks. ([#13574](https://github.com/opensearch-project/OpenSearch/pull/13574))
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved
- Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs (([#14972](https://github.com/opensearch-project/OpenSearch/pull/14972))
- [Workload Management] Add queryGroupId to Task ([14708](https://github.com/opensearch-project/OpenSearch/pull/14708))
- Add setting to ignore throttling nodes for allocation of unassigned primaries in remote restore ([#14991](https://github.com/opensearch-project/OpenSearch/pull/14991))
Expand Down
25 changes: 25 additions & 0 deletions libs/task-commons/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

dependencies {
api project(':libs:opensearch-common')

testImplementation "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
testImplementation "junit:junit:${versions.junit}"
testImplementation "org.hamcrest:hamcrest:${versions.hamcrest}"
testImplementation(project(":test:framework")) {
exclude group: 'org.opensearch', module: 'opensearch-task-commons'
}
}

tasks.named('forbiddenApisMain').configure {
replaceSignatureFiles 'jdk-signatures'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.task.commons.clients;

import org.opensearch.task.commons.task.TaskStatus;
import org.opensearch.task.commons.task.TaskType;
import org.opensearch.task.commons.worker.WorkerNode;

/**
* Request object for listing tasks
*/
public class TaskListRequest {

/**
* Filters listTasks response by specific task status'
*/
private TaskStatus[] taskStatus;

/**
* Filter listTasks response by specific task types
*/
private TaskType[] taskTypes;

/**
* Filter listTasks response by specific worker node
*/
private WorkerNode workerNodes;

/**
* Depicts the start page number for the list call.
*
* @see TaskManagerClient#listTasks(TaskListRequest)
*/
private int startPageNumber;

/**
* Depicts the page size for the list call.
*
* @see TaskManagerClient#listTasks(TaskListRequest)
*/
private int pageSize;

/**
* Default constructor
*/
public TaskListRequest() {}

Check warning on line 52 in libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java

View check run for this annotation

Codecov / codecov/patch

libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java#L52

Added line #L52 was not covered by tests

/**
* Update task types to filter with in the request
* @param taskTypes TaskType[]
* @return ListTaskRequest
*/
public TaskListRequest taskType(TaskType... taskTypes) {
this.taskTypes = taskTypes;
return this;

Check warning on line 61 in libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java

View check run for this annotation

Codecov / codecov/patch

libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java#L60-L61

Added lines #L60 - L61 were not covered by tests
}

/**
* Update task status to filter with in the request
* @param taskStatus TaskStatus[]
* @return ListTaskRequest
*/
public TaskListRequest taskType(TaskStatus... taskStatus) {
this.taskStatus = taskStatus;
return this;

Check warning on line 71 in libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java

View check run for this annotation

Codecov / codecov/patch

libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java#L70-L71

Added lines #L70 - L71 were not covered by tests
}

/**
* Update worker node to filter with in the request
* @param workerNode WorkerNode
* @return ListTaskRequest
*/
private TaskListRequest workerNode(WorkerNode workerNode) {
this.workerNodes = workerNode;
return this;

Check warning on line 81 in libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java

View check run for this annotation

Codecov / codecov/patch

libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java#L80-L81

Added lines #L80 - L81 were not covered by tests
}

/**
* Update page number to start with when fetching the list of tasks
* @param startPageNumber startPageNumber
* @return ListTaskRequest
*/
public TaskListRequest startPageNumber(int startPageNumber) {
this.startPageNumber = startPageNumber;
return this;

Check warning on line 91 in libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java

View check run for this annotation

Codecov / codecov/patch

libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java#L90-L91

Added lines #L90 - L91 were not covered by tests
}

/**
* Update page size for the list tasks response
* @param pageSize int
* @return ListTaskRequest
*/
public TaskListRequest pageSize(int pageSize) {
this.pageSize = pageSize;
return this;

Check warning on line 101 in libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java

View check run for this annotation

Codecov / codecov/patch

libs/task-commons/src/main/java/org/opensearch/task/commons/clients/TaskListRequest.java#L100-L101

Added lines #L100 - L101 were not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.task.commons.clients;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.task.commons.task.Task;
import org.opensearch.task.commons.task.TaskId;
import org.opensearch.task.commons.worker.WorkerNode;

import java.util.List;

/**
* Client used to interact with Task Store/Queue.
*
* TODO: TaskManager can be something not running an opensearch process.
* We need to come up with a way to allow this interface to be used with in and out opensearch as well
*
* @opensearch.experimental
*/
@ExperimentalApi
public interface TaskManagerClient {

/**
* Get task from TaskStore/Queue
*
* @param taskId TaskId of the task to be retrieved
* @return Task corresponding to TaskId
*/
Task getTask(TaskId taskId);

/**
* Update task in TaskStore/Queue
*
* @param task Task to be updated
*/
void updateTask(Task task);

/**
* Mark task as cancelled.
* Ongoing Tasks can be cancelled as well if the corresponding worker supports cancellation
*
* @param taskId TaskId of the task to be cancelled
*/
void cancelTask(TaskId taskId);

/**
* List all tasks applying all the filters present in listTaskRequest
*
* @param taskListRequest TaskListRequest
* @return list of all the task matching the filters in listTaskRequest
*/
List<Task> listTasks(TaskListRequest taskListRequest);

/**
* Assign Task to a particular WorkerNode. This ensures no 2 worker Nodes work on the same task.
* This API can be used in both pull and push models of task assignment.
*
* @param taskId TaskId of the task to be assigned
* @param node WorkerNode task is being assigned to
* @return true if task is assigned successfully, false otherwise
*/
boolean assignTask(TaskId taskId, WorkerNode node);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.task.commons.clients;

import org.opensearch.task.commons.task.Task;

/**
* Producer interface used to submit new tasks for execution on worker nodes.
*/
public interface TaskProducerClient {

/**
* Submit a new task to TaskStore/Queue
*
* @param task Task to be submitted for execution on offline nodes
*/
void submitTask(Task task);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.task.commons.clients;

import org.opensearch.task.commons.task.Task;
import org.opensearch.task.commons.task.TaskId;

import java.util.List;

/**
* Consumer interface used to find new tasks assigned to a {@code WorkerNode} for execution.
*/
public interface TaskWorkerClient {

/**
* List all tasks assigned to a WorkerNode.
* Useful when the implementation uses a separate store for Task assignments to Worker nodes
*
* @param taskListRequest TaskListRequest
* @return list of all tasks assigned to a WorkerNode
*/
List<Task> getAssignedTasks(TaskListRequest taskListRequest);

/**
* Sends task heart beat to Task Store/Queue
*
* @param taskId TaskId of Task to send heartbeat for
* @param timestamp timestamp of heartbeat to be recorded in TaskStore/Queue
*/
void sendTaskHeartbeat(TaskId taskId, long timestamp);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Contains task client related classes
*/
package org.opensearch.task.commons.clients;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Contains offline tasks related classes
*/
package org.opensearch.task.commons;
Loading
Loading