Skip to content

Commit

Permalink
feat: introduce cron context
Browse files Browse the repository at this point in the history
TODO: handle pipe to backends better
  • Loading branch information
geofmureithi committed Dec 24, 2024
1 parent 5365a67 commit c4ee787
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 48 deletions.
30 changes: 9 additions & 21 deletions examples/cron/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,19 @@
use apalis::prelude::*;

use apalis_cron::CronContext;
use apalis_cron::CronStream;
use apalis_cron::Schedule;
use chrono::{DateTime, Utc};
use chrono::Local;
use std::str::FromStr;
use std::time::Duration;
// use std::time::Duration;
use tower::load_shed::LoadShedLayer;

#[derive(Clone)]
struct FakeService;
impl FakeService {
fn execute(&self, item: Reminder) {
dbg!(&item.0);
}
}
#[derive(Debug, Default)]
struct Reminder;

#[derive(Default, Debug, Clone)]
struct Reminder(DateTime<Utc>);
impl From<DateTime<Utc>> for Reminder {
fn from(t: DateTime<Utc>) -> Self {
Reminder(t)
}
}
async fn send_reminder(job: Reminder, svc: Data<FakeService>) {
svc.execute(job);
async fn send_reminder(_job: Reminder, ctx: CronContext<Local>) {
println!("Running cronjob for timestamp: {}", ctx.get_timestamp())
// Do something
}

#[tokio::main]
Expand All @@ -34,8 +23,7 @@ async fn main() {
.enable_tracing()
.layer(LoadShedLayer::new()) // Important when you have layers that block the service
.rate_limit(1, Duration::from_secs(2))
.data(FakeService)
.backend(CronStream::new(schedule))
.backend(CronStream::new_with_timezone(schedule, Local))
.build_fn(send_reminder);
Monitor::new().register(worker).run().await.unwrap();
worker.run().await;
}
26 changes: 10 additions & 16 deletions examples/persisted-cron/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,24 @@ use apalis_cron::CronStream;
use apalis_cron::Schedule;
use apalis_sql::sqlite::SqliteStorage;
use apalis_sql::sqlx::SqlitePool;
use chrono::{DateTime, Utc};
use chrono::DateTime;
use chrono::Local;
use serde::Deserialize;
use serde::Serialize;
use std::str::FromStr;
use std::time::Duration;

#[derive(Clone)]
struct FakeService;
impl FakeService {
fn execute(&self, item: Reminder) {
dbg!(&item.0);
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Reminder(DateTime<Local>);

#[derive(Default, Debug, Clone, Serialize, Deserialize)]
struct Reminder(DateTime<Utc>);
impl From<DateTime<Utc>> for Reminder {
fn from(t: DateTime<Utc>) -> Self {
Reminder(t)
impl Default for Reminder {
fn default() -> Self {
Self(Local::now())
}
}
async fn send_reminder(job: Reminder, svc: Data<FakeService>) {
svc.execute(job);

async fn send_reminder(job: Reminder) {
println!("Reminder {:?}", job);
}

#[tokio::main]
Expand All @@ -50,7 +45,6 @@ async fn main() {
let worker = WorkerBuilder::new("morning-cereal")
.enable_tracing()
.rate_limit(1, Duration::from_secs(2))
.data(FakeService)
.backend(backend)
.build_fn(send_reminder);
Monitor::new().register(worker).run().await.unwrap();
Expand Down
16 changes: 12 additions & 4 deletions packages/apalis-core/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ pub struct Parts<Ctx> {
//TODO: add State
}

impl<T, Ctx: Default> Request<T, Ctx> {
impl<T, Ctx> Request<T, Ctx> {
/// Creates a new [Request]
pub fn new(args: T) -> Self {
pub fn new(args: T) -> Self
where
Ctx: Default,
{
Self::new_with_data(args, Extensions::default(), Ctx::default())
}

Expand All @@ -63,7 +66,10 @@ impl<T, Ctx: Default> Request<T, Ctx> {
args: req,
parts: Parts {
context: ctx,
..Default::default()
task_id: Default::default(),
attempt: Default::default(),
data: Default::default(),
namespace: Default::default(),
},
}
}
Expand All @@ -74,8 +80,10 @@ impl<T, Ctx: Default> Request<T, Ctx> {
args: req,
parts: Parts {
context: ctx,
task_id: Default::default(),
attempt: Default::default(),
data,
..Default::default()
namespace: Default::default(),
},
}
}
Expand Down
49 changes: 42 additions & 7 deletions packages/apalis-cron/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ use apalis_core::request::RequestStream;
use apalis_core::storage::Storage;
use apalis_core::task::namespace::Namespace;
use apalis_core::worker::{Context, Worker};
use apalis_core::{error::Error, request::Request};
use apalis_core::{error::Error, request::Request, service_fn::FromRequest};
use chrono::{DateTime, TimeZone, Utc};
pub use cron::Schedule;
use futures::StreamExt;
Expand Down Expand Up @@ -111,12 +111,12 @@ where
}
impl<Req, Tz> CronStream<Req, Tz>
where
Req: From<DateTime<Tz>> + Send + Sync + 'static,
Req: Default + Send + Sync + 'static,
Tz: TimeZone + Send + Sync + 'static,
Tz::Offset: Send + Sync,
{
/// Convert to consumable
fn into_stream(self) -> RequestStream<Request<Req, ()>> {
fn into_stream(self) -> RequestStream<Request<Req, CronContext<Tz>>> {
let timezone = self.timezone.clone();
let stream = async_stream::stream! {
let mut schedule = self.schedule.upcoming_owned(timezone.clone());
Expand All @@ -129,7 +129,7 @@ where
apalis_core::sleep(to_sleep).await;
let timestamp = timezone.from_utc_datetime(&Utc::now().naive_utc());
let namespace = Namespace(format!("{}:{timestamp:?}", self.schedule));
let mut req = Request::new(Req::from(timestamp));
let mut req = Request::new_with_ctx(Req::default(), CronContext::new(timestamp));
req.parts.namespace = Some(namespace);
yield Ok(Some(req));
},
Expand Down Expand Up @@ -203,13 +203,48 @@ where
}
}

impl<Req, Tz, Res> Backend<Request<Req, ()>, Res> for CronStream<Req, Tz>
/// Context for all cron jobs
#[derive(Debug, Clone)]
pub struct CronContext<Tz: TimeZone> {
timestamp: DateTime<Tz>,
}

impl<Tz: TimeZone> Default for CronContext<Tz>
where
DateTime<Tz>: Default,
{
fn default() -> Self {
Self {
timestamp: Default::default(),
}
}
}

impl<Tz: TimeZone> CronContext<Tz> {
/// Create a new context provided a timestamp
pub fn new(timestamp: DateTime<Tz>) -> Self {
Self { timestamp }
}

/// Get the inner timestamp
pub fn get_timestamp(&self) -> &DateTime<Tz> {
&self.timestamp
}
}

impl<Req, Tz: TimeZone> FromRequest<Request<Req, CronContext<Tz>>> for CronContext<Tz> {
fn from_request(req: &Request<Req, CronContext<Tz>>) -> Result<Self, Error> {
Ok(req.parts.context.clone())
}
}

impl<Req, Tz, Res> Backend<Request<Req, CronContext<Tz>>, Res> for CronStream<Req, Tz>
where
Req: From<DateTime<Tz>> + Send + Sync + 'static,
Req: Default + Send + Sync + 'static,
Tz: TimeZone + Send + Sync + 'static,
Tz::Offset: Send + Sync,
{
type Stream = RequestStream<Request<Req, ()>>;
type Stream = RequestStream<Request<Req, CronContext<Tz>>>;

type Layer = Identity;

Expand Down

0 comments on commit c4ee787

Please sign in to comment.