diff --git a/Cargo.lock b/Cargo.lock
index e448b43b..6d97cc9d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1492,7 +1492,7 @@ version = "0.1.0"
 dependencies = [
  "axum 0.7.5",
  "futures",
- "h3 0.0.5",
+ "h3 0.0.6",
  "h3-quinn",
  "http-body-util",
  "hyper 1.3.1",
@@ -1913,9 +1913,9 @@ dependencies = [
 
 [[package]]
 name = "h3"
-version = "0.0.5"
+version = "0.0.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d5069de1c2ac82d9e361b07f2b8a2c582ec071750e063530fc7f3b5197e24805"
+checksum = "5e7675a0963b47a6d12fe44c279918b4ffb19baee838ac37f48d2722ad5bc6ab"
 dependencies = [
  "bytes",
  "fastrand",
@@ -1923,18 +1923,17 @@
  "http 1.1.0",
  "pin-project-lite",
  "tokio",
- "tracing",
 ]
 
 [[package]]
 name = "h3-quinn"
-version = "0.0.6"
+version = "0.0.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b8c01d99d7cf812fd34ddf135e6c940df9e24f2e759dbc7179fb0e54d4bd6551"
+checksum = "17c799f413fceeea505236c4d8132f084ff4b55a652288d91439ee93dc24d855"
 dependencies = [
  "bytes",
  "futures",
- "h3 0.0.5",
+ "h3 0.0.6",
  "quinn",
  "tokio",
  "tokio-util",
@@ -2577,11 +2576,11 @@ checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
 
 [[package]]
 name = "matchers"
-version = "0.1.0"
+version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
+checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
 dependencies = [
- "regex-automata 0.1.10",
+ "regex-automata",
 ]
 
 [[package]]
@@ -2932,7 +2931,7 @@ dependencies = [
  "opentelemetry-http",
  "opentelemetry-proto",
  "opentelemetry_sdk",
- "prost",
+ "prost 0.12.4",
  "thiserror",
  "tokio",
  "tonic",
@@ -2946,7 +2945,7 @@ checksum = "984806e6cf27f2b49282e2a05e288f30594f3dbc74eb7a6e99422bc48ed78162"
 dependencies = [
  "opentelemetry",
  "opentelemetry_sdk",
- "prost",
+ "prost 0.12.4",
  "tonic",
 ]
 
@@ -3058,7 +3057,7 @@ checksum = "2580e33f2292d34be285c5bc3dba5259542b083cfad6037b6d70345f24dcb735"
 dependencies = [
  "heck 0.4.1",
  "itertools 0.11.0",
- "prost",
+ "prost 0.12.4",
  "prost-types",
 ]
 
@@ -3210,9 +3209,9 @@ dependencies = [
  "nix",
  "once_cell",
  "parking_lot",
- "prost",
+ "prost 0.12.4",
  "prost-build",
- "prost-derive",
+ "prost-derive 0.12.4",
  "sha2",
  "smallvec",
  "symbolic-demangle",
@@ -3294,7 +3293,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d0f5d036824e4761737860779c906171497f6d55681139d8312388f8fe398922"
 dependencies = [
  "bytes",
- "prost-derive",
+ "prost-derive 0.12.4",
+]
+
+[[package]]
+name = "prost"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc"
+dependencies = [
+ "bytes",
+ "prost-derive 0.13.1",
 ]
 
 [[package]]
@@ -3311,7 +3320,7 @@ dependencies = [
  "once_cell",
  "petgraph",
  "prettyplease",
- "prost",
+ "prost 0.12.4",
  "prost-types",
  "regex",
  "syn 2.0.60",
@@ -3331,13 +3340,26 @@ dependencies = [
  "syn 2.0.60",
 ]
 
+[[package]]
+name = "prost-derive"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca"
+dependencies = [
+ "anyhow",
+ "itertools 0.13.0",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.60",
+]
+
 [[package]]
 name = "prost-types"
 version = "0.12.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3235c33eb02c1f1e212abdbe34c78b264b038fb58ca612664343271e36e55ffe"
 dependencies = [
- "prost",
+ "prost 0.12.4",
 ]
 
 [[package]]
@@ -3534,17 +3556,8 @@ checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c"
 dependencies = [
  "aho-corasick",
  "memchr",
- "regex-automata 0.4.6",
- "regex-syntax 0.8.3",
-]
-
-[[package]]
-name = "regex-automata"
-version = "0.1.10"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
-dependencies = [
- "regex-syntax 0.6.29",
+ "regex-automata",
+ "regex-syntax",
 ]
 
 [[package]]
@@ -3555,7 +3568,7 @@ checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea"
 dependencies = [
  "aho-corasick",
  "memchr",
- "regex-syntax 0.8.3",
+ "regex-syntax",
 ]
 
 [[package]]
@@ -3564,12 +3577,6 @@ version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "30b661b2f27137bdbc16f00eda72866a92bb28af1753ffbd56744fb6e2e9cd8e"
 
-[[package]]
-name = "regex-syntax"
-version = "0.6.29"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
-
 [[package]]
 name = "regex-syntax"
 version = "0.8.3"
@@ -3950,7 +3957,7 @@ dependencies = [
  "const-str",
  "flate2",
  "futures",
- "h3 0.0.5",
+ "h3 0.0.6",
  "h3-quinn",
  "h3-webtransport",
  "http 1.1.0",
@@ -3970,7 +3977,7 @@ dependencies = [
  "pin-project",
  "pprof",
  "prometheus-client",
- "prost",
+ "prost 0.13.1",
  "quinn",
  "rand",
  "regex",
@@ -4030,7 +4037,7 @@ dependencies = [
  "num_cpus",
  "once_cell",
  "png",
- "prost",
+ "prost 0.12.4",
  "reqwest",
  "rgb",
  "scopeguard",
@@ -4055,7 +4062,7 @@ version = "0.0.0"
 dependencies = [
  "pbjson",
  "pbjson-build",
- "prost",
+ "prost 0.12.4",
  "prost-build",
  "serde",
  "tonic",
@@ -4775,7 +4782,7 @@ dependencies = [
  "hyper-timeout",
  "percent-encoding",
  "pin-project",
- "prost",
+ "prost 0.12.4",
  "tokio",
  "tokio-stream",
  "tower",
diff --git a/foundations/Cargo.toml b/foundations/Cargo.toml
index e5710901..7cc98e4f 100644
--- a/foundations/Cargo.toml
+++ b/foundations/Cargo.toml
@@ -28,10 +28,10 @@ itertools = { version = "0.13", optional = true }
 scuffle-foundations-macros = { path = "./macros", optional = true, version = "0.0.0" }
 
 pprof = { version = "0.13", optional = true, features = ["prost-codec"] }
-prost = { version = "0.12", optional = true }
+prost = { version = "0.13.1", optional = true }
 flate2 = { version = "1", optional = true }
 
-matchers = { version = "0.1", optional = true }
+matchers = { version = "0.2.0", optional = true }
 regex = { version = "1", optional = true }
 once_cell = { version = "1", optional = true }
 scc = { version = "2", optional = true }
@@ -55,8 +55,8 @@ tower = { version = "0.4", optional = true }
 hyper = { version = "1", optional = true }
 hyper-util = { version = "0.1", optional = true }
 http = { version = "1", optional = true }
-h3 = { version = "0.0.5", optional = true }
-h3-quinn = { version = "0.0.6", optional = true }
+h3 = { version = "0.0.6", optional = true }
+h3-quinn = { version = "0.0.7", optional = true }
 h3-webtransport = { version = "0.1.0", optional = true }
 quinn = { version = "0.11", default-features = false, features = ["runtime-tokio", "rustls", "ring" ], optional = true }
 rustls = { version = "0.23", optional = true }
@@ -171,7 +171,7 @@ cli = [
 	"settings",
 ]
 
-dataloader = [
+batcher = [
 	"tokio/sync",
 	"tokio/time",
 	"runtime",
@@ -246,7 +246,7 @@ default = [
 	"telemetry-server",
 	"context",
 	"signal",
-	"dataloader",
+	"batcher",
 	"health-check",
 	"http",
 	"http-tls",
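Heads-up for downstream crates: alongside the prost/matchers/h3 bumps above, the `dataloader` cargo feature is renamed to `batcher`, and the module moves with it (see the `lib.rs` hunk further down). A minimal sketch of the corresponding call-site change, assuming the crate is consumed as `scuffle_foundations`:

```rust
// Before this PR, call sites were guarded by #[cfg(feature = "dataloader")]
// and imported from scuffle_foundations::dataloader. After it, both the
// cargo feature and the module are named `batcher`; the dataloader API
// survives as a submodule.
#[cfg(feature = "batcher")]
use scuffle_foundations::batcher::dataloader::DataLoader;
```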
diff --git a/foundations/examples/Cargo.toml b/foundations/examples/Cargo.toml
index 954eac60..94745c21 100644
--- a/foundations/examples/Cargo.toml
+++ b/foundations/examples/Cargo.toml
@@ -37,6 +37,6 @@ futures = "0.3.21"
 tower = "0.4"
 quinn = "0.11"
 axum = "0.7"
-h3 = "0.0.5"
+h3 = "0.0.6"
 # h3-webtransport = "0.1"
-h3-quinn = "0.0.6"
+h3-quinn = "0.0.7"
diff --git a/foundations/src/batcher/dataloader.rs b/foundations/src/batcher/dataloader.rs
new file mode 100644
index 00000000..845605cc
--- /dev/null
+++ b/foundations/src/batcher/dataloader.rs
@@ -0,0 +1,279 @@
+use std::collections::HashMap;
+use std::hash::{BuildHasher, RandomState};
+use std::marker::PhantomData;
+
+use super::{BatchOperation, Batcher, BatcherConfig, BatcherDataloader, BatcherError};
+
+#[allow(type_alias_bounds)]
+pub type LoaderOutput<L: Loader<S>, S: BuildHasher = RandomState> = Result<HashMap<L::Key, L::Value, S>, L::Error>;
+
+pub trait Loader<S: BuildHasher + Default + Send + Sync = RandomState> {
+	type Key: Clone + Eq + std::hash::Hash + Send + Sync;
+	type Value: Clone + Send + Sync;
+	type Error: Clone + std::error::Error + Send + Sync;
+
+	fn config(&self) -> BatcherConfig {
+		BatcherConfig {
+			concurrency: 10,
+			max_batch_size: 1000,
+			sleep_duration: std::time::Duration::from_millis(5),
+		}
+	}
+
+	fn load(&self, keys: Vec<Self::Key>) -> impl std::future::Future<Output = LoaderOutput<Self, S>> + Send;
+}
+
+pub struct DataLoader<L: Loader<S> + Send + Sync, S: BuildHasher + Default + Send + Sync = RandomState>(
+	Batcher<Wrapper<L, S>>,
+);
+
+impl<L: Loader<S> + Send + Sync + 'static, S: BuildHasher + Default + Send + Sync + 'static> DataLoader<L, S> {
+	pub fn new(loader: L) -> Self {
+		Self(Batcher::new(Wrapper(loader, PhantomData)))
+	}
+
+	pub async fn load(&self, key: L::Key) -> Result<Option<L::Value>, BatcherError<L::Error>> {
+		self.load_many(std::iter::once(key.clone()))
+			.await
+			.map(|mut map| map.remove(&key))
+	}
+
+	pub async fn load_many(
+		&self,
+		keys: impl IntoIterator<Item = L::Key>,
+	) -> Result<HashMap<L::Key, L::Value, S>, BatcherError<L::Error>> {
+		self.0.execute_many(keys).await
+	}
+}
+
+struct Wrapper<L: Loader<S>, S: BuildHasher + Default = RandomState>(L, PhantomData<S>);
+
+impl<L: Loader<S> + Send + Sync, S: BuildHasher + Default + Send + Sync> BatchOperation for Wrapper<L, S> {
+	type Error = L::Error;
+	type Item = L::Key;
+	type Mode = BatcherDataloader<S>;
+	type Response = L::Value;
+
+	fn config(&self) -> BatcherConfig {
+		self.0.config()
+	}
+
+	fn process(
+		&self,
+		documents: <Self::Mode as super::BatchMode<Self>>::Input,
+	) -> impl std::future::Future<Output = Result<<Self::Mode as super::BatchMode<Self>>::OperationOutput, Self::Error>> + Send
+	{
+		async move { self.0.load(documents.into_iter().collect()).await }
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use std::collections::hash_map::RandomState;
+	use std::collections::HashMap;
+	use std::convert::Infallible;
+
+	use super::{DataLoader, LoaderOutput};
+	use crate::batcher::BatcherConfig;
+
+	type DynBoxLoader<S = RandomState> = Box<dyn Fn(Vec<u64>) -> HashMap<u64, u64, S> + Sync + Send>;
+
+	struct LoaderTest {
+		results: DynBoxLoader,
+		config: BatcherConfig,
+	}
+
+	impl super::Loader for LoaderTest {
+		type Error = Infallible;
+		type Key = u64;
+		type Value = u64;
+
+		fn config(&self) -> BatcherConfig {
+			self.config.clone()
+		}
+
+		async fn load(&self, keys: Vec<Self::Key>) -> LoaderOutput<Self> {
+			tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
+			Ok((self.results)(keys))
+		}
+	}
+
+	#[tokio::test]
+	async fn test_data_loader() {
+		let run_count = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
+
+		let loader = LoaderTest {
+			results: Box::new(move |keys| {
+				let mut results = HashMap::new();
+
+				assert_eq!(run_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst), 0);
+
+				assert_eq!(keys.len(), 250);
+
+				for key in keys {
+					assert!(!results.contains_key(&key));
+
+					results.insert(key, key * 2);
+				}
+
+				results
+			}),
+			config: BatcherConfig {
+				concurrency: 10,
+				max_batch_size: 1000,
+				sleep_duration: std::time::Duration::from_millis(5),
+			},
+		};
+
+		let dataloader = DataLoader::new(loader);
+
+		let futures = (0..250)
+			.map(|i| dataloader.load(i as u64))
+			.chain((0..250).map(|i| dataloader.load(i as u64)));
+
+		let results = futures::future::join_all(futures).await;
+
+		let expected = (0..250)
+			.map(|i| Ok(Some(i * 2)))
+			.chain((0..250).map(|i| Ok(Some(i * 2))))
+			.collect::<Vec<_>>();
+
+		assert_eq!(results, expected);
+	}
+
+	#[tokio::test]
+	async fn test_data_loader_larger() {
+		let loader = LoaderTest {
+			results: Box::new(move |keys| {
+				let mut results = HashMap::new();
+
+				assert!(keys.len() <= 1000);
+
+				for key in keys {
+					assert!(!results.contains_key(&key));
+
+					results.insert(key, key * 2);
+				}
+
+				results
+			}),
+			config: BatcherConfig {
+				concurrency: 10,
+				max_batch_size: 1000,
+				sleep_duration: std::time::Duration::from_millis(5),
+			},
+		};
+
+		let dataloader = DataLoader::new(loader);
+
+		const LIMIT: usize = 10_000;
+
+		let results = futures::future::join_all((0..LIMIT).map(|i| dataloader.load(i as u64))).await;
+
+		let expected = (0..LIMIT).map(|i| Ok(Some(i as u64 * 2))).collect::<Vec<_>>();
+
+		assert_eq!(results, expected);
+	}
+
+	#[tokio::test]
+	async fn test_data_loader_change_batch_size() {
+		let loader = LoaderTest {
+			results: Box::new(move |keys| {
+				let mut results = HashMap::new();
+
+				assert!(keys.len() <= 3000);
+
+				for key in keys {
+					assert!(!results.contains_key(&key));
+
+					results.insert(key, key * 2);
+				}
+
+				results
+			}),
+			config: BatcherConfig {
+				concurrency: 10,
+				max_batch_size: 3000,
+				sleep_duration: std::time::Duration::from_millis(5),
+			},
+		};
+
+		let dataloader = DataLoader::new(loader);
+
+		let futures = (0..5000).map(|i| dataloader.load(i as u64));
+
+		let results = futures::future::join_all(futures).await;
+
+		let expected = (0..5000).map(|i| Ok(Some(i * 2))).collect::<Vec<_>>();
+
+		assert_eq!(results, expected);
+	}
+
+	#[tokio::test]
+	async fn test_data_loader_change_duration() {
+		let loader = LoaderTest {
+			results: Box::new(move |keys| {
+				let mut results = HashMap::new();
+
+				assert!(keys.len() <= 1000);
+
+				for key in keys {
+					assert!(!results.contains_key(&key));
+
+					results.insert(key, key * 2);
+				}
+
+				results
+			}),
+			config: BatcherConfig {
+				concurrency: 10,
+				max_batch_size: 1000,
+				sleep_duration: std::time::Duration::from_millis(100),
+			},
+		};
+
+		let dataloader = DataLoader::new(loader);
+
+		let futures = (0..250)
+			.map(|i| dataloader.load(i as u64))
+			.chain((0..250).map(|i| dataloader.load(i as u64)));
+
+		let results = futures::future::join_all(futures).await;
+
+		let expected = (0..250)
+			.map(|i| Ok(Some(i * 2)))
+			.chain((0..250).map(|i| Ok(Some(i * 2))))
+			.collect::<Vec<_>>();
+
+		assert_eq!(results, expected);
+	}
+
+	#[tokio::test]
+	async fn test_data_loader_value_deduplication() {
+		let run_count = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
+
+		let loader = LoaderTest {
+			results: Box::new({
+				let run_count = run_count.clone();
+				move |keys| {
+					run_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+					keys.iter().map(|&key| (key, key * 2)).collect()
+				}
+			}),
+			config: BatcherConfig {
+				concurrency: 10,
+				max_batch_size: 1000,
+				sleep_duration: std::time::Duration::from_millis(5),
+			},
+		};
+
+		let dataloader = DataLoader::new(loader);
+
+		let futures = vec![dataloader.load(5), dataloader.load(5), dataloader.load(5)];
+
+		let results: Vec<_> = futures::future::join_all(futures).await;
+
+		assert_eq!(results, vec![Ok(Some(10)), Ok(Some(10)), Ok(Some(10))]);
+		assert_eq!(run_count.load(std::sync::atomic::Ordering::SeqCst), 1); // Ensure the loader was only called once
+	}
+}
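For context on the new file above, a minimal usage sketch modeled on its tests. `UserNameLoader` and the `scuffle_foundations` import path are illustrative assumptions; `config()` is omitted here, so the defaults from the `Loader` trait (batch size 1000, 5 ms window, concurrency 10) apply:

```rust
use std::convert::Infallible;

use scuffle_foundations::batcher::dataloader::{DataLoader, Loader, LoaderOutput};

// Hypothetical loader; a real implementation would query a database.
struct UserNameLoader;

impl Loader for UserNameLoader {
	type Key = u64;
	type Value = String;
	type Error = Infallible;

	async fn load(&self, keys: Vec<Self::Key>) -> LoaderOutput<Self> {
		// One batched lookup for every key collected during the sleep window.
		Ok(keys.into_iter().map(|id| (id, format!("user-{id}"))).collect())
	}
}

#[tokio::main]
async fn main() {
	let users = DataLoader::new(UserNameLoader);

	// Concurrent `load` calls for overlapping keys are coalesced into a
	// single `Loader::load` invocation, as the dedup test above verifies.
	let (a, b) = tokio::join!(users.load(1), users.load(1));
	assert_eq!(a.unwrap(), Some("user-1".to_string()));
	assert_eq!(b.unwrap(), Some("user-1".to_string()));
}
```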
diff --git a/foundations/src/batcher/mod.rs b/foundations/src/batcher/mod.rs
new file mode 100644
index 00000000..2d562c46
--- /dev/null
+++ b/foundations/src/batcher/mod.rs
@@ -0,0 +1,426 @@
+use std::collections::{HashMap, HashSet};
+use std::hash::BuildHasher;
+use std::marker::PhantomData;
+use std::sync::atomic::{AtomicU64, AtomicUsize};
+use std::sync::Arc;
+
+use tokio::sync::OnceCell;
+
+pub mod dataloader;
+
+pub trait BatchMode<T: BatchOperation + ?Sized>: Sized {
+	type Input: Send + Sync;
+	type Output: Send + Sync;
+	type OutputItem: Send + Sync;
+	type OperationOutput: Send + Sync;
+	type FinalOutput: Send + Sync;
+	type Tracker: Send + Sync;
+
+	fn new_input() -> Self::Input;
+	fn new_tracker() -> Self::Tracker;
+	fn new_output() -> Self::Output;
+
+	fn input_add(input: &mut Self::Input, tracker: &mut Self::Tracker, item: T::Item);
+	fn input_len(input: &Self::Input) -> usize;
+
+	fn tracked_output(
+		result: Option<&Result<Self::OperationOutput, BatcherError<T::Error>>>,
+		tracker: Self::Tracker,
+		output: &mut Self::Output,
+	) -> Result<(), BatcherError<T::Error>>;
+
+	fn final_output_into_iter(
+		output: Self::FinalOutput,
+	) -> Result<impl IntoIterator<Item = Self::OutputItem>, BatcherError<T::Error>>;
+
+	fn filter_item_iter(item: impl IntoIterator<Item = T::Item>) -> impl IntoIterator<Item = T::Item>;
+
+	fn output_item_to_result(item: Self::OutputItem) -> Result<T::Response, BatcherError<T::Error>>;
+
+	fn output_into_final_output(output: Result<Self::Output, BatcherError<T::Error>>) -> Self::FinalOutput;
+}
+
+pub struct BatcherNormalMode;
+
+impl<T: BatchOperation> BatchMode<T> for BatcherNormalMode {
+	type FinalOutput = Self::Output;
+	type Input = Vec<T::Item>;
+	type OperationOutput = Vec<Result<T::Response, T::Error>>;
+	type Output = Vec<Self::OutputItem>;
+	type OutputItem = Result<T::Response, BatcherError<T::Error>>;
+	type Tracker = std::ops::Range<usize>;
+
+	fn new_input() -> Self::Input {
+		Vec::new()
+	}
+
+	fn new_tracker() -> Self::Tracker {
+		0..0
+	}
+
+	fn new_output() -> Self::Output {
+		Vec::new()
+	}
+
+	fn input_add(input: &mut Self::Input, tracker: &mut Self::Tracker, item: T::Item) {
+		input.push(item);
+		tracker.end = input.len();
+	}
+
+	fn input_len(input: &Self::Input) -> usize {
+		input.len()
+	}
+
+	fn tracked_output(
+		result: Option<&Result<Self::OperationOutput, BatcherError<T::Error>>>,
+		tracker: Self::Tracker,
+		output: &mut Self::Output,
+	) -> Result<(), BatcherError<T::Error>> {
+		for i in tracker.into_iter() {
+			match result {
+				Some(Ok(r)) => output.push(
+					r.get(i)
+						.cloned()
+						.transpose()
+						.map_err(BatcherError::Batch)
+						.transpose()
+						.unwrap_or(Err(BatcherError::MissingResult)),
+				),
+				Some(Err(e)) => output.push(Err(e.clone())),
+				None => output.push(Err(BatcherError::Panic)),
+			}
+		}
+
+		Ok(())
+	}
+
+	fn final_output_into_iter(
+		output: Self::FinalOutput,
+	) -> Result<impl IntoIterator<Item = Self::OutputItem>, BatcherError<T::Error>> {
+		Ok(output)
+	}
+
+	fn filter_item_iter(item: impl IntoIterator<Item = T::Item>) -> impl IntoIterator<Item = T::Item> {
+		item
+	}
+
+	fn output_item_to_result(item: Self::OutputItem) -> Result<T::Response, BatcherError<T::Error>> {
+		item
+	}
+
+	fn output_into_final_output(
+		output: Result<Self::Output, BatcherError<<T as BatchOperation>::Error>>,
+	) -> Self::FinalOutput {
+		output.expect("error shouldn't be possible here")
+	}
+}
+
+pub struct BatcherDataloader<S>(PhantomData<S>);
+
+impl<T: BatchOperation, S: BuildHasher + Default + Send + Sync> BatchMode<T> for BatcherDataloader<S>
+where
+	T::Item: Clone + std::hash::Hash + Eq,
+{
+	type FinalOutput = Result<HashMap<T::Item, T::Response, S>, BatcherError<T::Error>>;
+	type Input = HashSet<T::Item, S>;
+	type OperationOutput = HashMap<T::Item, T::Response, S>;
+	type Output = Self::OperationOutput;
+	type OutputItem = T::Response;
+	type Tracker = Vec<T::Item>;
+
+	fn new_input() -> Self::Input {
+		HashSet::default()
+	}
+
+	fn new_tracker() -> Self::Tracker {
+		Vec::new()
+	}
+
+	fn new_output() -> Self::Output {
+		HashMap::default()
+	}
+
+	fn input_add(input: &mut Self::Input, tracker: &mut Self::Tracker, item: T::Item) {
+		input.insert(item.clone());
+		tracker.push(item);
+	}
+
+	fn input_len(input: &Self::Input) -> usize {
+		input.len()
+	}
+
+	fn tracked_output(
+		result: Option<&Result<Self::OperationOutput, BatcherError<T::Error>>>,
+		tracker: Self::Tracker,
+		output: &mut Self::Output,
+	) -> Result<(), BatcherError<T::Error>> {
+		for key in tracker.clone().into_iter() {
+			match result {
+				Some(Ok(res)) => {
+					if let Some(value) = res.get(&key).cloned() {
+						output.insert(key, value);
+					}
+				}
+				Some(Err(e)) => {
+					return Err(e.clone());
+				}
+				None => {
+					return Err(BatcherError::Panic);
+				}
+			}
+		}
+
+		Ok(())
+	}
+
+	fn final_output_into_iter(
+		output: Self::FinalOutput,
+	) -> Result<impl IntoIterator<Item = Self::OutputItem>, BatcherError<T::Error>> {
+		output.map(|output| output.into_values())
+	}
+
+	fn filter_item_iter(item: impl IntoIterator<Item = T::Item>) -> impl IntoIterator<Item = T::Item> {
+		item
+	}
+
+	fn output_item_to_result(item: Self::OutputItem) -> Result<T::Response, BatcherError<T::Error>> {
+		Ok(item)
+	}
+
+	fn output_into_final_output(
+		output: Result<Self::Output, BatcherError<<T as BatchOperation>::Error>>,
+	) -> Self::FinalOutput {
+		output
+	}
+}
+
+pub trait BatchOperation: Send + Sync {
+	type Item: Send + Sync;
+	type Response: Clone + Send + Sync;
+	type Error: Clone + std::fmt::Debug + Send + Sync;
+	type Mode: BatchMode<Self>;
+
+	fn config(&self) -> BatcherConfig;
+
+	fn process(
+		&self,
+		documents: <Self::Mode as BatchMode<Self>>::Input,
+	) -> impl std::future::Future<Output = Result<<Self::Mode as BatchMode<Self>>::OperationOutput, Self::Error>> + Send;
+}
+
+pub struct Batcher<T: BatchOperation> {
+	inner: Arc<BatcherInner<T>>,
+	_auto_loader_abort: CancelOnDrop,
+}
+
+struct CancelOnDrop(tokio::task::AbortHandle);
+
+impl Drop for CancelOnDrop {
+	fn drop(&mut self) {
+		self.0.abort();
+	}
+}
+
+struct BatcherInner<T: BatchOperation> {
+	semaphore: tokio::sync::Semaphore,
+	notify: tokio::sync::Notify,
+	sleep_duration: AtomicU64,
+	batch_id: AtomicU64,
+	max_batch_size: AtomicUsize,
+	operation: T,
+	active_batch: tokio::sync::RwLock<Option<Batch<T>>>,
+}
+
+struct Batch<T: BatchOperation> {
+	id: u64,
+	expires_at: tokio::time::Instant,
+	done: DropGuardCancellationToken,
+	ops: <T::Mode as BatchMode<T>>::Input,
+	results: Arc<OnceCell<Result<<T::Mode as BatchMode<T>>::OperationOutput, BatcherError<T::Error>>>>,
+}
+
+struct DropGuardCancellationToken(tokio_util::sync::CancellationToken);
+
+impl Drop for DropGuardCancellationToken {
+	fn drop(&mut self) {
+		self.0.cancel();
+	}
+}
+
+impl DropGuardCancellationToken {
+	fn new() -> Self {
+		Self(tokio_util::sync::CancellationToken::new())
+	}
+
+	fn child_token(&self) -> tokio_util::sync::CancellationToken {
+		self.0.child_token()
+	}
+}
+
+struct BatchInsertWaiter<T: BatchOperation> {
+	id: u64,
+	done: tokio_util::sync::CancellationToken,
+	tracker: <T::Mode as BatchMode<T>>::Tracker,
+	results: Arc<OnceCell<Result<<T::Mode as BatchMode<T>>::OperationOutput, BatcherError<T::Error>>>>,
+}
+
+#[derive(thiserror::Error, Debug, Clone, PartialEq, Copy, Eq, Hash, Ord, PartialOrd)]
+pub enum BatcherError<E> {
+	#[error("failed to acquire semaphore")]
+	AcquireSemaphore,
+	#[error("panic in batch inserter")]
+	Panic,
+	#[error("missing result")]
+	MissingResult,
+	#[error("batch failed with: {0}")]
+	Batch(E),
+}
+
+impl<E> From<E> for BatcherError<E> {
+	fn from(value: E) -> Self {
+		Self::Batch(value)
+	}
+}
+
+impl<T: BatchOperation> Batch<T> {
+	async fn run(self, inner: Arc<BatcherInner<T>>) {
+		self.results
+			.get_or_init(|| async move {
+				let _ticket = inner.semaphore.acquire().await.map_err(|_| BatcherError::AcquireSemaphore)?;
+				Ok(inner.operation.process(self.ops).await.map_err(BatcherError::Batch)?)
+			})
+			.await;
+	}
+}
+
+#[derive(Clone)]
+pub struct BatcherConfig {
+	pub concurrency: usize,
+	pub max_batch_size: usize,
+	pub sleep_duration: std::time::Duration,
+}
+
+impl<T: BatchOperation + 'static> BatcherInner<T> {
+	fn spawn_batch(self: &Arc<Self>, batch: Batch<T>) {
+		tokio::spawn(batch.run(self.clone()));
+	}
+
+	fn new_batch(&self) -> Batch<T> {
+		let id = self.batch_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+		let expires_at = tokio::time::Instant::now()
+			+ tokio::time::Duration::from_nanos(self.sleep_duration.load(std::sync::atomic::Ordering::Relaxed));
+
+		Batch {
+			id,
+			expires_at,
+			ops: T::Mode::new_input(),
+			done: DropGuardCancellationToken::new(),
+			results: Arc::new(OnceCell::new()),
+		}
+	}
+
+	async fn batch_inserts(self: &Arc<Self>, documents: impl IntoIterator<Item = T::Item>) -> Vec<BatchInsertWaiter<T>> {
+		let mut waiters = vec![];
+		let mut batch = self.active_batch.write().await;
+		let max_documents = self.max_batch_size.load(std::sync::atomic::Ordering::Relaxed);
+
+		for document in T::Mode::filter_item_iter(documents) {
+			if batch
+				.as_ref()
+				.map(|b| T::Mode::input_len(&b.ops) >= max_documents)
+				.unwrap_or(true)
+			{
+				if let Some(b) = batch.take() {
+					self.spawn_batch(b);
+				}
+
+				*batch = Some(self.new_batch());
+				self.notify.notify_one();
+			}
+
+			let Some(b) = batch.as_mut() else {
+				unreachable!("batch should be Some");
+			};
+
+			if waiters.last().map(|w: &BatchInsertWaiter<T>| w.id != b.id).unwrap_or(true) {
+				waiters.push(BatchInsertWaiter {
+					id: b.id,
+					done: b.done.child_token(),
+					results: b.results.clone(),
+					tracker: T::Mode::new_tracker(),
+				});
+			}
+
+			let tracker = &mut waiters.last_mut().unwrap().tracker;
+			T::Mode::input_add(&mut b.ops, tracker, document);
+		}
+
+		waiters
+	}
+}
+
+impl<T: BatchOperation + 'static> Batcher<T> {
+	pub fn new(operation: T) -> Self {
+		let config = operation.config();
+
+		let inner = Arc::new(BatcherInner {
+			semaphore: tokio::sync::Semaphore::new(config.concurrency),
+			notify: tokio::sync::Notify::new(),
+			batch_id: AtomicU64::new(0),
+			active_batch: tokio::sync::RwLock::new(None),
+			sleep_duration: AtomicU64::new(config.sleep_duration.as_nanos() as u64),
+			max_batch_size: AtomicUsize::new(config.max_batch_size),
+			operation,
+		});
+
+		Self {
+			inner: inner.clone(),
+			_auto_loader_abort: CancelOnDrop(
+				tokio::task::spawn(async move {
+					loop {
+						inner.notify.notified().await;
+						let Some((id, expires_at)) = inner.active_batch.read().await.as_ref().map(|b| (b.id, b.expires_at))
+						else {
+							continue;
+						};
+
+						if expires_at > tokio::time::Instant::now() {
+							tokio::time::sleep_until(expires_at).await;
+						}
+
+						let mut batch = inner.active_batch.write().await;
+						if batch.as_ref().is_some_and(|b| b.id == id) {
+							inner.spawn_batch(batch.take().unwrap());
+						}
+					}
+				})
+				.abort_handle(),
+			),
+		}
+	}
+
+	pub async fn execute(&self, document: T::Item) -> Result<T::Response, BatcherError<T::Error>> {
+		let output = self.execute_many(std::iter::once(document)).await;
+		let iter = T::Mode::final_output_into_iter(output)?;
+		T::Mode::output_item_to_result(iter.into_iter().next().ok_or(BatcherError::MissingResult)?)
+	}
+
+	#[tracing::instrument(skip_all)]
+	pub async fn execute_many(
+		&self,
+		documents: impl IntoIterator<Item = T::Item>,
+	) -> <T::Mode as BatchMode<T>>::FinalOutput {
+		let waiters = self.inner.batch_inserts(documents).await;
+
+		let mut results = <T::Mode as BatchMode<T>>::new_output();
+
+		for waiter in waiters {
+			waiter.done.cancelled().await;
+			if let Err(e) = <T::Mode as BatchMode<T>>::tracked_output(waiter.results.get(), waiter.tracker, &mut results) {
+				return <T::Mode as BatchMode<T>>::output_into_final_output(Err(e));
+			}
+		}
+
+		<T::Mode as BatchMode<T>>::output_into_final_output(Ok(results))
+	}
+}
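The dataloader is only one of the two `BatchMode`s defined above; `BatcherNormalMode` preserves submission order and yields one `Result` per item. A sketch of implementing `BatchOperation` directly against it (the operation, its config values, and the `scuffle_foundations` import path are assumptions for illustration):

```rust
use std::time::Duration;

use scuffle_foundations::batcher::{BatchOperation, Batcher, BatcherConfig, BatcherNormalMode};

// Hypothetical batched-write operation: queued items are flushed in one call.
struct FlushWrites;

impl BatchOperation for FlushWrites {
	type Item = String;
	type Response = usize; // e.g. bytes written per item
	type Error = std::convert::Infallible;
	type Mode = BatcherNormalMode;

	fn config(&self) -> BatcherConfig {
		BatcherConfig {
			concurrency: 4,
			max_batch_size: 128,
			sleep_duration: Duration::from_millis(5),
		}
	}

	// For BatcherNormalMode, Input = Vec<Item> and OperationOutput is one
	// Result per input item, in order.
	async fn process(&self, documents: Vec<String>) -> Result<Vec<Result<usize, Self::Error>>, Self::Error> {
		// A single round-trip would happen here.
		Ok(documents.into_iter().map(|d| Ok(d.len())).collect())
	}
}

async fn demo() -> Result<usize, scuffle_foundations::batcher::BatcherError<std::convert::Infallible>> {
	let batcher = Batcher::new(FlushWrites);
	// `execute` rides along with whatever batch is currently being filled.
	batcher.execute("hello".to_owned()).await
}
```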
+ } + + #[tracing::instrument(skip_all)] + pub async fn execute_many( + &self, + documents: impl IntoIterator, + ) -> >::FinalOutput { + let waiters = self.inner.batch_inserts(documents).await; + + let mut results = >::new_output(); + + for waiter in waiters { + waiter.done.cancelled().await; + if let Err(e) = >::tracked_output(waiter.results.get(), waiter.tracker, &mut results) { + return >::output_into_final_output(Err(e)); + } + } + + >::output_into_final_output(Ok(results)) + } +} diff --git a/foundations/src/dataloader/batch_loader.rs b/foundations/src/dataloader/batch_loader.rs deleted file mode 100644 index 5a838476..00000000 --- a/foundations/src/dataloader/batch_loader.rs +++ /dev/null @@ -1,22 +0,0 @@ -use std::collections::HashSet; -use std::sync::Arc; - -use super::types::BatchState; -use super::Loader; - -pub(super) struct BatchLoader { - pub id: u64, - pub loader: Arc, - pub keys: HashSet, - pub start: tokio::time::Instant, - pub state: BatchState, -} - -impl BatchLoader { - pub async fn load(self, sephamore: Arc) { - let _ticket = sephamore.acquire().await.unwrap(); - let keys = self.keys.into_iter().collect::>(); - let result = self.loader.load(keys).await; - self.state.notify(Some(result)); - } -} diff --git a/foundations/src/dataloader/mod.rs b/foundations/src/dataloader/mod.rs deleted file mode 100644 index 23658cbf..00000000 --- a/foundations/src/dataloader/mod.rs +++ /dev/null @@ -1,353 +0,0 @@ -mod batch_loader; -mod types; -mod utils; - -use std::collections::{HashMap, HashSet}; -use std::sync::atomic::AtomicU64; -use std::sync::Arc; -use std::time::Duration; - -use self::batch_loader::BatchLoader; -pub use self::types::LoaderOutput; -use self::types::{BatchState, DataLoaderInner}; -use self::utils::new_auto_loader; -use crate::runtime; - -pub trait Loader: Send + Sync + 'static { - type Key: Eq + std::hash::Hash + Clone + Send + Sync; - type Value: Clone + Send + Sync; - type Error: Clone + Send + Sync; - - fn load(&self, key: Vec) -> impl std::future::Future> + Send; -} - -pub struct DataLoader { - batch_id: AtomicU64, - loader: Arc, - max_batch_size: usize, - inner: DataLoaderInner, - _auto_loader_abort: CancelOnDrop, - name: String, -} - -struct CancelOnDrop(tokio::task::AbortHandle); - -impl Drop for CancelOnDrop { - fn drop(&mut self) { - self.0.abort(); - } -} - -impl Default for DataLoader { - fn default() -> Self { - Self::new(std::any::type_name::(), L::default()) - } -} - -impl DataLoader { - pub fn new(name: impl ToString, loader: L) -> Self { - Self::with_concurrency_limit(name, loader, 10) - } - - pub fn with_concurrency_limit(name: impl ToString, loader: L, concurrency_limit: usize) -> Self { - let duration = Duration::from_millis(5); - - let inner = DataLoaderInner::new(concurrency_limit, duration); - - Self { - batch_id: AtomicU64::new(0), - loader: Arc::new(loader), - max_batch_size: 1000, - _auto_loader_abort: new_auto_loader(inner.clone()), - inner, - name: name.to_string(), - } - } - - pub fn set_max_batch_size(mut self, max_batch_size: usize) -> Self { - self.max_batch_size = max_batch_size; - self - } - - pub fn set_duration(self, duration: Duration) -> Self { - self.inner - .duration - .store(duration.as_nanos() as u64, std::sync::atomic::Ordering::Relaxed); - self - } - - async fn extend_loader(&self, keys: impl Iterator) -> Vec<(Vec, BatchState)> { - let mut batches = Vec::new(); - let mut current_batch = None; - - let mut active_batch = self.inner.active_batch.write().await; - - for key in keys { - if active_batch - .as_ref() - .map(|b| 
b.keys.len() >= self.max_batch_size) - .unwrap_or(true) - { - let batch = BatchLoader { - id: self.batch_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed), - loader: self.loader.clone(), - keys: Default::default(), - start: tokio::time::Instant::now(), - state: Default::default(), - }; - - if let Some(current_batch) = current_batch.replace((Vec::new(), batch.state.clone())) { - batches.push(current_batch); - } - - if let Some(batch) = active_batch.replace(batch) { - runtime::spawn(batch.load(self.inner.semaphore.clone())); - } - - self.inner.notify.notify_waiters(); - } else if current_batch.is_none() { - current_batch = Some((Vec::new(), active_batch.as_ref().unwrap().state.clone())); - } - - let (Some(active_batch), Some((current_batch, _))) = (active_batch.as_mut(), current_batch.as_mut()) else { - unreachable!(); - }; - - active_batch.keys.insert(key.clone()); - current_batch.push(key); - } - - if let Some(current_batch) = current_batch.take() { - batches.push(current_batch); - } - - if let Some(batch) = active_batch.as_mut() { - if batch.keys.len() > self.max_batch_size { - let batch = active_batch.take().unwrap(); - runtime::spawn(batch.load(self.inner.semaphore.clone())); - } - } - - batches - } - - async fn internal_load(&self, keys: impl IntoIterator) -> LoaderOutput { - let key_set = keys.into_iter().collect::>().into_iter().collect::>(); - - if key_set.is_empty() { - return Ok(Default::default()); - } - - let batches = self.extend_loader(key_set.iter().cloned()).await; - - let (batch_keys, batches): (Vec<_>, Vec<_>) = batches.into_iter().unzip(); - - let results = futures::future::join_all(batches.iter().map(|batch| batch.wait())).await; - - results - .into_iter() - .zip(batch_keys.into_iter()) - .try_fold(HashMap::new(), |mut acc, (result, keys)| match result { - Some(Ok(result)) => { - acc.extend( - keys.into_iter() - .filter_map(|key| result.get(&key).cloned().map(|value| (key, value))), - ); - - Ok(acc) - } - Some(Err(err)) => Err(err.clone()), - None => Ok(acc), - }) - } - - #[tracing::instrument(skip(self, keys), fields(name = self.name.as_str()))] - pub async fn load_many(&self, keys: impl IntoIterator) -> LoaderOutput { - self.internal_load(keys).await - } - - #[tracing::instrument(skip(self, key), fields(name = self.name.as_str()))] - pub async fn load(&self, key: L::Key) -> Result, L::Error> { - Ok(self.internal_load(std::iter::once(key.clone())).await?.remove(&key)) - } -} - -#[cfg(test)] -mod tests { - use std::collections::hash_map::RandomState; - use std::collections::HashMap; - - use crate::dataloader::LoaderOutput; - - type DynBoxLoader = Box) -> HashMap + Sync + Send>; - - struct LoaderTest { - results: DynBoxLoader, - } - - impl crate::dataloader::Loader for LoaderTest { - type Error = (); - type Key = u64; - type Value = u64; - - async fn load(&self, keys: Vec) -> LoaderOutput { - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - Ok((self.results)(keys)) - } - } - - #[tokio::test] - async fn test_data_loader() { - let run_count = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); - - let loader = LoaderTest { - results: Box::new(move |keys| { - let mut results = HashMap::new(); - - assert!(run_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst) == 0); - - assert!(keys.len() <= 1000); - - for key in keys { - assert!(!results.contains_key(&key)); - - results.insert(key, key * 2); - } - - results - }), - }; - - let dataloader = crate::dataloader::DataLoader::new("test", loader); - - let futures = (0..250) - .map(|i| 
dataloader.load(i as u64)) - .chain((0..250).map(|i| dataloader.load(i as u64))); - - let results = futures::future::join_all(futures).await; - - let expected = (0..250) - .map(|i| Ok(Some(i * 2))) - .chain((0..250).map(|i| Ok(Some(i * 2)))) - .collect::>(); - - assert_eq!(results, expected); - } - - #[tokio::test] - async fn test_data_loader_larger() { - let loader = LoaderTest { - results: Box::new(move |keys| { - let mut results = HashMap::new(); - - assert!(keys.len() <= 1000); - - for key in keys { - assert!(!results.contains_key(&key)); - - results.insert(key, key * 2); - } - - results - }), - }; - - let dataloader = crate::dataloader::DataLoader::new("test", loader); - - const LIMIT: usize = 10_000; - - let results = futures::future::join_all((0..LIMIT).map(|i| dataloader.load(i as u64))).await; - - let expected = (0..LIMIT).map(|i| Ok(Some(i as u64 * 2))).collect::>(); - - assert_eq!(results, expected); - } - - #[tokio::test] - async fn test_data_loader_change_batch_size() { - let loader = LoaderTest { - results: Box::new(move |keys| { - let mut results = HashMap::new(); - - assert!(keys.len() <= 3000); - - for key in keys { - assert!(!results.contains_key(&key)); - - results.insert(key, key * 2); - } - - results - }), - }; - - let dataloader = crate::dataloader::DataLoader::new("test", loader).set_max_batch_size(3000); - - let futures = (0..5000).map(|i| dataloader.load(i as u64)); - - let results = futures::future::join_all(futures).await; - - let expected = (0..5000).map(|i| Ok(Some(i * 2))).collect::>(); - - assert_eq!(results, expected); - } - - #[tokio::test] - async fn test_data_loader_change_duration() { - let loader = LoaderTest { - results: Box::new(move |keys| { - let mut results = HashMap::new(); - - assert!(keys.len() <= 1000); - - for key in keys { - assert!(!results.contains_key(&key)); - - results.insert(key, key * 2); - } - - results - }), - }; - - let dataloader = - crate::dataloader::DataLoader::new("test", loader).set_duration(tokio::time::Duration::from_millis(100)); - - let futures = (0..250) - .map(|i| dataloader.load(i as u64)) - .chain((0..250).map(|i| dataloader.load(i as u64))); - - let results = futures::future::join_all(futures).await; - - let expected = (0..250) - .map(|i| Ok(Some(i * 2))) - .chain((0..250).map(|i| Ok(Some(i * 2)))) - .collect::>(); - - assert_eq!(results, expected); - } - - #[tokio::test] - async fn test_data_loader_value_deduplication() { - let run_count = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); - - let loader = LoaderTest { - results: Box::new({ - let run_count = run_count.clone(); - move |keys| { - run_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - keys.iter().map(|&key| (key, key * 2)).collect() - } - }), - }; - - let dataloader = crate::dataloader::DataLoader::new("test", loader); - - let futures = vec![dataloader.load(5), dataloader.load(5), dataloader.load(5)]; - - let results: Vec<_> = futures::future::join_all(futures).await; - - assert_eq!(results, vec![Ok(Some(10)), Ok(Some(10)), Ok(Some(10))]); - assert_eq!(run_count.load(std::sync::atomic::Ordering::SeqCst), 1); // Ensure the loader was only called once - } -} diff --git a/foundations/src/dataloader/types.rs b/foundations/src/dataloader/types.rs deleted file mode 100644 index bec057c0..00000000 --- a/foundations/src/dataloader/types.rs +++ /dev/null @@ -1,101 +0,0 @@ -use std::collections::HashMap; -use std::sync::atomic::AtomicU64; -use std::sync::{Arc, OnceLock}; - -use tokio::sync::Notify; -use tokio::time::Instant; - -use 
super::batch_loader::BatchLoader; -use super::Loader; - -#[allow(type_alias_bounds)] -pub type LoaderOutput = Result, L::Error>; - -pub struct DataLoaderInternal { - pub active_batch: tokio::sync::RwLock>>, - pub notify: tokio::sync::Notify, - pub semaphore: Arc, - pub duration: AtomicU64, -} - -pub(super) struct DataLoaderInner(Arc>); - -impl Clone for DataLoaderInner { - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} - -impl DataLoaderInner { - pub fn new(concurrency: usize, duration: std::time::Duration) -> Self { - Self(Arc::new(DataLoaderInternal { - active_batch: Default::default(), - notify: Default::default(), - semaphore: Arc::new(tokio::sync::Semaphore::new(concurrency)), - duration: AtomicU64::new(duration.as_nanos() as u64), - })) - } - - pub async fn load_active_batch(&self) -> Option<(u64, Instant)> { - self.active_batch.read().await.as_ref().map(|b| (b.id, b.start)) - } - - pub fn duration(&self) -> std::time::Duration { - std::time::Duration::from_nanos(self.0.duration.load(std::sync::atomic::Ordering::Relaxed)) - } -} - -impl std::ops::Deref for DataLoaderInner { - type Target = DataLoaderInternal; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -struct InternalBatchState { - notify: Notify, - result: OnceLock>>, -} - -pub(super) struct BatchState(Arc>); - -impl Default for BatchState { - fn default() -> Self { - Self(Arc::new(InternalBatchState { - notify: Notify::new(), - result: OnceLock::new(), - })) - } -} - -impl Clone for BatchState { - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} - -impl BatchState { - pub fn notify(&self, result: Option>) -> bool { - if self.0.result.set(result).is_ok() { - self.0.notify.notify_waiters(); - true - } else { - false - } - } - - pub async fn wait(&self) -> Option<&LoaderOutput> { - let notify = self.0.notify.notified(); - if let Some(result) = self.0.result.get() { - return result.as_ref(); - } - - notify.await; - - match self.0.result.get() { - Some(result) => result.as_ref(), - None => None, - } - } -} diff --git a/foundations/src/dataloader/utils.rs b/foundations/src/dataloader/utils.rs deleted file mode 100644 index 34761b7a..00000000 --- a/foundations/src/dataloader/utils.rs +++ /dev/null @@ -1,33 +0,0 @@ -use super::types::DataLoaderInner; -use super::{CancelOnDrop, Loader}; -use crate::runtime; - -pub(super) fn new_auto_loader(inner: DataLoaderInner) -> CancelOnDrop { - CancelOnDrop( - runtime::spawn(async move { - loop { - let notify = inner.notify.notified(); - let Some((batch_id, start)) = inner.load_active_batch().await else { - notify.await; - continue; - }; - - drop(notify); - - let duration = inner.duration(); - if start.elapsed() < duration { - tokio::time::sleep_until(start + duration).await; - } - - let mut active_batch = inner.active_batch.write().await; - - if active_batch.as_ref().map(|b| b.id != batch_id).unwrap_or(true) { - continue; - } - - runtime::spawn(active_batch.take().unwrap().load(inner.semaphore.clone())); - } - }) - .abort_handle(), - ) -} diff --git a/foundations/src/lib.rs b/foundations/src/lib.rs index f06b3904..2b0b6f88 100644 --- a/foundations/src/lib.rs +++ b/foundations/src/lib.rs @@ -36,8 +36,8 @@ pub mod signal; #[cfg(feature = "context")] pub mod context; -#[cfg(feature = "dataloader")] -pub mod dataloader; +#[cfg(feature = "batcher")] +pub mod batcher; #[cfg(feature = "http")] pub mod http; diff --git a/foundations/src/settings/cli.rs b/foundations/src/settings/cli.rs index f9e505bf..425588f0 100644 --- a/foundations/src/settings/cli.rs +++ 
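Migration note for users of the deleted module: `DataLoader::new` loses its name argument, and the `set_max_batch_size`/`set_duration` builders are replaced by `Loader::config`. A before/after sketch with a hypothetical `UserLoader`:

```rust
use scuffle_foundations::batcher::dataloader::{DataLoader, Loader, LoaderOutput};
use scuffle_foundations::batcher::BatcherConfig;

struct UserLoader; // hypothetical

impl Loader for UserLoader {
	type Key = u64;
	type Value = u64;
	type Error = std::convert::Infallible;

	// Replaces the removed `.set_max_batch_size(3000)` and
	// `.set_duration(Duration::from_millis(100))` builder calls.
	fn config(&self) -> BatcherConfig {
		BatcherConfig {
			concurrency: 10,
			max_batch_size: 3000,
			sleep_duration: std::time::Duration::from_millis(100),
		}
	}

	async fn load(&self, keys: Vec<Self::Key>) -> LoaderOutput<Self> {
		Ok(keys.into_iter().map(|k| (k, k * 2)).collect())
	}
}

// Old: crate::dataloader::DataLoader::new("users", UserLoader).set_max_batch_size(3000)
// New: no name argument; tuning lives on the Loader impl.
fn build() -> DataLoader<UserLoader> {
	DataLoader::new(UserLoader)
}
```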
diff --git a/foundations/src/settings/cli.rs b/foundations/src/settings/cli.rs
index f9e505bf..425588f0 100644
--- a/foundations/src/settings/cli.rs
+++ b/foundations/src/settings/cli.rs
@@ -83,8 +83,7 @@ impl Cli {
 			}
 		};
 
-		let incoming =
-			toml::from_str(&contents).with_context(|| format!("Error parsing configuration file: {file}"))?;
+		let incoming = toml::from_str(&contents).with_context(|| format!("Error parsing configuration file: {file}"))?;
 
 		Ok(Some(incoming))
 	}
diff --git a/foundations/src/telemetry/env_filter/env/field.rs b/foundations/src/telemetry/env_filter/env/field.rs
index 6d7b51f1..1ec2a400 100644
--- a/foundations/src/telemetry/env_filter/env/field.rs
+++ b/foundations/src/telemetry/env_filter/env/field.rs
@@ -224,7 +224,7 @@ impl ValueMatch {
 	/// This returns an error if the string didn't contain a valid `bool`,
 	/// `u64`, `i64`, or `f64` literal, and couldn't be parsed as a regular
 	/// expression.
-	fn parse_regex(s: &str) -> Result<Self, matchers::Error> {
+	fn parse_regex(s: &str) -> Result<Self, matchers::BuildError> {
 		s.parse::<bool>()
 			.map(ValueMatch::Bool)
 			.or_else(|_| s.parse::<u64>().map(ValueMatch::U64))
@@ -266,7 +266,7 @@ impl fmt::Display for ValueMatch {
 // === impl MatchPattern ===
 
 impl FromStr for MatchPattern {
-	type Err = matchers::Error;
+	type Err = matchers::BuildError;
 
 	fn from_str(s: &str) -> Result<Self, Self::Err> {
 		let matcher = s.parse::<Pattern>()?;
diff --git a/image-processor/src/drive/mod.rs b/image-processor/src/drive/mod.rs
index 847b670f..c953b461 100644
--- a/image-processor/src/drive/mod.rs
+++ b/image-processor/src/drive/mod.rs
@@ -99,7 +99,7 @@ impl Drive for AnyDrive {
 	}
 
 	async fn write(&self, path: &str, data: Bytes, options: Option<DriveWriteOptions>) -> Result<(), DriveError> {
-        tracing::info!("writing to drive: {}", path);
+		tracing::info!("writing to drive: {}", path);
 		match self {
 			AnyDrive::Local(drive) => drive.write(path, data, options).await,
 			AnyDrive::S3(drive) => drive.write(path, data, options).await,
diff --git a/image-processor/src/worker/process/blocking.rs b/image-processor/src/worker/process/blocking.rs
index 73b0537b..ccb8dee5 100644
--- a/image-processor/src/worker/process/blocking.rs
+++ b/image-processor/src/worker/process/blocking.rs
@@ -59,7 +59,11 @@ impl Drop for CancelToken {
 	}
 }
 
-pub async fn spawn(task: Task, input: Bytes, permit: Arc<OwnedSemaphorePermit>) -> Result<JobOutput, JobError> {
+pub async fn spawn(
+	task: Task,
+	input: Bytes,
+	permit: Arc<OwnedSemaphorePermit>,
+) -> Result<(DecoderInfo, JobOutput), JobError> {
 	let cancel_token = CancelToken::new();
 	let _cancel_guard = cancel_token.clone();
 
@@ -82,7 +86,9 @@ pub async fn spawn(
 			}
 		}
 
-		task.finish()
+		let info = task.decoder_info.clone();
+
+		task.finish().map(|out| (info, out))
 	})
 	.await?
 }
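The `blocking.rs` change above threads the `DecoderInfo` out of the blocking task by cloning it before `finish()` consumes the task state. A self-contained sketch of the same pattern with stand-in types (the real `Task`, `JobOutput`, and `JobError` live in this module):

```rust
use tokio::task;

#[derive(Debug, Clone)]
struct DecoderInfo {
	width: usize,
	height: usize,
}

struct JobOutput {
	frame_count: usize,
}

// Clone the info out of the task state before the finishing call consumes it,
// then hand both back to the async caller as a tuple.
async fn run_blocking_job() -> Result<(DecoderInfo, JobOutput), task::JoinError> {
	task::spawn_blocking(|| {
		let decoder_info = DecoderInfo { width: 64, height: 64 }; // stand-in for `task.decoder_info`
		let info = decoder_info.clone();
		let output = JobOutput { frame_count: 1 }; // stand-in for `task.finish()`
		(info, output)
	})
	.await
}
```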
diff --git a/image-processor/src/worker/process/decoder/ffmpeg.rs b/image-processor/src/worker/process/decoder/ffmpeg.rs
index 458aace8..3928b492 100644
--- a/image-processor/src/worker/process/decoder/ffmpeg.rs
+++ b/image-processor/src/worker/process/decoder/ffmpeg.rs
@@ -98,6 +98,7 @@ impl<'data> FfmpegDecoder<'data> {
 		)?;
 
 		let info = DecoderInfo {
+			decoder: DecoderFrontend::Ffmpeg,
 			width: decoder.width() as usize,
 			height: decoder.height() as usize,
 			frame_count: input_stream_frames as usize,
diff --git a/image-processor/src/worker/process/decoder/libavif.rs b/image-processor/src/worker/process/decoder/libavif.rs
index d7d4614b..d878d0c2 100644
--- a/image-processor/src/worker/process/decoder/libavif.rs
+++ b/image-processor/src/worker/process/decoder/libavif.rs
@@ -55,6 +55,7 @@ impl<'data> AvifDecoder<'data> {
 		let image = AvifRgbImage::new(decoder.as_ref());
 
 		let info = DecoderInfo {
+			decoder: DecoderFrontend::LibAvif,
 			width: image.width as usize,
 			height: image.height as usize,
 			loop_count: if decoder.as_ref().repetitionCount <= 0 {
diff --git a/image-processor/src/worker/process/decoder/libwebp.rs b/image-processor/src/worker/process/decoder/libwebp.rs
index 25298382..bfeecf62 100644
--- a/image-processor/src/worker/process/decoder/libwebp.rs
+++ b/image-processor/src/worker/process/decoder/libwebp.rs
@@ -66,6 +66,7 @@ impl<'data> WebpDecoder<'data> {
 
 		Ok(Self {
 			info: DecoderInfo {
+				decoder: DecoderFrontend::LibWebp,
 				width: info.canvas_width as _,
 				height: info.canvas_height as _,
 				loop_count: match info.loop_count {
diff --git a/image-processor/src/worker/process/decoder/mod.rs b/image-processor/src/worker/process/decoder/mod.rs
index ee7f22af..6769083a 100644
--- a/image-processor/src/worker/process/decoder/mod.rs
+++ b/image-processor/src/worker/process/decoder/mod.rs
@@ -101,6 +101,7 @@ pub trait Decoder {
 
 #[derive(Debug, Clone, Copy)]
 pub struct DecoderInfo {
+	pub decoder: DecoderFrontend,
 	pub width: usize,
 	pub height: usize,
 	pub loop_count: LoopCount,
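The decoder hunks above all stamp a new `decoder` field into `DecoderInfo`, so later stages can tell how an input was decoded without re-sniffing the bytes. A sketch of what that enables; the enum variants mirror the diff (`Ffmpeg`, `LibAvif`, `LibWebp`), while the trimmed struct and `describe` helper are illustrative:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DecoderFrontend {
	Ffmpeg,
	LibAvif,
	LibWebp,
}

#[derive(Debug, Clone, Copy)]
struct DecoderInfo {
	decoder: DecoderFrontend,
	width: usize,
	height: usize,
}

fn describe(info: &DecoderInfo) -> &'static str {
	// Downstream code (e.g. the worker in process/mod.rs below) can now
	// branch on the frontend that actually decoded the input.
	match info.decoder {
		DecoderFrontend::Ffmpeg => "decoded with FFmpeg",
		DecoderFrontend::LibAvif => "decoded with libavif",
		DecoderFrontend::LibWebp => "decoded with libwebp",
	}
}
```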
diff --git a/image-processor/src/worker/process/mod.rs b/image-processor/src/worker/process/mod.rs
index cc686bec..cce0e7d6 100644
--- a/image-processor/src/worker/process/mod.rs
+++ b/image-processor/src/worker/process/mod.rs
@@ -183,10 +183,13 @@ impl ProcessJob {
 		let job = self.job.clone();
 
-		let JobOutput {
-			output: output_results,
-			input: input_metadata,
-		} = blocking::spawn(job.task.clone(), input, self.permit.clone()).await?;
+		let (
+			decoder_info,
+			JobOutput {
+				output: output_results,
+				input: input_metadata,
+			},
+		) = blocking::spawn(job.task.clone(), input, self.permit.clone()).await?;
 
 		let is_animated = output_results.iter().any(|r| r.frame_count > 1);
 
diff --git a/image-processor/src/worker/process/resize.rs b/image-processor/src/worker/process/resize.rs
index b95e1682..981cb23f 100644
--- a/image-processor/src/worker/process/resize.rs
+++ b/image-processor/src/worker/process/resize.rs
@@ -1,5 +1,5 @@
-use fast_image_resize::{self as fr, images::{CroppedImage, CroppedImageMut}, ResizeOptions};
-
+use fast_image_resize::images::{CroppedImage, CroppedImageMut};
+use fast_image_resize::{self as fr, ResizeOptions};
 use rgb::ComponentBytes;
 use scuffle_image_processor_proto::{output, scaling, Output, ResizeAlgorithm, ResizeMethod};
 
@@ -86,7 +86,12 @@ struct CropBox {
 
 impl CropBox {
 	pub fn new(left: u32, top: u32, width: u32, height: u32) -> Self {
-		Self { left, top, width, height }
+		Self {
+			left,
+			top,
+			width,
+			height,
+		}
 	}
 }
 
@@ -340,7 +345,13 @@ impl ImageResizer {
 		)?;
 
 		let source_crop = previous_image.crop();
-		let source_view = CroppedImage::new(&*previous_image, source_crop.left, source_crop.top, source_crop.width, source_crop.height)?;
+		let source_view = CroppedImage::new(
+			&*previous_image,
+			source_crop.left,
+			source_crop.top,
+			source_crop.width,
+			source_crop.height,
+		)?;
 
 		let target_crop = if resize_dims != output_dims {
 			resize_method_to_crop_dims(self.resize_method, output_dims, resize_dims)?
@@ -352,7 +363,13 @@ impl ImageResizer {
 				height: resize_dims.height as u32,
 			}
 		};
-		let mut target_view = CroppedImageMut::new(&mut target_image, target_crop.left, target_crop.top, target_crop.width, target_crop.height)?;
+		let mut target_view = CroppedImageMut::new(
+			&mut target_image,
+			target_crop.left,
+			target_crop.top,
+			target_crop.width,
+			target_crop.height,
+		)?;
 
 		self.resizer.resize(&source_view, &mut target_view, Some(&resize_options))?;
 
@@ -411,30 +428,22 @@ fn resize_method_to_crop_dims(
 	match resize_method {
 		ResizeMethod::Fit => Err(ResizeError::Internal("fit should never be called here")),
 		ResizeMethod::Stretch => Err(ResizeError::Internal("stretch should never be called here")),
-		ResizeMethod::PadLeft => {
-			check(
-				target_dims.width != padded_dims.width,
-				"pad left should only be called for width padding",
-			)
-		}
-		ResizeMethod::PadRight => {
-			check(
-				target_dims.height != padded_dims.height,
-				"pad right should only be called for height padding",
-			)
-		}
-		ResizeMethod::PadBottom => {
-			check(
-				target_dims.width != padded_dims.width,
-				"pad bottom should only be called for height padding",
-			)
-		}
-		ResizeMethod::PadTop => {
-			check(
-				target_dims.width != padded_dims.width,
-				"pad top should only be called for height padding",
-			)
-		}
+		ResizeMethod::PadLeft => check(
+			target_dims.width != padded_dims.width,
+			"pad left should only be called for width padding",
+		),
+		ResizeMethod::PadRight => check(
+			target_dims.height != padded_dims.height,
+			"pad right should only be called for height padding",
+		),
+		ResizeMethod::PadBottom => check(
+			target_dims.width != padded_dims.width,
+			"pad bottom should only be called for height padding",
+		),
+		ResizeMethod::PadTop => check(
+			target_dims.width != padded_dims.width,
+			"pad top should only be called for height padding",
+		),
 		_ => Ok(()),
 	}?;