Skip to content

Commit

Permalink
Add Send
Browse files Browse the repository at this point in the history
  • Loading branch information
Philip-NLnetLabs committed Aug 10, 2023
1 parent a647108 commit 2119f2d
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 25 deletions.
4 changes: 2 additions & 2 deletions src/net/client/multi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ impl<Octs: Clone + Composer + Debug + OctetsBuilder + Send + 'static>
query_msg: &'a mut MessageBuilder<
StaticCompressor<StreamTarget<Octs>>,
>,
) -> Pin<Box<dyn Future<Output = Result<Query<Octs>, Error>> + '_>> {
) -> Pin<Box<dyn Future<Output = Result<Query<Octs>, Error>> + Send + '_>> {
return Box::pin(self.query_impl(query_msg));
}
}
Expand Down Expand Up @@ -680,7 +680,7 @@ impl<
{
fn get_result(
&mut self,
) -> Pin<Box<dyn Future<Output = Result<Message<Bytes>, Error>> + '_>>
) -> Pin<Box<dyn Future<Output = Result<Message<Bytes>, Error>> + Send + '_>>
{
Box::pin(self.get_result_impl())
}
Expand Down
8 changes: 4 additions & 4 deletions src/net/client/octet_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,7 @@ impl<Octs: AsMut<[u8]> + Clone + Composer + Debug + OctetsBuilder>
}

impl<
Octs: AsMut<[u8]> + AsRef<[u8]> + Clone + Composer + Debug + OctetsBuilder,
Octs: AsMut<[u8]> + AsRef<[u8]> + Clone + Composer + Debug + OctetsBuilder + Send,
> Connection<Octs>
{
/// Constructor for [Connection].
Expand Down Expand Up @@ -822,15 +822,15 @@ impl<
}

impl<
Octs: AsMut<[u8]> + AsRef<[u8]> + Clone + Composer + Debug + OctetsBuilder,
Octs: AsMut<[u8]> + AsRef<[u8]> + Clone + Composer + Debug + OctetsBuilder + Send,
> QueryMessage<Query, Octs> for Connection<Octs>
{
fn query<'a>(
&'a self,
query_msg: &'a mut MessageBuilder<
StaticCompressor<StreamTarget<Octs>>,
>,
) -> Pin<Box<dyn Future<Output = Result<Query, Error>> + '_>> {
) -> Pin<Box<dyn Future<Output = Result<Query, Error>> + Send + '_>> {
return Box::pin(self.query_impl(query_msg));
}
}
Expand Down Expand Up @@ -891,7 +891,7 @@ impl Query {
impl GetResult for Query {
fn get_result(
&mut self,
) -> Pin<Box<dyn Future<Output = Result<Message<Bytes>, Error>> + '_>>
) -> Pin<Box<dyn Future<Output = Result<Message<Bytes>, Error>> + Send + '_>>
{
Box::pin(self.get_result_impl())
}
Expand Down
4 changes: 2 additions & 2 deletions src/net/client/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub trait QueryMessage<GR: GetResult, Octs> {
query_msg: &'a mut MessageBuilder<
StaticCompressor<StreamTarget<Octs>>,
>,
) -> Pin<Box<dyn Future<Output = Result<GR, Error>> + '_>>;
) -> Pin<Box<dyn Future<Output = Result<GR, Error>> + Send + '_>>;
}

/// Trait for getting the result of a DNS query.
Expand All @@ -32,5 +32,5 @@ pub trait GetResult {
/// This function is intended to be cancel safe.
fn get_result(
&mut self,
) -> Pin<Box<dyn Future<Output = Result<Message<Bytes>, Error>> + '_>>;
) -> Pin<Box<dyn Future<Output = Result<Message<Bytes>, Error>> + Send + '_>>;
}
22 changes: 9 additions & 13 deletions src/net/client/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl Connection {
}

/// Start a new DNS query.
async fn query_impl<Octs: AsRef<[u8]> + Clone>(
async fn query_impl<Octs: AsRef<[u8]> + Clone + Send>(
&self,
query_msg: &mut MessageBuilder<StaticCompressor<StreamTarget<Octs>>>,
) -> Result<Query<Octs>, Error> {
Expand All @@ -65,15 +65,15 @@ impl Connection {
}
}

impl<Octs: AsRef<[u8]> + Clone> QueryMessage<Query<Octs>, Octs>
impl<Octs: AsRef<[u8]> + Clone + Send> QueryMessage<Query<Octs>, Octs>
for Connection
{
fn query<'a>(
&'a self,
query_msg: &'a mut MessageBuilder<
StaticCompressor<StreamTarget<Octs>>,
>,
) -> Pin<Box<dyn Future<Output = Result<Query<Octs>, Error>> + '_>> {
) -> Pin<Box<dyn Future<Output = Result<Query<Octs>, Error>> + Send + '_>> {
return Box::pin(self.query_impl(query_msg));
}
}
Expand Down Expand Up @@ -117,7 +117,7 @@ pub struct Query<Octs> {
state: QueryState,
}

impl<Octs: AsRef<[u8]> + Clone> Query<Octs> {
impl<Octs: AsRef<[u8]> + Clone + Send> Query<Octs> {
/// Create new Query object.
fn new(
query_msg: &mut MessageBuilder<StaticCompressor<StreamTarget<Octs>>>,
Expand Down Expand Up @@ -168,16 +168,12 @@ impl<Octs: AsRef<[u8]> + Clone> Query<Octs> {
continue;
}
QueryState::Send => {
let dgram = self.query_msg .as_target() .as_target() .as_dgram_slice();
let sent = self
.sock
.as_ref()
.expect("socket should be present")
.send(
self.query_msg
.as_target()
.as_target()
.as_dgram_slice(),
)
.send( dgram)
.await
.map_err(|e| Error::UdpSend(Arc::new(e)))?;
if sent
Expand Down Expand Up @@ -265,10 +261,10 @@ impl<Octs: AsRef<[u8]> + Clone> Query<Octs> {
}
}

impl<Octs: AsRef<[u8]> + Clone> GetResult for Query<Octs> {
impl<Octs: AsRef<[u8]> + Clone + Send> GetResult for Query<Octs> {
fn get_result(
&mut self,
) -> Pin<Box<dyn Future<Output = Result<Message<Bytes>, Error>> + '_>>
) -> Pin<Box<dyn Future<Output = Result<Message<Bytes>, Error>> + Send + '_>>
{
Box::pin(self.get_result_impl())
}
Expand All @@ -293,7 +289,7 @@ impl InnerConnection {
}

/// Return a Query object that contains the query state.
async fn query<Octs: AsRef<[u8]> + Clone>(
async fn query<Octs: AsRef<[u8]> + Clone + Send>(
&self,
query_msg: &mut MessageBuilder<StaticCompressor<StreamTarget<Octs>>>,
conn: Connection,
Expand Down
10 changes: 6 additions & 4 deletions src/net/client/udp_tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl<Octs: Clone + Composer + Debug + OctetsBuilder + Send + 'static>
query_msg: &'a mut MessageBuilder<
StaticCompressor<StreamTarget<Octs>>,
>,
) -> Pin<Box<dyn Future<Output = Result<Query<Octs>, Error>> + '_>> {
) -> Pin<Box<dyn Future<Output = Result<Query<Octs>, Error>> + Send + '_>> {
return Box::pin(self.query_impl(query_msg));
}
}
Expand Down Expand Up @@ -134,9 +134,10 @@ impl<
loop {
match &mut self.state {
QueryState::StartUdpQuery => {
let mut msg = self.query_msg.clone();
let query = self
.udp_conn
.query(&mut self.query_msg.clone())
.query(&mut msg)
.await?;
self.state = QueryState::GetUdpResult(query);
continue;
Expand All @@ -150,9 +151,10 @@ impl<
return Ok(reply);
}
QueryState::StartTcpQuery => {
let mut msg = self.query_msg.clone();
let query = self
.tcp_conn
.query(&mut self.query_msg.clone())
.query(&mut msg)
.await?;
self.state = QueryState::GetTcpResult(query);
continue;
Expand All @@ -179,7 +181,7 @@ impl<
{
fn get_result(
&mut self,
) -> Pin<Box<dyn Future<Output = Result<Message<Bytes>, Error>> + '_>>
) -> Pin<Box<dyn Future<Output = Result<Message<Bytes>, Error>> + Send + '_>>
{
Box::pin(self.get_result_impl())
}
Expand Down

0 comments on commit 2119f2d

Please sign in to comment.