Skip to content

Commit

Permalink
use sequence in task id generation
Browse files Browse the repository at this point in the history
  • Loading branch information
ufoscout committed Sep 2, 2024
1 parent 59f4092 commit 50a4cf8
Showing 1 changed file with 33 additions and 26 deletions.
59 changes: 33 additions & 26 deletions ic-task-scheduler/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,15 @@ where
});
}

/// Returns the next task id.
fn next_task_id(&self) -> u64 {
let mut lock = self.task_id_sequence.lock();
let id = *lock.get();
lock.set(id + 1)
.expect("Unable to access the stable storage to set the next poll id");
id
}

// We use tokio for testing instead of ic_kit::ic::spawn because the latter blocks the current thread
// waiting for the spawned futures to complete.
// This makes impossible to test concurrent behavior.
Expand Down Expand Up @@ -286,7 +295,8 @@ where

impl<T, P, S> TaskScheduler<T> for Scheduler<T, P, S>
where
T: 'static + Task + Serialize + DeserializeOwned,
T: 'static + Task + Serialize + DeserializeOwned + Clone,
T::Ctx: Clone,
P: 'static
+ IterableSortedMapStructure<u64, InnerScheduledTask<T>>
+ BTreeMapStructure<u64, InnerScheduledTask<T>>,
Expand All @@ -295,10 +305,7 @@ where
fn append_task(&self, task: ScheduledTask<T>) -> u64 {
let time_secs = time_secs();
let mut lock = self.pending_tasks.lock();
let key = lock
.last_key_value()
.map(|(val, _)| val + 1)
.unwrap_or_default();
let key = self.next_task_id();
lock.insert(
key,
InnerScheduledTask::with_status(
Expand All @@ -316,29 +323,10 @@ where
if tasks.is_empty() {
return vec![];
};

let time_secs = time_secs();
let mut lock = self.pending_tasks.lock();
let mut key = lock
.last_key_value()
.map(|(val, _)| val + 1)
.unwrap_or_default();

let mut keys = Vec::with_capacity(tasks.len());
for task in tasks {
lock.insert(
key,
InnerScheduledTask::with_status(
key,
task,
TaskStatus::Waiting {
timestamp_secs: time_secs,
},
),
);
keys.push(key);
key += 1;
}
keys.push(self.append_task(task));
}
keys
}

Expand Down Expand Up @@ -1369,5 +1357,24 @@ mod test {
.find_id(&|task| matches!(task, SimpleTask::StepOne { id } if id == TO_FIND));
assert_eq!(found, None);
}

#[test]
fn should_get_next_task_from_the_sequence() {
let base_task_id = 12345;
let map = StableBTreeMap::new(VectorMemory::default());
let sequence = StableCell::new(VectorMemory::default(), base_task_id).unwrap();
let scheduler = Scheduler::new(map, sequence);

for id in base_task_id..(base_task_id + 10) {
let id_in_scheduler =
scheduler.append_task((SimpleTask::StepOne { id }, TaskOptions::new()).into());
assert_eq!(id_in_scheduler, id);
}

let to_find = base_task_id + 7;
let found = scheduler
.find_id(&|task| matches!(task, SimpleTask::StepOne { id } if id == to_find));
assert_eq!(found, Some(to_find));
}
}
}

0 comments on commit 50a4cf8

Please sign in to comment.