diff --git a/Cargo.lock b/Cargo.lock index 277a335e04bd..bcd6211b7e2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -1913,8 +1913,8 @@ dependencies = [ "async-stream", "async-trait", "futures", + "leaky-bucket", "ollama-api-bindings", - "ratelimit", "reqwest", "reqwest-eventsource", "serde", @@ -2403,6 +2403,17 @@ dependencies = [ "spin 0.5.2", ] +[[package]] +name = "leaky-bucket" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a396bb213c2d09ed6c5495fd082c991b6ab39c9daf4fff59e6727f85c73e4c5" +dependencies = [ + "parking_lot", + "pin-project-lite", + "tokio", +] + [[package]] name = "lettre" version = "0.11.7" diff --git a/crates/http-api-bindings/Cargo.toml b/crates/http-api-bindings/Cargo.toml index 7037c5958b46..03dd3861241e 100644 --- a/crates/http-api-bindings/Cargo.toml +++ b/crates/http-api-bindings/Cargo.toml @@ -18,9 +18,9 @@ tabby-common = { path = "../tabby-common" } tabby-inference = { path = "../tabby-inference" } ollama-api-bindings = { path = "../ollama-api-bindings" } async-openai.workspace = true -ratelimit.workspace = true tokio.workspace = true tracing.workspace = true +leaky-bucket = "1.1.2" [dev-dependencies] tokio = { workspace = true, features = ["rt", "macros"] } diff --git a/crates/http-api-bindings/src/rate_limit.rs b/crates/http-api-bindings/src/rate_limit.rs index 85c9f388b6a5..167adb9a64aa 100644 --- a/crates/http-api-bindings/src/rate_limit.rs +++ b/crates/http-api-bindings/src/rate_limit.rs @@ -1,28 +1,27 @@ -use std::time::Duration; - use async_openai::{ - error::{ApiError, OpenAIError}, + error::OpenAIError, types::{ ChatCompletionResponseStream, CreateChatCompletionRequest, CreateChatCompletionResponse, }, }; use async_trait::async_trait; use futures::stream::BoxStream; -use ratelimit::Ratelimiter; +use leaky_bucket::RateLimiter; use tabby_inference::{ChatCompletionStream, CompletionOptions, CompletionStream, Embedding}; -use tracing::warn; - -fn new_rate_limiter(rpm: u64) -> Ratelimiter { - Ratelimiter::builder(rpm/60, Duration::from_secs(1)) - .max_tokens(rpm) - .initial_available(rpm) +use tokio::time::Duration; + +fn new_rate_limiter(rpm: u64) -> RateLimiter { + let rps = (rpm as f64 / 60.0).ceil() as usize; + RateLimiter::builder() + .initial(rps) + .interval(Duration::from_secs(1)) + .refill(rps) .build() - .expect("Failed to create RateLimiter, please check the HttpModelConfig.rate_limit configuration") } pub struct RateLimitedEmbedding { embedding: Box, - rate_limiter: Ratelimiter, + rate_limiter: RateLimiter, } pub fn new_embedding(embedding: Box, request_per_minute: u64) -> impl Embedding { @@ -35,22 +34,14 @@ pub fn new_embedding(embedding: Box, request_per_minute: u64) -> #[async_trait] impl Embedding for RateLimitedEmbedding { async fn embed(&self, prompt: &str) -> anyhow::Result> { - for _ in 0..60 { - if let Err(sleep) = self.rate_limiter.try_wait() { - tokio::time::sleep(sleep).await; - continue; - } - - return self.embedding.embed(prompt).await; - } - - anyhow::bail!("Failed to acquire request quota for embedding"); + self.rate_limiter.acquire(1).await; + self.embedding.embed(prompt).await } } pub struct RateLimitedCompletion { completion: Box, - rate_limiter: Ratelimiter, + rate_limiter: RateLimiter, } pub fn new_completion( @@ -66,23 +57,14 @@ pub fn new_completion( #[async_trait] impl CompletionStream for RateLimitedCompletion { async fn generate(&self, prompt: &str, options: CompletionOptions) -> BoxStream { - for _ in 0..60 { - if let Err(sleep) = self.rate_limiter.try_wait() { - tokio::time::sleep(sleep).await; - continue; - } - - return self.completion.generate(prompt, options).await; - } - - warn!("Failed to acquire request quota for completion"); - Box::pin(futures::stream::empty()) + self.rate_limiter.acquire(1).await; + self.completion.generate(prompt, options).await } } pub struct RateLimitedChatStream { completion: Box, - rate_limiter: Ratelimiter, + rate_limiter: RateLimiter, } pub fn new_chat( @@ -101,41 +83,15 @@ impl ChatCompletionStream for RateLimitedChatStream { &self, request: CreateChatCompletionRequest, ) -> Result { - for _ in 0..60 { - if let Err(sleep) = self.rate_limiter.try_wait() { - tokio::time::sleep(sleep).await; - continue; - } - - return self.completion.chat(request).await; - } - - Err(OpenAIError::ApiError(ApiError { - message: "Failed to acquire request quota for chat".to_owned(), - r#type: None, - param: None, - code: None, - })) + self.rate_limiter.acquire(1).await; + self.completion.chat(request).await } async fn chat_stream( &self, request: CreateChatCompletionRequest, ) -> Result { - for _ in 0..60 { - if let Err(sleep) = self.rate_limiter.try_wait() { - tokio::time::sleep(sleep).await; - continue; - } - - return self.completion.chat_stream(request).await; - } - - Err(OpenAIError::ApiError(ApiError { - message: "Failed to acquire request quota for chat stream".to_owned(), - r#type: None, - param: None, - code: None, - })) + self.rate_limiter.acquire(1).await; + self.completion.chat_stream(request).await } } diff --git a/crates/tabby-index/src/indexer.rs b/crates/tabby-index/src/indexer.rs index 7831b7ef882d..0735aa454907 100644 --- a/crates/tabby-index/src/indexer.rs +++ b/crates/tabby-index/src/indexer.rs @@ -84,7 +84,7 @@ impl TantivyDocBuilder { yield tokio::spawn(async move { match chunk_doc.await { Ok(Ok(doc)) => { - Some(doc) + Some(doc) } Ok(Err(e)) => { warn!("Failed to build chunk for document '{}': {}", doc_id, e);