Skip to content

Commit

Permalink
refactor: use leaky bucket rate limiter (#3567)
Browse files Browse the repository at this point in the history
* refactor: use leaky bucket rate limiter

Signed-off-by: Wei Zhang <[email protected]>

* [autofix.ci] apply automated fixes

* chore: use rps to do rate limit

Signed-off-by: Wei Zhang <[email protected]>

---------

Signed-off-by: Wei Zhang <[email protected]>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
  • Loading branch information
zwpaper and autofix-ci[bot] authored Dec 15, 2024
1 parent 011f04b commit 7dad093
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 69 deletions.
15 changes: 13 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/http-api-bindings/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ tabby-common = { path = "../tabby-common" }
 tabby-inference = { path = "../tabby-inference" }
 ollama-api-bindings = { path = "../ollama-api-bindings" }
 async-openai.workspace = true
-ratelimit.workspace = true
 tokio.workspace = true
 tracing.workspace = true
+leaky-bucket = "1.1.2"
 
 [dev-dependencies]
 tokio = { workspace = true, features = ["rt", "macros"] }
86 changes: 21 additions & 65 deletions crates/http-api-bindings/src/rate_limit.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,27 @@
-use std::time::Duration;
-
 use async_openai::{
-error::{ApiError, OpenAIError},
+error::OpenAIError,
 types::{
 ChatCompletionResponseStream, CreateChatCompletionRequest, CreateChatCompletionResponse,
 },
 };
 use async_trait::async_trait;
 use futures::stream::BoxStream;
-use ratelimit::Ratelimiter;
+use leaky_bucket::RateLimiter;
 use tabby_inference::{ChatCompletionStream, CompletionOptions, CompletionStream, Embedding};
-use tracing::warn;
-
-fn new_rate_limiter(rpm: u64) -> Ratelimiter {
-Ratelimiter::builder(rpm/60, Duration::from_secs(1))
-.max_tokens(rpm)
-.initial_available(rpm)
+use tokio::time::Duration;
+
+fn new_rate_limiter(rpm: u64) -> RateLimiter {
+let rps = (rpm as f64 / 60.0).ceil() as usize;
+RateLimiter::builder()
+.initial(rps)
+.interval(Duration::from_secs(1))
+.refill(rps)
 .build()
-.expect("Failed to create RateLimiter, please check the HttpModelConfig.rate_limit configuration")
 }
 
 pub struct RateLimitedEmbedding {
 embedding: Box<dyn Embedding>,
-rate_limiter: Ratelimiter,
+rate_limiter: RateLimiter,
 }
 
 pub fn new_embedding(embedding: Box<dyn Embedding>, request_per_minute: u64) -> impl Embedding {
Expand All @@ -35,22 +34,14 @@ pub fn new_embedding(embedding: Box<dyn Embedding>, request_per_minute: u64) ->
 #[async_trait]
 impl Embedding for RateLimitedEmbedding {
 async fn embed(&self, prompt: &str) -> anyhow::Result<Vec<f32>> {
-for _ in 0..60 {
-if let Err(sleep) = self.rate_limiter.try_wait() {
-tokio::time::sleep(sleep).await;
-continue;
-}
-
-return self.embedding.embed(prompt).await;
-}
-
-anyhow::bail!("Failed to acquire request quota for embedding");
+self.rate_limiter.acquire(1).await;
+self.embedding.embed(prompt).await
 }
 }
 
 pub struct RateLimitedCompletion {
 completion: Box<dyn CompletionStream>,
-rate_limiter: Ratelimiter,
+rate_limiter: RateLimiter,
 }
 
 pub fn new_completion(
Expand All @@ -66,23 +57,14 @@ pub fn new_completion(
 #[async_trait]
 impl CompletionStream for RateLimitedCompletion {
 async fn generate(&self, prompt: &str, options: CompletionOptions) -> BoxStream<String> {
-for _ in 0..60 {
-if let Err(sleep) = self.rate_limiter.try_wait() {
-tokio::time::sleep(sleep).await;
-continue;
-}
-
-return self.completion.generate(prompt, options).await;
-}
-
-warn!("Failed to acquire request quota for completion");
-Box::pin(futures::stream::empty())
+self.rate_limiter.acquire(1).await;
+self.completion.generate(prompt, options).await
 }
 }
 
 pub struct RateLimitedChatStream {
 completion: Box<dyn ChatCompletionStream>,
-rate_limiter: Ratelimiter,
+rate_limiter: RateLimiter,
 }
 
 pub fn new_chat(
Expand All @@ -101,41 +83,15 @@ impl ChatCompletionStream for RateLimitedChatStream {
 &self,
 request: CreateChatCompletionRequest,
 ) -> Result<CreateChatCompletionResponse, OpenAIError> {
-for _ in 0..60 {
-if let Err(sleep) = self.rate_limiter.try_wait() {
-tokio::time::sleep(sleep).await;
-continue;
-}
-
-return self.completion.chat(request).await;
-}
-
-Err(OpenAIError::ApiError(ApiError {
-message: "Failed to acquire request quota for chat".to_owned(),
-r#type: None,
-param: None,
-code: None,
-}))
+self.rate_limiter.acquire(1).await;
+self.completion.chat(request).await
 }
 
 async fn chat_stream(
 &self,
 request: CreateChatCompletionRequest,
 ) -> Result<ChatCompletionResponseStream, OpenAIError> {
-for _ in 0..60 {
-if let Err(sleep) = self.rate_limiter.try_wait() {
-tokio::time::sleep(sleep).await;
-continue;
-}
-
-return self.completion.chat_stream(request).await;
-}
-
-Err(OpenAIError::ApiError(ApiError {
-message: "Failed to acquire request quota for chat stream".to_owned(),
-r#type: None,
-param: None,
-code: None,
-}))
+self.rate_limiter.acquire(1).await;
+self.completion.chat_stream(request).await
 }
 }
2 changes: 1 addition & 1 deletion crates/tabby-index/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl<T: ToIndexId> TantivyDocBuilder<T> {
 yield tokio::spawn(async move {
 match chunk_doc.await {
 Ok(Ok(doc)) => {
-Some(doc)
+Some(doc)
 }
 Ok(Err(e)) => {
 warn!("Failed to build chunk for document '{}': {}", doc_id, e);
Expand Down

0 comments on commit 7dad093

Please sign in to comment.