Skip to content

Commit

Permalink
update shutdown singal
Browse files Browse the repository at this point in the history
  • Loading branch information
inboxsphere committed Dec 1, 2024
1 parent 2af7a39 commit a91b1e5
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 56 deletions.
28 changes: 14 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
[package]
name = "persistent-scheduler"
version = "0.2.0"
version = "0.2.1"
author = "Pedro Rocha"
edition = "2021"
description = "A high-performance task scheduling system developed in Rust using Tokio. This system supports task persistence, repeatable tasks, Cron-based scheduling, and one-time tasks, ensuring reliability and flexibility for managing time-based operations in various applications."
license-file = "LICENSE"
documentation = "https://docs.rs/persistent-scheduler/0.2.0/"
documentation = "https://docs.rs/persistent-scheduler/0.2.1/"
repository = "https://github.com/inboxsphere/persistent-scheduler"

[features]
Expand All @@ -22,15 +22,15 @@ chrono = "0.4.38"
chrono-tz = "0.10.0"
serde_json = "1.0.133"
ulid = "1.1.2"
tracing = "0.1.40"
tracing = "0.1.41"
async-trait = "0.1.83"
tracing-subscriber = "0.3.18"
tracing-subscriber = "0.3.19"

native_db = { version = "0.8.1", optional = true }
native_model = { version = "0.4.20", optional = true }
once_cell = { version = "1.19.0", optional = true }
itertools = { version = "0.13.0", optional = true }
sysinfo = { version = "0.32.0", optional = true }
sysinfo = { version = "0.32.1", optional = true }
ahash = "0.8.11"


Expand Down
7 changes: 3 additions & 4 deletions examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn main() {
.start()
.await;
let mut tasks = Vec::new();
for _ in 0..1 {
for _ in 0..100 {
tasks.push(TaskAndDelay {
inner: MyTask1::new("name1".to_string(), 32),
delay_seconds: None,
Expand All @@ -32,8 +32,7 @@ async fn main() {
tokio::spawn(async move {
context.add_tasks(tasks).await.unwrap();
});

tokio::time::sleep(Duration::from_secs(100000000)).await;
tokio::time::sleep(Duration::from_secs(20)).await;
}

#[derive(Clone, Debug, Deserialize, Serialize)]
Expand All @@ -60,7 +59,7 @@ impl Task for MyTask1 {
Box::pin(async move {
// println!("{}", self.name);
// println!("{}", self.age);
tokio::time::sleep(Duration::from_secs(15)).await;
//tokio::time::sleep(Duration::from_secs(15)).await;
// println!("my task1 is running");
Ok(())
})
Expand Down
19 changes: 6 additions & 13 deletions src/core/flow.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use crate::core::model::TaskMeta;
use crate::core::processor::Package;
use crate::core::shutdown::shutdown_signal;
use crate::core::store::TaskStore;
use crate::core::{handlers, processor::TaskProcessor, status_updater::TaskStatusUpdater};
use ahash::AHashMap;
use std::sync::Arc;
use tokio::signal;
use tokio::sync::RwLock;

use super::processor::Package;

pub struct TaskFlow<T>
where
T: TaskStore + Send + Sync + Clone + 'static,
Expand Down Expand Up @@ -96,16 +95,10 @@ where
let clone = Arc::clone(&self);
let signal_handler = async move {
// Listen for a shutdown signal (Ctrl+C).
match signal::ctrl_c().await {
Ok(()) => {
// Notify the task to shut down.
let mut triggered = clone.shutdown.write().await; // Acquire write lock to set shutdown state
*triggered = true; // Set the shutdown state to true
}
Err(err) => {
tracing::error!("Error listening for shutdown signal: {:?}", err);
}
}
shutdown_signal().await;
// Notify the task to shut down.
let mut triggered = clone.shutdown.write().await; // Acquire write lock to set shutdown state
*triggered = true; // Set the shutdown state to true
};

tokio::spawn(signal_handler);
Expand Down
1 change: 1 addition & 0 deletions src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod periodic;
mod processor;
mod result;
pub mod retry;
mod shutdown;
mod status_updater;
pub mod store;
pub mod task;
Expand Down
22 changes: 5 additions & 17 deletions src/core/periodic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::core::error::BoxError;
use crate::core::shutdown::shutdown_signal;
use std::{future::Future, sync::Arc, time::Duration};
use tokio::signal;
use tokio::sync::RwLock;
use tokio::time::sleep;
use tracing::{error, info, warn};
Expand Down Expand Up @@ -73,22 +73,10 @@ impl PeriodicTask {
let signal_clone = Arc::clone(&self);
let signal_handler = async move {
// Listen for a shutdown signal (Ctrl+C).
match signal::ctrl_c().await {
Ok(()) => {
info!(
"Shutting down periodic task '{}'...",
&self.name
);
// Notify the task to shut down.
signal_clone.shutdown().await;
}
Err(err) => {
error!(
"Error listening for shutdown signal: {:?}",
BoxError::from(err)
);
}
}
shutdown_signal().await;
info!("Shutting down periodic task '{}'...", &self.name);
// Notify the task to shut down.
signal_clone.shutdown().await;
};

// Spawn the task runner and signal handler as asynchronous tasks.
Expand Down
25 changes: 25 additions & 0 deletions src/core/shutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use tokio::signal;

pub(crate) async fn shutdown_signal() {
let ctrl_c_signal = async {
signal::ctrl_c()
.await
.expect("Error installing Ctrl+C signal handler");
};

#[cfg(unix)]
let terminate_signal = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("Error installing terminate signal handler")
.recv()
.await;
};

#[cfg(not(unix))]
let terminate_signal = std::future::pending::<()>();

tokio::select! {
_ = ctrl_c_signal => {},
_ = terminate_signal => {},
};
}
1 change: 0 additions & 1 deletion src/core/status_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ impl TaskStatusUpdater {
UpdateRequest::PoisonPill => {
poison_pill += 1;
if poison_pill == processor_num {
println!("The status update worker has exited.");
break;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/utils.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#[macro_export]
#[macro_export(local_inner_macros)]
macro_rules! generate_token {
() => {
ulid::Ulid::new().to_string()
};
}

#[macro_export]
#[macro_export(local_inner_macros)]
macro_rules! utc_now {
() => {{
use chrono::Utc;
Expand Down

0 comments on commit a91b1e5

Please sign in to comment.