Skip to content

Commit

Permalink
fix: improved readability
Browse files Browse the repository at this point in the history
  • Loading branch information
veeso committed Jan 22, 2024
1 parent 981a48d commit 4f84d9e
Showing 1 changed file with 12 additions and 21 deletions.
33 changes: 12 additions & 21 deletions ic-task-scheduler/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,16 @@ where
}

async fn run_with_timestamp(&mut self, now_timestamp_secs: u64) -> Result<u32> {
let mut to_be_reprocessed = Vec::new();
let mut task_execution_started = 0;

let tasks_running_count = TASKS_RUNNING.with_borrow(|tasks| tasks.len());

match tasks_running_count {
// checks tasks that are still in tasks running (something bad happened in the last cycle)
match TASKS_RUNNING.with_borrow(|tasks| tasks.len()) {
0 => {
// if there are no processing tasks, initialize the tasks to be processed
// HAPPY PATH: if there are no processing tasks, initialize the tasks o be processed
self.init_tasks_to_be_processed(now_timestamp_secs).await?;
}
1 => {
// if there are processing tasks with length 1,
// if there is only one processing task, we can assume that it panicked
// delete that task and mark it as panicked
let task = TASKS_RUNNING.with_borrow(|tasks| *tasks.iter().next().unwrap());
self.delete_unprocessable_task(task).await?;
Expand All @@ -109,55 +107,49 @@ where
}
}

let tasks_to_be_processed = TASKS_TO_BE_PROCESSED.with_borrow(|tasks| tasks.clone());

for task_id in tasks_to_be_processed {
// iterate over tasks to be processed, and execute it one by one
for task_id in TASKS_TO_BE_PROCESSED.with_borrow(|tasks| tasks.clone()) {
let lock = self.pending_tasks.lock();
let mut task = match lock.get(&task_id) {
Some(task) => task,
None => continue,
};
drop(lock);

if task.options.execute_after_timestamp_in_secs > now_timestamp_secs {
to_be_reprocessed.push(task);
continue;
}

task_execution_started += 1;
let key = task_id;
let mut task_scheduler = self.clone();
Self::spawn(async move {
// put task to processing tasks
task_scheduler
.put_task_to_processing_tasks(key)
.await
.unwrap();

// execute the task
if let Err(err) = task.task.execute(Box::new(task_scheduler.clone())).await {
task.options.failures += 1;
let (should_retry, retry_delay) = task
.options
.retry_strategy
.should_retry(task.options.failures);
if should_retry {
// remove task, but don't report state
// remove task from processing task, but don't report state
task_scheduler.remove_task_from_processing_tasks(key);

// re-add task to the queue
task.options.execute_after_timestamp_in_secs =
now_timestamp_secs + (retry_delay as u64);

task_scheduler.append_task(task.clone())
} else {
// report error
// remove task from processing and port its failure
task_scheduler
.remove_failed_task_from_processing_tasks(key, err)
.await
.unwrap();

return;
}
} else {
// remove task from queue
// in case of success, remove task from queue and report success
task_scheduler
.remove_completed_task_from_processing_tasks(key)
.await
Expand All @@ -166,7 +158,6 @@ where
});
}

self.append_tasks(to_be_reprocessed);
Ok(task_execution_started)
}

Expand Down

0 comments on commit 4f84d9e

Please sign in to comment.