From df3e195ec51aa17d7c3814f232b97f510a065c62 Mon Sep 17 00:00:00 2001 From: "zhuxiujia@qq.com" Date: Thu, 14 Dec 2023 19:29:59 +0800 Subject: [PATCH] bump v0.2.1 --- Cargo.toml | 2 +- src/stub.rs | 63 +++++++++++++++++++++++++++++------------------------ 2 files changed, 36 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 131164d..8cda17c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ members = [ [package] name = "drpc" -version = "0.2.0" +version = "0.2.1" edition = "2021" description = "Rust High Performance Async RPC Framework" readme = "Readme.md" diff --git a/src/stub.rs b/src/stub.rs index 5d38be7..864d39d 100644 --- a/src/stub.rs +++ b/src/stub.rs @@ -91,43 +91,50 @@ impl ClientStub { data: e.to_string().into_bytes(), }; } - let mut time_start = None; - if self.timeout.is_some() { - time_start = Some(std::time::Instant::now()); - } - loop { - // deserialize the rsp - let rsp_frame = Frame::decode_from(&mut stream) - .await - .map_err(|e| Error::from(e)); - if rsp_frame.is_err() { + let timeout = self.get_timeout(); + let v = tokio::time::timeout(timeout, async { + loop { + // deserialize the rsp + let rsp_frame = Frame::decode_from(&mut stream) + .await + .map_err(|e| Error::from(e)); + if rsp_frame.is_err() { + return Frame { + id, + ok: 0, + data: rsp_frame.err().unwrap().to_string().into_bytes(), + }; + } + let rsp_frame = rsp_frame.unwrap(); + // discard the rsp that is is not belong to us + if rsp_frame.id == id { + debug!("get response id = {}", id); + return rsp_frame; + } + } + }) + .await; + match v { + Ok(v) => v, + Err(_e) => { return Frame { id, ok: 0, - data: rsp_frame.err().unwrap().to_string().into_bytes(), + data: "rpc call timeout!".to_string().into_bytes(), }; } - let rsp_frame = rsp_frame.unwrap(); - // discard the rsp that is is not belong to us - if rsp_frame.id == id { - debug!("get response id = {}", id); - return rsp_frame; - } - if let Some(timeout) = self.timeout { - if let Some(time_start) = &time_start { - if time_start.elapsed() > timeout { - return Frame { - id, - ok: 0, - data: "rpc call timeout!".to_string().into_bytes(), - }; - } - } - } } }) .await } + + pub fn get_timeout(&self) -> Duration { + if let Some(t) = &self.timeout { + t.clone() + } else { + Duration::from_secs(30) + } + } } /// Receives the message sent by the client, unpacks the message, and invokes the local method.