Skip to content

Commit

Permalink
Create async_scheduler and thread_scheduler mods to make pub(crate) i…
Browse files Browse the repository at this point in the history
…nterface enforceable
  • Loading branch information
tyleragreen committed Sep 14, 2023
1 parent b29a914 commit 82dfb0f
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 184 deletions.
66 changes: 66 additions & 0 deletions tulsa/src/async_scheduler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::collections::HashMap;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use tokio::runtime::Builder as TokioBuilder;
use tokio::task::JoinHandle as TaskJoinHandle;

use crate::model::{AsyncTask, Operation};

pub(crate) struct AsyncScheduler {
tasks: HashMap<usize, TaskJoinHandle<()>>,
num_runtime_threads: usize,
}

impl AsyncScheduler {
pub(crate) fn new() -> Self {
AsyncScheduler {
tasks: HashMap::new(),
num_runtime_threads: 1,
}
}

pub(crate) fn listen(&mut self, receiver: Arc<Mutex<Receiver<AsyncTask>>>) {
println!("AsyncScheduler initialized.");

let runtime = TokioBuilder::new_multi_thread()
.enable_all()
.worker_threads(self.num_runtime_threads)
.thread_name("scheduler-runtime")
.build()
.unwrap();

let r = receiver.clone();

runtime.block_on(async {
loop {
match r.lock().unwrap().recv() {
Ok(async_task) => self.handle(async_task),
Err(e) => eprintln!("{}", e),
}
}
});
}

fn start(&mut self, task: AsyncTask) {
let future = tokio::spawn(task.func);
self.tasks.insert(task.id, future);
}

fn stop(&mut self, task_id: usize) {
let task = &self.tasks[&task_id];
task.abort_handle().abort();
self.tasks.remove(&task_id);
println!("Stopped {}", task_id);
}

fn handle(&mut self, task: AsyncTask) {
match task.op {
Operation::Create => self.start(task),
Operation::Delete => self.stop(task.id),
Operation::Update => {
self.stop(task.id);
self.start(task);
}
}
}
}
6 changes: 4 additions & 2 deletions tulsa/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub mod model;
pub mod scheduler;
mod async_scheduler;
mod model;
mod scheduler;
mod thread_scheduler;

pub use model::{AsyncTask, SyncTask};
pub use scheduler::Scheduler;
186 changes: 4 additions & 182 deletions tulsa/src/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,188 +1,10 @@
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::thread::{self, Builder as ThreadBuilder, JoinHandle as ThreadJoinHandle};
use std::time::Duration;
use tokio::runtime::Builder as TokioBuilder;
use tokio::task::JoinHandle as TaskJoinHandle;
use std::thread::Builder as ThreadBuilder;

use crate::model::{AsyncTask, Operation, SyncTask};

struct AsyncScheduler {
tasks: HashMap<usize, TaskJoinHandle<()>>,
num_runtime_threads: usize,
}

impl AsyncScheduler {
fn new() -> Self {
AsyncScheduler {
tasks: HashMap::new(),
num_runtime_threads: 1,
}
}

fn listen(&mut self, receiver: Arc<Mutex<Receiver<AsyncTask>>>) {
println!("AsyncScheduler initialized.");

let runtime = TokioBuilder::new_multi_thread()
.enable_all()
.worker_threads(self.num_runtime_threads)
.thread_name("scheduler-runtime")
.build()
.unwrap();

let r = receiver.clone();

runtime.block_on(async {
loop {
match r.lock().unwrap().recv() {
Ok(async_task) => self.handle(async_task),
Err(e) => eprintln!("{}", e),
}
}
});
}

fn start(&mut self, task: AsyncTask) {
let future = tokio::spawn(task.func);
self.tasks.insert(task.id, future);
}

fn stop(&mut self, task_id: usize) {
let task = &self.tasks[&task_id];
task.abort_handle().abort();
self.tasks.remove(&task_id);
println!("Stopped {}", task_id);
}

fn handle(&mut self, task: AsyncTask) {
match task.op {
Operation::Create => self.start(task),
Operation::Delete => self.stop(task.id),
Operation::Update => {
self.stop(task.id);
self.start(task);
}
}
}
}

struct TaskRunner {
id: usize,
frequency: Duration,
thread_handle: Option<ThreadJoinHandle<()>>,
runner_data: Arc<Mutex<RunnerData>>,
}

struct RunnerData {
stopping: bool,
}

impl TaskRunner {
fn new(id: usize, frequency: Duration) -> Self {
let thread_handle = None;
let runner_data = Arc::new(Mutex::new(RunnerData { stopping: false }));
Self {
id,
frequency,
thread_handle,
runner_data,
}
}

fn start(&mut self, func: Pin<Box<dyn Fn() + Send + Sync + 'static>>) {
println!("Starting {}", self.id);
let frequency = self.frequency;
let runner_data = self.runner_data.clone();
let builder = ThreadBuilder::new().name("task".to_string());

let handle = builder.spawn(move || loop {
{
if runner_data.lock().unwrap().stopping {
break;
}
}

func();
thread::sleep(frequency);
});

self.thread_handle = Some(handle.unwrap());
}

fn stop(&mut self) {
println!("Stopping {}", self.id);
// Use a block so that the lock is released.
{
self.runner_data.lock().unwrap().stopping = true;
}

if let Some(handle) = self.thread_handle.take() {
match handle.join() {
Ok(_) => println!("Stopped {}", self.id),
Err(e) => panic!("{:?}", e),
}
}
}
}

struct ThreadScheduler {
tasks: Arc<Mutex<Vec<TaskRunner>>>,
}

impl ThreadScheduler {
fn new() -> Self {
ThreadScheduler {
tasks: Arc::new(Mutex::new(Vec::<TaskRunner>::new())),
}
}

fn listen(&mut self, receiver: Arc<Mutex<Receiver<SyncTask>>>) {
println!("ThreadScheduler initialized.");

let r = receiver.clone();
loop {
match r.lock().unwrap().recv() {
Ok(task) => self.handle(task),
Err(e) => eprintln!("{}", e),
}
}
}

fn find_index(&mut self, id: usize) -> Option<usize> {
self.tasks
.lock()
.unwrap()
.iter()
.position(|runner| runner.id == id)
}

fn start(&mut self, task: SyncTask) {
let mut runner = TaskRunner::new(task.id, task.frequency);
runner.start(task.func);
self.tasks.lock().unwrap().push(runner);
}

fn stop(&mut self, task_id: usize) {
if let Some(idx) = self.find_index(task_id) {
let mut runners = self.tasks.lock().unwrap();
runners[idx].stop();
runners.remove(idx);
}
}

fn handle(&mut self, task: SyncTask) {
match task.op {
Operation::Create => self.start(task),
Operation::Delete => self.stop(task.id),
Operation::Update => {
self.stop(task.id);
self.start(task);
}
}
}
}
use crate::async_scheduler::AsyncScheduler;
use crate::model::{AsyncTask, SyncTask};
use crate::thread_scheduler::ThreadScheduler;

pub struct Scheduler<T> {
receiver: Arc<Mutex<Receiver<T>>>,
Expand Down
123 changes: 123 additions & 0 deletions tulsa/src/thread_scheduler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use std::pin::Pin;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::thread::{sleep, Builder as ThreadBuilder, JoinHandle as ThreadJoinHandle};
use std::time::Duration;

use crate::model::{Operation, SyncTask};

struct TaskRunner {
id: usize,
frequency: Duration,
thread_handle: Option<ThreadJoinHandle<()>>,
runner_data: Arc<Mutex<RunnerData>>,
}

struct RunnerData {
stopping: bool,
}

impl TaskRunner {
fn new(id: usize, frequency: Duration) -> Self {
let thread_handle = None;
let runner_data = Arc::new(Mutex::new(RunnerData { stopping: false }));
Self {
id,
frequency,
thread_handle,
runner_data,
}
}

fn start(&mut self, func: Pin<Box<dyn Fn() + Send + Sync + 'static>>) {
println!("Starting {}", self.id);
let frequency = self.frequency;
let runner_data = self.runner_data.clone();
let builder = ThreadBuilder::new().name("task".to_string());

let handle = builder.spawn(move || loop {
{
if runner_data.lock().unwrap().stopping {
break;
}
}

func();
sleep(frequency);
});

self.thread_handle = Some(handle.unwrap());
}

fn stop(&mut self) {
println!("Stopping {}", self.id);
// Use a block so that the lock is released.
{
self.runner_data.lock().unwrap().stopping = true;
}

if let Some(handle) = self.thread_handle.take() {
match handle.join() {
Ok(_) => println!("Stopped {}", self.id),
Err(e) => panic!("{:?}", e),
}
}
}
}

pub(crate) struct ThreadScheduler {
tasks: Arc<Mutex<Vec<TaskRunner>>>,
}

impl ThreadScheduler {
pub(crate) fn new() -> Self {
ThreadScheduler {
tasks: Arc::new(Mutex::new(Vec::<TaskRunner>::new())),
}
}

pub(crate) fn listen(&mut self, receiver: Arc<Mutex<Receiver<SyncTask>>>) {
println!("ThreadScheduler initialized.");

let r = receiver.clone();
loop {
match r.lock().unwrap().recv() {
Ok(task) => self.handle(task),
Err(e) => eprintln!("{}", e),
}
}
}

fn find_index(&mut self, id: usize) -> Option<usize> {
self.tasks
.lock()
.unwrap()
.iter()
.position(|runner| runner.id == id)
}

fn start(&mut self, task: SyncTask) {
let mut runner = TaskRunner::new(task.id, task.frequency);
runner.start(task.func);
self.tasks.lock().unwrap().push(runner);
}

fn stop(&mut self, task_id: usize) {
if let Some(idx) = self.find_index(task_id) {
let mut runners = self.tasks.lock().unwrap();
runners[idx].stop();
runners.remove(idx);
}
}

fn handle(&mut self, task: SyncTask) {
match task.op {
Operation::Create => self.start(task),
Operation::Delete => self.stop(task.id),
Operation::Update => {
self.stop(task.id);
self.start(task);
}
}
}
}

0 comments on commit 82dfb0f

Please sign in to comment.