Skip to content

Commit

Permalink
Merge pull request #229 from bitfinity-network/maxim/add_reschedule
Browse files Browse the repository at this point in the history
Add `find_id` and `reschedule` methods to scheduler
  • Loading branch information
Maximkaaa authored Aug 16, 2024
2 parents 800c882 + 8c27895 commit fadfe78
Show file tree
Hide file tree
Showing 2 changed files with 257 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ members = [
]

[workspace.package]
version = "0.20.0"
version = "0.20.1"
edition = "2021"

[workspace.dependencies]
Expand Down
257 changes: 256 additions & 1 deletion ic-task-scheduler/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use parking_lot::Mutex;
use serde::de::DeserializeOwned;
use serde::Serialize;

use crate::task::{InnerScheduledTask, ScheduledTask, Task, TaskStatus};
use crate::task::{InnerScheduledTask, ScheduledTask, Task, TaskOptions, TaskStatus};
use crate::time::time_secs;
use crate::SchedulerError;

Expand Down Expand Up @@ -176,6 +176,9 @@ where
}
Err(err) => {
let mut lock = task_scheduler.pending_tasks.lock();
if let Some(updated_task) = lock.get(&task.id) {
task.options = updated_task.options;
}
task.options.failures += 1;
let (should_retry, retry_delay) = match err {
SchedulerError::Unrecoverable(_) => (false, 0),
Expand Down Expand Up @@ -231,10 +234,28 @@ where
pub trait TaskScheduler<T: 'static + Task> {
/// Append a task to the scheduler and return the key of the task.
fn append_task(&self, task: ScheduledTask<T>) -> u32;

/// Append a list of tasks to the scheduler and return the keys of the tasks.
fn append_tasks(&self, tasks: Vec<ScheduledTask<T>>) -> Vec<u32>;

/// Get a task by its key.
fn get_task(&self, task_id: u32) -> Option<InnerScheduledTask<T>>;

/// Returns identifier of the first task for which the `filter` predicate returns true.
/// This method is used to locate a task id of a specific task.
///
/// NOTE: Iterating over all tasks requires loading them one by once from IC stable memory
/// (if stable memory is used for scheduler), which can be slow in case there are many pending
/// tasks in the scheduler.
fn find_id(&self, filter: &dyn Fn(T) -> bool) -> Option<u32>;

/// Changes the retry parameters of the given task id to the new `options` value.
///
/// If the task is currently running, the current execution will be considered as the first
/// execution of the new retry schedule.
///
/// If the task with `task_id` identifier doesn't exist, does nothing.
fn reschedule(&self, task_id: u32, options: TaskOptions);
}

impl<T, P> Clone for Scheduler<T, P>
Expand Down Expand Up @@ -316,6 +337,28 @@ where
fn get_task(&self, task_id: u32) -> Option<InnerScheduledTask<T>> {
self.pending_tasks.lock().get(&task_id)
}

fn reschedule(&self, task_id: u32, options: TaskOptions) {
let mut lock = self.pending_tasks.lock();
let Some(mut task) = lock.get(&task_id) else {
return;
};

task.options = options;
lock.insert(task_id, task);
}

fn find_id(&self, filter: &dyn Fn(T) -> bool) -> Option<u32> {
self.pending_tasks.lock().iter().find_map(
|(id, task)| {
if filter(task.task) {
Some(id)
} else {
None
}
},
)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -1090,4 +1133,216 @@ mod test {
.await;
}
}

mod task_rescheduling {
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

use candid::Deserialize;
use ic_stable_structures::{StableBTreeMap, VectorMemory};
use tokio::sync::Notify;

use super::*;
use crate::retry::BackoffPolicy;

#[derive(Serialize, Deserialize, Debug, Clone)]
struct SucceedingTask {}
impl Task for SucceedingTask {
type Ctx = ();

fn execute(
&self,
_context: Self::Ctx,
_task_scheduler: Box<dyn 'static + TaskScheduler<Self>>,
) -> Pin<Box<dyn Future<Output = Result<(), SchedulerError>>>> {
Box::pin(async move { Ok(()) })
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
struct FailingTask {}
impl Task for FailingTask {
type Ctx = ();

fn execute(
&self,
_context: Self::Ctx,
_task_scheduler: Box<dyn 'static + TaskScheduler<Self>>,
) -> Pin<Box<dyn Future<Output = Result<(), SchedulerError>>>> {
Box::pin(async move { Err(SchedulerError::TaskExecutionFailed("".to_string())) })
}
}

thread_local! {
static COMPLETE: Arc<Notify> = Arc::new(Notify::new());
}

#[derive(Serialize, Deserialize, Debug, Clone)]
struct AwaitingTask {}

impl AwaitingTask {
fn complete(&self) {
COMPLETE.with(|v| v.notify_one());
}
}

impl Task for AwaitingTask {
type Ctx = ();

fn execute(
&self,
_context: Self::Ctx,
_task_scheduler: Box<dyn 'static + TaskScheduler<Self>>,
) -> Pin<Box<dyn Future<Output = Result<(), SchedulerError>>>> {
let complete = COMPLETE.with(|v| v.clone());
Box::pin(async move {
println!("Waiting for execution");
complete.notified().await;
println!("Executing task after completion");
Err(SchedulerError::TaskExecutionFailed("".to_string()))
})
}
}

#[tokio::test]
async fn options_updated_for_scheduled_tasks() {
let local = tokio::task::LocalSet::new();
local
.run_until(async move {
let map = StableBTreeMap::new(VectorMemory::default());
let scheduler = Scheduler::new(map);
let task = SucceedingTask {};

let id = scheduler.append_task((task, TaskOptions::new()).into());

let options = TaskOptions::new().with_max_retries_policy(5);
scheduler.reschedule(id, options.clone());

assert_eq!(scheduler.get_task(id).unwrap().options, options);
})
.await;
}

#[tokio::test]
async fn options_updated_for_retrying_tasks() {
let local = tokio::task::LocalSet::new();
local
.run_until(async move {
let map = StableBTreeMap::new(VectorMemory::default());
let scheduler = Scheduler::new(map);
let task = FailingTask {};

let id = scheduler
.append_task((task, TaskOptions::new().with_max_retries_policy(3)).into());
scheduler.run(()).unwrap();
tokio::time::sleep(Duration::from_millis(25)).await;

let options = TaskOptions::new().with_max_retries_policy(5);
scheduler.reschedule(id, options.clone());

assert_eq!(scheduler.get_task(id).unwrap().options, options);
})
.await;
}

#[tokio::test]
async fn reschedule_is_noop_for_non_existing_task() {
let local = tokio::task::LocalSet::new();
local
.run_until(async move {
let map: StableBTreeMap<u32, InnerScheduledTask<SucceedingTask>, _> =
StableBTreeMap::new(VectorMemory::default());
let scheduler = Scheduler::new(map);

scheduler.reschedule(42, TaskOptions::new());
})
.await;
}

#[tokio::test]
async fn options_updated_for_running_tasks() {
let local = tokio::task::LocalSet::new();
local
.run_until(async move {
let map = StableBTreeMap::new(VectorMemory::default());
let scheduler = Scheduler::new(map);
let task = AwaitingTask {};

let id = scheduler.append_task(
(
task.clone(),
TaskOptions::new()
.with_max_retries_policy(3)
.with_backoff_policy(BackoffPolicy::None),
)
.into(),
);
scheduler.run(()).unwrap();
task.complete();
tokio::time::sleep(Duration::from_millis(25)).await;

dbg!(scheduler.get_task(id));

scheduler.run(()).unwrap();
let options = TaskOptions::new()
.with_execute_after_timestamp_in_secs(1000)
.with_max_retries_policy(5);
scheduler.reschedule(id, options.clone());

assert_eq!(scheduler.get_task(id).unwrap().options, options);

task.complete();
tokio::time::sleep(Duration::from_millis(25)).await;

assert_ne!(scheduler.get_task(id).unwrap().options, options);
assert_eq!(scheduler.get_task(id).unwrap().options.failures, 1);
assert_eq!(
scheduler.get_task(id).unwrap().options.retry_strategy,
options.retry_strategy
);
})
.await;
}
}

mod test_find_id {
use ic_stable_structures::{StableBTreeMap, VectorMemory};

use crate::scheduler::test::test_delay::SimpleTask;
use crate::scheduler::{Scheduler, TaskScheduler};
use crate::task::TaskOptions;

#[test]
fn finding_id_by_task_returns_correct_id() {
let map = StableBTreeMap::new(VectorMemory::default());
let scheduler = Scheduler::new(map);
for id in 0..10 {
let id_in_scheduler =
scheduler.append_task((SimpleTask::StepOne { id }, TaskOptions::new()).into());
assert_eq!(id_in_scheduler, id);
}

const TO_FIND: u32 = 7;
let found = scheduler
.find_id(&|task| matches!(task, SimpleTask::StepOne { id } if id == TO_FIND));
assert_eq!(found, Some(TO_FIND));
}

#[test]
fn finding_id_by_task_returns_none_if_not_found() {
let map = StableBTreeMap::new(VectorMemory::default());
let scheduler = Scheduler::new(map);
for id in 0..10 {
let id_in_scheduler =
scheduler.append_task((SimpleTask::StepOne { id }, TaskOptions::new()).into());
assert_eq!(id_in_scheduler, id);
}

const TO_FIND: u32 = 42;
let found = scheduler
.find_id(&|task| matches!(task, SimpleTask::StepOne { id } if id == TO_FIND));
assert_eq!(found, None);
}
}
}

0 comments on commit fadfe78

Please sign in to comment.