Skip to content

Commit

Permalink
bump v0.2.1
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuxiujia committed Dec 14, 2023
1 parent 77ff0e3 commit df3e195
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 29 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ members = [

[package]
name = "drpc"
version = "0.2.0"
version = "0.2.1"
edition = "2021"
description = "Rust High Performance Async RPC Framework"
readme = "Readme.md"
Expand Down
63 changes: 35 additions & 28 deletions src/stub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,43 +91,50 @@ impl ClientStub {
data: e.to_string().into_bytes(),
};
}
let mut time_start = None;
if self.timeout.is_some() {
time_start = Some(std::time::Instant::now());
}
loop {
// deserialize the rsp
let rsp_frame = Frame::decode_from(&mut stream)
.await
.map_err(|e| Error::from(e));
if rsp_frame.is_err() {
let timeout = self.get_timeout();
let v = tokio::time::timeout(timeout, async {
loop {
// deserialize the rsp
let rsp_frame = Frame::decode_from(&mut stream)
.await
.map_err(|e| Error::from(e));
if rsp_frame.is_err() {
return Frame {
id,
ok: 0,
data: rsp_frame.err().unwrap().to_string().into_bytes(),
};
}
let rsp_frame = rsp_frame.unwrap();
// discard the rsp that is is not belong to us
if rsp_frame.id == id {
debug!("get response id = {}", id);
return rsp_frame;
}
}
})
.await;
match v {
Ok(v) => v,
Err(_e) => {
return Frame {
id,
ok: 0,
data: rsp_frame.err().unwrap().to_string().into_bytes(),
data: "rpc call timeout!".to_string().into_bytes(),
};
}
let rsp_frame = rsp_frame.unwrap();
// discard the rsp that is is not belong to us
if rsp_frame.id == id {
debug!("get response id = {}", id);
return rsp_frame;
}
if let Some(timeout) = self.timeout {
if let Some(time_start) = &time_start {
if time_start.elapsed() > timeout {
return Frame {
id,
ok: 0,
data: "rpc call timeout!".to_string().into_bytes(),
};
}
}
}
}
})
.await
}

pub fn get_timeout(&self) -> Duration {
if let Some(t) = &self.timeout {
t.clone()
} else {
Duration::from_secs(30)
}
}
}

/// Receives the message sent by the client, unpacks the message, and invokes the local method.
Expand Down

0 comments on commit df3e195

Please sign in to comment.