diff --git a/Cargo.lock b/Cargo.lock index 98a3b33..9c6263f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -369,9 +369,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.161" +version = "0.2.167" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" +checksum = "09d6582e104315a817dff97f75133544b2e094ee22447d2acf4a74e189ba06fc" [[package]] name = "linux-raw-sys" @@ -558,7 +558,7 @@ dependencies = [ [[package]] name = "persistent-scheduler" -version = "0.2.0" +version = "0.2.1" dependencies = [ "ahash", "async-trait", @@ -922,9 +922,9 @@ dependencies = [ [[package]] name = "sysinfo" -version = "0.32.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3b5ae3f4f7d64646c46c4cae4e3f01d1c5d255c7406fdd7c7f999a94e488791" +checksum = "4c33cd241af0f2e9e3b5c32163b873b29956890b5342e6745b917ce9d490f4af" dependencies = [ "core-foundation-sys", "libc", @@ -1028,9 +1028,9 @@ dependencies = [ [[package]] name = "tracing" -version = "0.1.40" +version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "pin-project-lite", "tracing-attributes", @@ -1039,9 +1039,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.27" +version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", @@ -1050,9 +1050,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", "valuable", @@ -1071,9 +1071,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.18" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" dependencies = [ "nu-ansi-term", "sharded-slab", diff --git a/Cargo.toml b/Cargo.toml index ff477c9..7356315 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,11 @@ [package] name = "persistent-scheduler" -version = "0.2.0" +version = "0.2.1" author = "Pedro Rocha" edition = "2021" description = "A high-performance task scheduling system developed in Rust using Tokio. This system supports task persistence, repeatable tasks, Cron-based scheduling, and one-time tasks, ensuring reliability and flexibility for managing time-based operations in various applications." license-file = "LICENSE" -documentation = "https://docs.rs/persistent-scheduler/0.2.0/" +documentation = "https://docs.rs/persistent-scheduler/0.2.1/" repository = "https://github.com/inboxsphere/persistent-scheduler" [features] @@ -22,15 +22,15 @@ chrono = "0.4.38" chrono-tz = "0.10.0" serde_json = "1.0.133" ulid = "1.1.2" -tracing = "0.1.40" +tracing = "0.1.41" async-trait = "0.1.83" -tracing-subscriber = "0.3.18" +tracing-subscriber = "0.3.19" native_db = { version = "0.8.1", optional = true } native_model = { version = "0.4.20", optional = true } once_cell = { version = "1.19.0", optional = true } itertools = { version = "0.13.0", optional = true } -sysinfo = { version = "0.32.0", optional = true } +sysinfo = { version = "0.32.1", optional = true } ahash = "0.8.11" diff --git a/examples/basic.rs b/examples/basic.rs index a2a46fe..c573f86 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -22,7 +22,7 @@ async fn main() { .start() .await; let mut tasks = Vec::new(); - for _ in 0..1 { + for _ in 0..100 { tasks.push(TaskAndDelay { inner: MyTask1::new("name1".to_string(), 32), delay_seconds: None, @@ -32,8 +32,7 @@ async fn main() { tokio::spawn(async move { context.add_tasks(tasks).await.unwrap(); }); - - tokio::time::sleep(Duration::from_secs(100000000)).await; + tokio::time::sleep(Duration::from_secs(20)).await; } #[derive(Clone, Debug, Deserialize, Serialize)] @@ -60,7 +59,7 @@ impl Task for MyTask1 { Box::pin(async move { // println!("{}", self.name); // println!("{}", self.age); - tokio::time::sleep(Duration::from_secs(15)).await; + //tokio::time::sleep(Duration::from_secs(15)).await; // println!("my task1 is running"); Ok(()) }) diff --git a/src/core/flow.rs b/src/core/flow.rs index 6899cbb..21a3829 100644 --- a/src/core/flow.rs +++ b/src/core/flow.rs @@ -1,13 +1,12 @@ use crate::core::model::TaskMeta; +use crate::core::processor::Package; +use crate::core::shutdown::shutdown_signal; use crate::core::store::TaskStore; use crate::core::{handlers, processor::TaskProcessor, status_updater::TaskStatusUpdater}; use ahash::AHashMap; use std::sync::Arc; -use tokio::signal; use tokio::sync::RwLock; -use super::processor::Package; - pub struct TaskFlow where T: TaskStore + Send + Sync + Clone + 'static, @@ -96,16 +95,10 @@ where let clone = Arc::clone(&self); let signal_handler = async move { // Listen for a shutdown signal (Ctrl+C). - match signal::ctrl_c().await { - Ok(()) => { - // Notify the task to shut down. - let mut triggered = clone.shutdown.write().await; // Acquire write lock to set shutdown state - *triggered = true; // Set the shutdown state to true - } - Err(err) => { - tracing::error!("Error listening for shutdown signal: {:?}", err); - } - } + shutdown_signal().await; + // Notify the task to shut down. + let mut triggered = clone.shutdown.write().await; // Acquire write lock to set shutdown state + *triggered = true; // Set the shutdown state to true }; tokio::spawn(signal_handler); diff --git a/src/core/mod.rs b/src/core/mod.rs index dfd6549..3203292 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -9,6 +9,7 @@ pub mod periodic; mod processor; mod result; pub mod retry; +mod shutdown; mod status_updater; pub mod store; pub mod task; diff --git a/src/core/periodic.rs b/src/core/periodic.rs index 351ccc9..81e7f2f 100644 --- a/src/core/periodic.rs +++ b/src/core/periodic.rs @@ -1,6 +1,6 @@ use crate::core::error::BoxError; +use crate::core::shutdown::shutdown_signal; use std::{future::Future, sync::Arc, time::Duration}; -use tokio::signal; use tokio::sync::RwLock; use tokio::time::sleep; use tracing::{error, info, warn}; @@ -73,22 +73,10 @@ impl PeriodicTask { let signal_clone = Arc::clone(&self); let signal_handler = async move { // Listen for a shutdown signal (Ctrl+C). - match signal::ctrl_c().await { - Ok(()) => { - info!( - "Shutting down periodic task '{}'...", - &self.name - ); - // Notify the task to shut down. - signal_clone.shutdown().await; - } - Err(err) => { - error!( - "Error listening for shutdown signal: {:?}", - BoxError::from(err) - ); - } - } + shutdown_signal().await; + info!("Shutting down periodic task '{}'...", &self.name); + // Notify the task to shut down. + signal_clone.shutdown().await; }; // Spawn the task runner and signal handler as asynchronous tasks. diff --git a/src/core/shutdown.rs b/src/core/shutdown.rs new file mode 100644 index 0000000..c9050ee --- /dev/null +++ b/src/core/shutdown.rs @@ -0,0 +1,25 @@ +use tokio::signal; + +pub(crate) async fn shutdown_signal() { + let ctrl_c_signal = async { + signal::ctrl_c() + .await + .expect("Error installing Ctrl+C signal handler"); + }; + + #[cfg(unix)] + let terminate_signal = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("Error installing terminate signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate_signal = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c_signal => {}, + _ = terminate_signal => {}, + }; +} diff --git a/src/core/status_updater.rs b/src/core/status_updater.rs index 53797c9..902d5a6 100644 --- a/src/core/status_updater.rs +++ b/src/core/status_updater.rs @@ -52,7 +52,6 @@ impl TaskStatusUpdater { UpdateRequest::PoisonPill => { poison_pill += 1; if poison_pill == processor_num { - println!("The status update worker has exited."); break; } } diff --git a/src/core/utils.rs b/src/core/utils.rs index c673d20..e7a586a 100644 --- a/src/core/utils.rs +++ b/src/core/utils.rs @@ -1,11 +1,11 @@ -#[macro_export] +#[macro_export(local_inner_macros)] macro_rules! generate_token { () => { ulid::Ulid::new().to_string() }; } -#[macro_export] +#[macro_export(local_inner_macros)] macro_rules! utc_now { () => {{ use chrono::Utc;