diff --git a/examples/cat.rs b/examples/cat.rs index d41b7ab1..824f988b 100644 --- a/examples/cat.rs +++ b/examples/cat.rs @@ -29,9 +29,7 @@ fn main() { loop { // Read a chunk - let (res, b) = file.read_at(buf, pos).await; - let n = res.unwrap(); - + let (n, b) = file.read_at(buf, pos).await.unwrap(); if n == 0 { break; } diff --git a/examples/mix.rs b/examples/mix.rs index 4e094019..a0b99995 100644 --- a/examples/mix.rs +++ b/examples/mix.rs @@ -34,15 +34,14 @@ fn main() { loop { // Read a chunk - let (res, b) = file.read_at(buf, pos).await; - let n = res.unwrap(); + let (n, b) = file.read_at(buf, pos).await.unwrap(); if n == 0 { break; } - let (res, b) = socket.write(b).submit().await; - pos += res.unwrap() as u64; + let (n, b) = socket.write(b).submit().await.unwrap(); + pos += n as u64; buf = b; } diff --git a/examples/tcp_listener.rs b/examples/tcp_listener.rs index 918503ca..45175f16 100644 --- a/examples/tcp_listener.rs +++ b/examples/tcp_listener.rs @@ -29,16 +29,14 @@ fn main() { let mut buf = vec![0u8; 4096]; loop { - let (result, nbuf) = stream.read(buf).await; + let (read, nbuf) = stream.read(buf).await.unwrap(); buf = nbuf; - let read = result.unwrap(); if read == 0 { println!("{} closed, {} total ping-ponged", socket_addr, n); break; } - let (res, slice) = stream.write_all(buf.slice(..read)).await; - let _ = res.unwrap(); + let (_, slice) = stream.write_all(buf.slice(..read)).await.unwrap(); buf = slice.into_inner(); println!("{} all {} bytes ping-ponged", socket_addr, read); n += read; diff --git a/examples/tcp_listener_fixed_buffers.rs b/examples/tcp_listener_fixed_buffers.rs index 69db2c8e..4be6f741 100644 --- a/examples/tcp_listener_fixed_buffers.rs +++ b/examples/tcp_listener_fixed_buffers.rs @@ -79,17 +79,14 @@ async fn echo_handler( // Each time through the loop, use fbuf and then get it back for the next // iteration. - let (result, fbuf1) = stream.read_fixed(fbuf).await; + let (read, fbuf1) = stream.read_fixed(fbuf).await.unwrap(); fbuf = { - let read = result.unwrap(); if read == 0 { break; } assert_eq!(4096, fbuf1.len()); // To prove a point. - let (res, nslice) = stream.write_fixed_all(fbuf1.slice(..read)).await; - - let _ = res.unwrap(); + let (_, nslice) = stream.write_fixed_all(fbuf1.slice(..read)).await.unwrap(); println!("peer {} all {} bytes ping-ponged", peer, read); n += read; diff --git a/examples/tcp_stream.rs b/examples/tcp_stream.rs index 4983ee4c..9b3d4a3a 100644 --- a/examples/tcp_stream.rs +++ b/examples/tcp_stream.rs @@ -15,11 +15,10 @@ fn main() { let stream = TcpStream::connect(socket_addr).await.unwrap(); let buf = vec![1u8; 128]; - let (result, buf) = stream.write(buf).submit().await; - println!("written: {}", result.unwrap()); + let (n, buf) = stream.write(buf).submit().await.unwrap(); + println!("written: {}", n); - let (result, buf) = stream.read(buf).await; - let read = result.unwrap(); + let (read, buf) = stream.read(buf).await.unwrap(); println!("read: {:?}", &buf[..read]); }); } diff --git a/examples/udp_socket.rs b/examples/udp_socket.rs index 01eb9b32..ddbf2138 100644 --- a/examples/udp_socket.rs +++ b/examples/udp_socket.rs @@ -15,12 +15,11 @@ fn main() { let buf = vec![0u8; 128]; - let (result, mut buf) = socket.recv_from(buf).await; - let (read, socket_addr) = result.unwrap(); + let ((read, socket_addr), mut buf) = socket.recv_from(buf).await.unwrap(); buf.resize(read, 0); println!("received from {}: {:?}", socket_addr, &buf[..]); - let (result, _buf) = socket.send_to(buf, socket_addr).await; - println!("sent to {}: {}", socket_addr, result.unwrap()); + let (n, _buf) = socket.send_to(buf, socket_addr).await.unwrap(); + println!("sent to {}: {}", socket_addr, n); }); } diff --git a/examples/unix_listener.rs b/examples/unix_listener.rs index 9e10496d..54261db8 100644 --- a/examples/unix_listener.rs +++ b/examples/unix_listener.rs @@ -20,11 +20,10 @@ fn main() { tokio_uring::spawn(async move { let buf = vec![1u8; 128]; - let (result, buf) = stream.write(buf).submit().await; - println!("written to {}: {}", &socket_addr, result.unwrap()); + let (n, buf) = stream.write(buf).submit().await.unwrap(); + println!("written to {}: {}", &socket_addr, n); - let (result, buf) = stream.read(buf).await; - let read = result.unwrap(); + let (read, buf) = stream.read(buf).await.unwrap(); println!("read from {}: {:?}", &socket_addr, &buf[..read]); }); } diff --git a/examples/unix_stream.rs b/examples/unix_stream.rs index 7caf06f9..5618f4f1 100644 --- a/examples/unix_stream.rs +++ b/examples/unix_stream.rs @@ -15,11 +15,10 @@ fn main() { let stream = UnixStream::connect(socket_addr).await.unwrap(); let buf = vec![1u8; 128]; - let (result, buf) = stream.write(buf).submit().await; - println!("written: {}", result.unwrap()); + let (n, buf) = stream.write(buf).submit().await.unwrap(); + println!("written: {}", n); - let (result, buf) = stream.read(buf).await; - let read = result.unwrap(); + let (read, buf) = stream.read(buf).await.unwrap(); println!("read: {:?}", &buf[..read]); }); } diff --git a/examples/wrk-bench.rs b/examples/wrk-bench.rs index 222df76a..9a9047e9 100644 --- a/examples/wrk-bench.rs +++ b/examples/wrk-bench.rs @@ -21,8 +21,7 @@ fn main() -> io::Result<()> { let (stream, _) = listener.accept().await?; tokio_uring::spawn(async move { - let (result, _) = stream.write(RESPONSE).submit().await; - + let result = stream.write(RESPONSE).submit().await; if let Err(err) = result { eprintln!("Client connection failed: {}", err); } diff --git a/src/fs/file.rs b/src/fs/file.rs index 9cd47f21..451223ac 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -4,6 +4,7 @@ use crate::fs::OpenOptions; use crate::io::SharedFd; use crate::runtime::driver::op::Op; +use crate::MapResult; use crate::{UnsubmittedOneshot, UnsubmittedWrite}; use std::fmt; use std::io; @@ -39,8 +40,7 @@ use std::path::Path; /// let file = File::create("hello.txt").await?; /// /// // Write some data -/// let (res, buf) = file.write_at(&b"hello world"[..], 0).submit().await; -/// let n = res?; +/// let (n, buf) = file.write_at(&b"hello world"[..], 0).submit().await?; /// /// println!("wrote {} bytes", n); /// @@ -165,8 +165,7 @@ impl File { /// let buffer = vec![0; 10]; /// /// // Read up to 10 bytes - /// let (res, buffer) = f.read_at(buffer, 0).await; - /// let n = res?; + /// let (n, buffer) = f.read_at(buffer, 0).await?; /// /// println!("The bytes: {:?}", &buffer[..n]); /// @@ -176,7 +175,7 @@ impl File { /// }) /// } /// ``` - pub async fn read_at(&self, buf: T, pos: u64) -> crate::BufResult { + pub async fn read_at(&self, buf: T, pos: u64) -> crate::Result { // Submit the read operation let op = Op::read_at(&self.fd, buf, pos).unwrap(); op.await @@ -216,8 +215,7 @@ impl File { /// let buffers = vec![Vec::::with_capacity(10), Vec::::with_capacity(10)]; /// /// // Read up to 20 bytes - /// let (res, buffer) = f.readv_at(buffers, 0).await; - /// let n = res?; + /// let (n, _) = f.readv_at(buffers, 0).await?; /// /// println!("Read {} bytes", n); /// @@ -231,7 +229,7 @@ impl File { &self, bufs: Vec, pos: u64, - ) -> crate::BufResult> { + ) -> crate::Result> { // Submit the read operation let op = Op::readv_at(&self.fd, bufs, pos).unwrap(); op.await @@ -271,8 +269,7 @@ impl File { /// /// // Writes some prefix of the byte string, not necessarily all of it. /// let bufs = vec!["some".to_owned().into_bytes(), " bytes".to_owned().into_bytes()]; - /// let (res, _) = file.writev_at(bufs, 0).await; - /// let n = res?; + /// let (n, _) = file.writev_at(bufs, 0).await?; /// /// println!("wrote {} bytes", n); /// @@ -288,7 +285,7 @@ impl File { &self, buf: Vec, pos: u64, - ) -> crate::BufResult> { + ) -> crate::Result> { let op = Op::writev_at(&self.fd, buf, pos).unwrap(); op.await } @@ -341,7 +338,7 @@ impl File { &self, buf: Vec, pos: Option, // Use None for files that can't seek - ) -> crate::BufResult> { + ) -> crate::Result> { let op = crate::io::writev_at_all(&self.fd, buf, pos); op.await } @@ -379,8 +376,7 @@ impl File { /// let buffer = Vec::with_capacity(10); /// /// // Read up to 10 bytes - /// let (res, buffer) = f.read_exact_at(buffer, 0).await; - /// res?; + /// let (_, buffer) = f.read_exact_at(buffer, 0).await?; /// /// println!("The bytes: {:?}", buffer); /// @@ -392,43 +388,37 @@ impl File { /// ``` /// /// [`ErrorKind::UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof - pub async fn read_exact_at(&self, buf: T, pos: u64) -> crate::BufResult<(), T> + pub async fn read_exact_at(&self, buf: T, pos: u64) -> crate::Result<(), T> where T: BoundedBufMut, { let orig_bounds = buf.bounds(); - let (res, buf) = self.read_exact_slice_at(buf.slice_full(), pos).await; - (res, T::from_buf_bounds(buf, orig_bounds)) + self.read_exact_slice_at(buf.slice_full(), pos) + .await + .map_buf(|buf| T::from_buf_bounds(buf, orig_bounds)) } async fn read_exact_slice_at( &self, mut buf: Slice, mut pos: u64, - ) -> crate::BufResult<(), T> { + ) -> crate::Result<(), T> { if pos.checked_add(buf.bytes_total() as u64).is_none() { - return ( - Err(io::Error::new( - io::ErrorKind::InvalidInput, - "buffer too large for file", - )), + return Err(crate::Error( + io::Error::new(io::ErrorKind::InvalidInput, "buffer too large for file"), buf.into_inner(), - ); + )); } while buf.bytes_total() != 0 { - let (res, slice) = self.read_at(buf, pos).await; - match res { - Ok(0) => { - return ( - Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "failed to fill whole buffer", - )), + match self.read_at(buf, pos).await { + Ok((0, slice)) => { + return Err(crate::Error( + io::Error::new(io::ErrorKind::UnexpectedEof, "failed to fill whole buffer"), slice.into_inner(), - ) + )) } - Ok(n) => { + Ok((n, slice)) => { pos += n as u64; buf = slice.slice(n..); } @@ -437,11 +427,11 @@ impl File { // crate's design ensures we are not calling the 'wait' option // in the ENTER syscall. Only an Enter with 'wait' can generate // an EINTR according to the io_uring man pages. - Err(e) => return (Err(e), slice.into_inner()), + Err(e) => return Err(e.map(|slice| slice.into_inner())), }; } - (Ok(()), buf.into_inner()) + Ok(((), buf.into_inner())) } /// Like [`read_at`], but using a pre-mapped buffer @@ -473,8 +463,7 @@ impl File { /// let buffer = registry.check_out(2).unwrap(); /// /// // Read up to 10 bytes - /// let (res, buffer) = f.read_fixed_at(buffer, 0).await; - /// let n = res?; + /// let (n, buffer) = f.read_fixed_at(buffer, 0).await?; /// /// println!("The bytes: {:?}", &buffer[..n]); /// @@ -484,7 +473,7 @@ impl File { /// }) ///# } /// ``` - pub async fn read_fixed_at(&self, buf: T, pos: u64) -> crate::BufResult + pub async fn read_fixed_at(&self, buf: T, pos: u64) -> crate::Result where T: BoundedBufMut, { @@ -526,8 +515,7 @@ impl File { /// let file = File::create("foo.txt").await?; /// /// // Writes some prefix of the byte string, not necessarily all of it. - /// let (res, _) = file.write_at(&b"some bytes"[..], 0).submit().await; - /// let n = res?; + /// let (n, _) = file.write_at(&b"some bytes"[..], 0).submit().await?; /// /// println!("wrote {} bytes", n); /// @@ -571,8 +559,7 @@ impl File { /// let file = File::create("foo.txt").await?; /// /// // Writes some prefix of the byte string, not necessarily all of it. - /// let (res, _) = file.write_all_at(&b"some bytes"[..], 0).await; - /// res?; + /// file.write_all_at(&b"some bytes"[..], 0).await?; /// /// println!("wrote all bytes"); /// @@ -584,43 +571,37 @@ impl File { /// ``` /// /// [`write_at`]: File::write_at - pub async fn write_all_at(&self, buf: T, pos: u64) -> crate::BufResult<(), T> + pub async fn write_all_at(&self, buf: T, pos: u64) -> crate::Result<(), T> where T: BoundedBuf, { let orig_bounds = buf.bounds(); - let (res, buf) = self.write_all_slice_at(buf.slice_full(), pos).await; - (res, T::from_buf_bounds(buf, orig_bounds)) + self.write_all_slice_at(buf.slice_full(), pos) + .await + .map_buf(|buf| T::from_buf_bounds(buf, orig_bounds)) } async fn write_all_slice_at( &self, mut buf: Slice, mut pos: u64, - ) -> crate::BufResult<(), T> { + ) -> crate::Result<(), T> { if pos.checked_add(buf.bytes_init() as u64).is_none() { - return ( - Err(io::Error::new( - io::ErrorKind::InvalidInput, - "buffer too large for file", - )), + return Err(crate::Error( + io::Error::new(io::ErrorKind::InvalidInput, "buffer too large for file"), buf.into_inner(), - ); + )); } while buf.bytes_init() != 0 { - let (res, slice) = self.write_at(buf, pos).submit().await; - match res { - Ok(0) => { - return ( - Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write whole buffer", - )), + match self.write_at(buf, pos).submit().await { + Ok((0, slice)) => { + return Err(crate::Error( + io::Error::new(io::ErrorKind::WriteZero, "failed to write whole buffer"), slice.into_inner(), - ) + )) } - Ok(n) => { + Ok((n, slice)) => { pos += n as u64; buf = slice.slice(n..); } @@ -629,11 +610,11 @@ impl File { // crate's design ensures we are not calling the 'wait' option // in the ENTER syscall. Only an Enter with 'wait' can generate // an EINTR according to the io_uring man pages. - Err(e) => return (Err(e), slice.into_inner()), + Err(e) => return Err(e.map(|slice| slice.into_inner())), }; } - (Ok(()), buf.into_inner()) + Ok(((), buf.into_inner())) } /// Like [`write_at`], but using a pre-mapped buffer @@ -666,8 +647,7 @@ impl File { /// /// // Writes some prefix of the buffer content, /// // not necessarily all of it. - /// let (res, _) = file.write_fixed_at(buffer, 0).await; - /// let n = res?; + /// let (n, _) = file.write_fixed_at(buffer, 0).await?; /// /// println!("wrote {} bytes", n); /// @@ -677,7 +657,7 @@ impl File { /// }) ///# } /// ``` - pub async fn write_fixed_at(&self, buf: T, pos: u64) -> crate::BufResult + pub async fn write_fixed_at(&self, buf: T, pos: u64) -> crate::Result where T: BoundedBuf, { @@ -704,43 +684,37 @@ impl File { /// This function will return the first error that [`write_fixed_at`] returns. /// /// [`write_fixed_at`]: Self::write_fixed_at - pub async fn write_fixed_all_at(&self, buf: T, pos: u64) -> crate::BufResult<(), T> + pub async fn write_fixed_all_at(&self, buf: T, pos: u64) -> crate::Result<(), T> where T: BoundedBuf, { let orig_bounds = buf.bounds(); - let (res, buf) = self.write_fixed_all_at_slice(buf.slice_full(), pos).await; - (res, T::from_buf_bounds(buf, orig_bounds)) + self.write_fixed_all_at_slice(buf.slice_full(), pos) + .await + .map_buf(|buf| T::from_buf_bounds(buf, orig_bounds)) } async fn write_fixed_all_at_slice( &self, mut buf: Slice, mut pos: u64, - ) -> crate::BufResult<(), FixedBuf> { + ) -> crate::Result<(), FixedBuf> { if pos.checked_add(buf.bytes_init() as u64).is_none() { - return ( - Err(io::Error::new( - io::ErrorKind::InvalidInput, - "buffer too large for file", - )), + return Err(crate::Error( + io::Error::new(io::ErrorKind::InvalidInput, "buffer too large for file"), buf.into_inner(), - ); + )); } while buf.bytes_init() != 0 { - let (res, slice) = self.write_fixed_at(buf, pos).await; - match res { - Ok(0) => { - return ( - Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write whole buffer", - )), + match self.write_fixed_at(buf, pos).await { + Ok((0, slice)) => { + return Err(crate::Error( + io::Error::new(io::ErrorKind::WriteZero, "failed to write whole buffer"), slice.into_inner(), - ) + )) } - Ok(n) => { + Ok((n, slice)) => { pos += n as u64; buf = slice.slice(n..); } @@ -749,11 +723,11 @@ impl File { // crate's design ensures we are not calling the 'wait' option // in the ENTER syscall. Only an Enter with 'wait' can generate // an EINTR according to the io_uring man pages. - Err(e) => return (Err(e), slice.into_inner()), + Err(e) => return Err(e.map(|slice| slice.into_inner())), }; } - (Ok(()), buf.into_inner()) + Ok(((), buf.into_inner())) } /// Attempts to sync all OS-internal metadata to disk. @@ -773,8 +747,7 @@ impl File { /// fn main() -> Result<(), Box> { /// tokio_uring::start(async { /// let f = File::create("foo.txt").await?; - /// let (res, buf) = f.write_at(&b"Hello, world!"[..], 0).submit().await; - /// let n = res?; + /// f.write_at(&b"Hello, world!"[..], 0).submit().await?; /// /// f.sync_all().await?; /// @@ -810,8 +783,7 @@ impl File { /// fn main() -> Result<(), Box> { /// tokio_uring::start(async { /// let f = File::create("foo.txt").await?; - /// let (res, buf) = f.write_at(&b"Hello, world!"[..], 0).submit().await; - /// let n = res?; + /// f.write_at(&b"Hello, world!"[..], 0).submit().await?; /// /// f.sync_data().await?; /// diff --git a/src/io/read.rs b/src/io/read.rs index c3395b40..c98ed532 100644 --- a/src/io/read.rs +++ b/src/io/read.rs @@ -1,6 +1,7 @@ use crate::buf::BoundedBufMut; use crate::io::SharedFd; -use crate::BufResult; +use crate::Result; +use crate::WithBuffer; use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; @@ -43,7 +44,7 @@ impl Completable for Read where T: BoundedBufMut, { - type Output = BufResult; + type Output = Result; fn complete(self, cqe: CqeResult) -> Self::Output { // Convert the operation result to `usize` @@ -59,6 +60,6 @@ where } } - (res, buf) + res.with_buffer(buf) } } diff --git a/src/io/read_fixed.rs b/src/io/read_fixed.rs index 3cb96cdb..dba9d094 100644 --- a/src/io/read_fixed.rs +++ b/src/io/read_fixed.rs @@ -2,7 +2,8 @@ use crate::buf::fixed::FixedBuf; use crate::buf::BoundedBufMut; use crate::io::SharedFd; use crate::runtime::driver::op::{self, Completable, Op}; -use crate::BufResult; +use crate::Result; +use crate::WithBuffer; use crate::runtime::CONTEXT; use std::io; @@ -52,13 +53,13 @@ impl Completable for ReadFixed where T: BoundedBufMut, { - type Output = BufResult; + type Output = Result; fn complete(self, cqe: op::CqeResult) -> Self::Output { - // Convert the operation result to `usize` - let res = cqe.result.map(|v| v as usize); // Recover the buffer let mut buf = self.buf; + // Convert the operation result to `usize` + let res = cqe.result.map(|v| v as usize); // If the operation was successful, advance the initialized cursor. if let Ok(n) = res { @@ -68,6 +69,6 @@ where } } - (res, buf) + res.with_buffer(buf) } } diff --git a/src/io/readv.rs b/src/io/readv.rs index ff71dc79..9fc0a902 100644 --- a/src/io/readv.rs +++ b/src/io/readv.rs @@ -1,5 +1,6 @@ use crate::buf::BoundedBufMut; -use crate::BufResult; +use crate::Result; +use crate::WithBuffer; use crate::io::SharedFd; use crate::runtime::driver::op::{Completable, CqeResult, Op}; @@ -62,7 +63,7 @@ impl Completable for Readv where T: BoundedBufMut, { - type Output = BufResult>; + type Output = Result>; fn complete(self, cqe: CqeResult) -> Self::Output { // Convert the operation result to `usize` @@ -87,6 +88,6 @@ where assert_eq!(count, 0); } - (res, bufs) + res.with_buffer(bufs) } } diff --git a/src/io/recv_from.rs b/src/io/recv_from.rs index e9b360ca..2dc979a9 100644 --- a/src/io/recv_from.rs +++ b/src/io/recv_from.rs @@ -1,6 +1,7 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; -use crate::{buf::BoundedBufMut, io::SharedFd, BufResult}; +use crate::WithBuffer; +use crate::{buf::BoundedBufMut, io::SharedFd, Result}; use socket2::SockAddr; use std::{ io::IoSliceMut, @@ -57,7 +58,7 @@ impl Completable for RecvFrom where T: BoundedBufMut, { - type Output = BufResult<(usize, SocketAddr), T>; + type Output = Result<(usize, SocketAddr), T>; fn complete(self, cqe: CqeResult) -> Self::Output { // Convert the operation result to `usize` @@ -78,6 +79,6 @@ where (n, socket_addr) }); - (res, buf) + res.with_buffer(buf) } } diff --git a/src/io/recvmsg.rs b/src/io/recvmsg.rs index 3cae2e50..ad634422 100644 --- a/src/io/recvmsg.rs +++ b/src/io/recvmsg.rs @@ -1,6 +1,7 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; -use crate::{buf::BoundedBufMut, io::SharedFd, BufResult}; +use crate::WithBuffer; +use crate::{buf::BoundedBufMut, io::SharedFd, Result}; use socket2::SockAddr; use std::{ io::IoSliceMut, @@ -61,7 +62,7 @@ impl Completable for RecvMsg where T: BoundedBufMut, { - type Output = BufResult<(usize, SocketAddr), Vec>; + type Output = Result<(usize, SocketAddr), Vec>; fn complete(self, cqe: CqeResult) -> Self::Output { // Convert the operation result to `usize` @@ -92,6 +93,6 @@ where (n, socket_addr) }); - (res, bufs) + res.with_buffer(bufs) } } diff --git a/src/io/send_to.rs b/src/io/send_to.rs index 8895f5fa..9a15210c 100644 --- a/src/io/send_to.rs +++ b/src/io/send_to.rs @@ -2,7 +2,8 @@ use crate::buf::BoundedBuf; use crate::io::SharedFd; use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; -use crate::BufResult; +use crate::Result; +use crate::WithBuffer; use socket2::SockAddr; use std::io::IoSlice; use std::{boxed::Box, io, net::SocketAddr}; @@ -70,14 +71,9 @@ impl Op> { } impl Completable for SendTo { - type Output = BufResult; + type Output = Result; fn complete(self, cqe: CqeResult) -> Self::Output { - // Convert the operation result to `usize` - let res = cqe.result.map(|v| v as usize); - // Recover the buffer - let buf = self.buf; - - (res, buf) + cqe.result.map(|v| v as usize).with_buffer(self.buf) } } diff --git a/src/io/send_zc.rs b/src/io/send_zc.rs index df37722b..f15ccbe7 100644 --- a/src/io/send_zc.rs +++ b/src/io/send_zc.rs @@ -1,6 +1,7 @@ use crate::runtime::driver::op::{Completable, CqeResult, MultiCQEFuture, Op, Updateable}; use crate::runtime::CONTEXT; -use crate::{buf::BoundedBuf, io::SharedFd, BufResult}; +use crate::WithBuffer; +use crate::{buf::BoundedBuf, io::SharedFd, Result}; use std::io; pub(crate) struct SendZc { @@ -39,14 +40,12 @@ impl Op, MultiCQEFuture> { } impl Completable for SendZc { - type Output = BufResult; + type Output = Result; fn complete(self, cqe: CqeResult) -> Self::Output { - // Convert the operation result to `usize` - let res = cqe.result.map(|v| self.bytes + v as usize); - // Recover the buffer - let buf = self.buf; - (res, buf) + cqe.result + .map(|v| self.bytes + v as usize) + .with_buffer(self.buf) } } diff --git a/src/io/socket.rs b/src/io/socket.rs index dda1bb36..5cc2071e 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -47,26 +47,28 @@ impl Socket { UnsubmittedOneshot::write_at(&self.fd, buf, 0) } - pub async fn write_all(&self, buf: T) -> crate::BufResult<(), T> { + pub async fn write_all(&self, buf: T) -> crate::Result<(), T> { let orig_bounds = buf.bounds(); - let (res, buf) = self.write_all_slice(buf.slice_full()).await; - (res, T::from_buf_bounds(buf, orig_bounds)) + match self.write_all_slice(buf.slice_full()).await { + Ok((x, buf)) => Ok((x, T::from_buf_bounds(buf, orig_bounds))), + Err(e) => Err(e.map(|buf| T::from_buf_bounds(buf, orig_bounds))), + } } - async fn write_all_slice(&self, mut buf: Slice) -> crate::BufResult<(), T> { + async fn write_all_slice(&self, mut buf: Slice) -> crate::Result<(), T> { while buf.bytes_init() != 0 { let res = self.write(buf).submit().await; match res { - (Ok(0), slice) => { - return ( - Err(std::io::Error::new( + Ok((0, slice)) => { + return Err(crate::Error( + std::io::Error::new( std::io::ErrorKind::WriteZero, "failed to write whole buffer", - )), + ), slice.into_inner(), - ) + )) } - (Ok(n), slice) => { + Ok((n, slice)) => { buf = slice.slice(n..); } @@ -74,14 +76,14 @@ impl Socket { // crate's design ensures we are not calling the 'wait' option // in the ENTER syscall. Only an Enter with 'wait' can generate // an EINTR according to the io_uring man pages. - (Err(e), slice) => return (Err(e), slice.into_inner()), + Err(e) => return Err(e.map(|slice| slice.into_inner())), } } - (Ok(()), buf.into_inner()) + Ok(((), buf.into_inner())) } - pub(crate) async fn write_fixed(&self, buf: T) -> crate::BufResult + pub(crate) async fn write_fixed(&self, buf: T) -> crate::Result where T: BoundedBuf, { @@ -89,32 +91,31 @@ impl Socket { op.await } - pub(crate) async fn write_fixed_all(&self, buf: T) -> crate::BufResult<(), T> + pub(crate) async fn write_fixed_all(&self, buf: T) -> crate::Result<(), T> where T: BoundedBuf, { let orig_bounds = buf.bounds(); - let (res, buf) = self.write_fixed_all_slice(buf.slice_full()).await; - (res, T::from_buf_bounds(buf, orig_bounds)) + match self.write_fixed_all_slice(buf.slice_full()).await { + Ok((r, buf)) => Ok((r, T::from_buf_bounds(buf, orig_bounds))), + Err(e) => Err(e.map(|buf| T::from_buf_bounds(buf, orig_bounds))), + } } - async fn write_fixed_all_slice( - &self, - mut buf: Slice, - ) -> crate::BufResult<(), FixedBuf> { + async fn write_fixed_all_slice(&self, mut buf: Slice) -> crate::Result<(), FixedBuf> { while buf.bytes_init() != 0 { let res = self.write_fixed(buf).await; match res { - (Ok(0), slice) => { - return ( - Err(std::io::Error::new( + Ok((0, slice)) => { + return Err(crate::Error( + std::io::Error::new( std::io::ErrorKind::WriteZero, "failed to write whole buffer", - )), + ), slice.into_inner(), - ) + )) } - (Ok(n), slice) => { + Ok((n, slice)) => { buf = slice.slice(n..); } @@ -122,14 +123,14 @@ impl Socket { // crate's design ensures we are not calling the 'wait' option // in the ENTER syscall. Only an Enter with 'wait' can generate // an EINTR according to the io_uring man pages. - (Err(e), slice) => return (Err(e), slice.into_inner()), + Err(e) => return Err(e.map(|slice| slice.into_inner())), } } - (Ok(()), buf.into_inner()) + Ok(((), buf.into_inner())) } - pub async fn writev(&self, buf: Vec) -> crate::BufResult> { + pub async fn writev(&self, buf: Vec) -> crate::Result> { let op = Op::writev_at(&self.fd, buf, 0).unwrap(); op.await } @@ -138,12 +139,12 @@ impl Socket { &self, buf: T, socket_addr: Option, - ) -> crate::BufResult { + ) -> crate::Result { let op = Op::send_to(&self.fd, buf, socket_addr).unwrap(); op.await } - pub(crate) async fn send_zc(&self, buf: T) -> crate::BufResult { + pub(crate) async fn send_zc(&self, buf: T) -> crate::Result { let op = Op::send_zc(&self.fd, buf).unwrap(); op.await } @@ -168,12 +169,12 @@ impl Socket { op.await } - pub(crate) async fn read(&self, buf: T) -> crate::BufResult { + pub(crate) async fn read(&self, buf: T) -> crate::Result { let op = Op::read_at(&self.fd, buf, 0).unwrap(); op.await } - pub(crate) async fn read_fixed(&self, buf: T) -> crate::BufResult + pub(crate) async fn read_fixed(&self, buf: T) -> crate::Result where T: BoundedBufMut, { @@ -184,7 +185,7 @@ impl Socket { pub(crate) async fn recv_from( &self, buf: T, - ) -> crate::BufResult<(usize, SocketAddr), T> { + ) -> crate::Result<(usize, SocketAddr), T> { let op = Op::recv_from(&self.fd, buf).unwrap(); op.await } @@ -192,7 +193,7 @@ impl Socket { pub(crate) async fn recvmsg( &self, buf: Vec, - ) -> crate::BufResult<(usize, SocketAddr), Vec> { + ) -> crate::Result<(usize, SocketAddr), Vec> { let op = Op::recvmsg(&self.fd, buf).unwrap(); op.await } diff --git a/src/io/write.rs b/src/io/write.rs index 6c607f75..e2a95f32 100644 --- a/src/io/write.rs +++ b/src/io/write.rs @@ -1,4 +1,5 @@ -use crate::{buf::BoundedBuf, io::SharedFd, BufResult, OneshotOutputTransform, UnsubmittedOneshot}; +use crate::WithBuffer; +use crate::{buf::BoundedBuf, io::SharedFd, OneshotOutputTransform, Result, UnsubmittedOneshot}; use io_uring::cqueue::Entry; use std::io; use std::marker::PhantomData; @@ -21,7 +22,7 @@ pub struct WriteTransform { } impl OneshotOutputTransform for WriteTransform { - type Output = BufResult; + type Output = Result; type StoredData = WriteData; fn transform_oneshot_output(self, data: Self::StoredData, cqe: Entry) -> Self::Output { @@ -31,7 +32,7 @@ impl OneshotOutputTransform for WriteTransform { Err(io::Error::from_raw_os_error(-cqe.result())) }; - (res, data.buf) + res.with_buffer(data.buf) } } diff --git a/src/io/write_fixed.rs b/src/io/write_fixed.rs index 1d2c3e38..9119e8c9 100644 --- a/src/io/write_fixed.rs +++ b/src/io/write_fixed.rs @@ -2,7 +2,8 @@ use crate::buf::fixed::FixedBuf; use crate::buf::BoundedBuf; use crate::io::SharedFd; use crate::runtime::driver::op::{self, Completable, Op}; -use crate::BufResult; +use crate::Result; +use crate::WithBuffer; use crate::runtime::CONTEXT; use std::io; @@ -48,14 +49,9 @@ where } impl Completable for WriteFixed { - type Output = BufResult; + type Output = Result; fn complete(self, cqe: op::CqeResult) -> Self::Output { - // Convert the operation result to `usize` - let res = cqe.result.map(|v| v as usize); - // Recover the buffer - let buf = self.buf; - - (res, buf) + cqe.result.map(|v| v as usize).with_buffer(self.buf) } } diff --git a/src/io/writev.rs b/src/io/writev.rs index 86236ebc..a2833100 100644 --- a/src/io/writev.rs +++ b/src/io/writev.rs @@ -1,6 +1,7 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; -use crate::{buf::BoundedBuf, io::SharedFd, BufResult}; +use crate::WithBuffer; +use crate::{buf::BoundedBuf, io::SharedFd, Result}; use libc::iovec; use std::io; @@ -58,14 +59,9 @@ impl Completable for Writev where T: BoundedBuf, { - type Output = BufResult>; + type Output = Result>; fn complete(self, cqe: CqeResult) -> Self::Output { - // Convert the operation result to `usize` - let res = cqe.result.map(|v| v as usize); - // Recover the buffer - let buf = self.bufs; - - (res, buf) + cqe.result.map(|v| v as usize).with_buffer(self.bufs) } } diff --git a/src/io/writev_all.rs b/src/io/writev_all.rs index ef5b9d40..1182532c 100644 --- a/src/io/writev_all.rs +++ b/src/io/writev_all.rs @@ -17,7 +17,7 @@ pub(crate) async fn writev_at_all( fd: &SharedFd, mut bufs: Vec, offset: Option, -) -> crate::BufResult> { +) -> crate::Result> { // TODO decide if the function should return immediately if all the buffer lengths // were to sum to zero. That would save an allocation and one call into writev. @@ -59,7 +59,7 @@ pub(crate) async fn writev_at_all( // On error, there is no indication how many bytes were written. This is standard. // The device doesn't tell us that either. - Err(e) => return (Err(e), bufs), + Err(e) => return Err(crate::Error(e, bufs)), }; // TODO if n is zero, while there was more data to be written, should this be interpreted @@ -101,7 +101,7 @@ pub(crate) async fn writev_at_all( break; } } - (Ok(total), bufs) + Ok((total, bufs)) } struct WritevAll { diff --git a/src/lib.rs b/src/lib.rs index d1cc6e02..5405c9c5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,8 +20,7 @@ //! // Read some data, the buffer is passed by ownership and //! // submitted to the kernel. When the operation completes, //! // we get the buffer back. -//! let (res, buf) = file.read_at(buf, 0).await; -//! let n = res?; +//! let (n, buf) = file.read_at(buf, 0).await?; //! //! // Display the contents //! println!("{:?}", &buf[..n]); @@ -73,6 +72,7 @@ macro_rules! syscall { mod future; mod io; mod runtime; +mod types; pub mod buf; pub mod fs; @@ -82,6 +82,7 @@ pub use io::write::*; pub use runtime::driver::op::{InFlightOneshot, OneshotOutputTransform, UnsubmittedOneshot}; pub use runtime::spawn; pub use runtime::Runtime; +pub use types::*; use crate::runtime::driver::op::Op; use std::future::Future; @@ -115,8 +116,7 @@ use std::future::Future; /// // Read some data, the buffer is passed by ownership and /// // submitted to the kernel. When the operation completes, /// // we get the buffer back. -/// let (res, buf) = file.read_at(buf, 0).await; -/// let n = res?; +/// let (n, buf) = file.read_at(buf, 0).await?; /// /// // Display the contents /// println!("{:?}", &buf[..n]); @@ -233,39 +233,6 @@ impl Builder { } } -/// A specialized `Result` type for `io-uring` operations with buffers. -/// -/// This type is used as a return value for asynchronous `io-uring` methods that -/// require passing ownership of a buffer to the runtime. When the operation -/// completes, the buffer is returned whether or not the operation completed -/// successfully. -/// -/// # Examples -/// -/// ```no_run -/// use tokio_uring::fs::File; -/// -/// fn main() -> Result<(), Box> { -/// tokio_uring::start(async { -/// // Open a file -/// let file = File::open("hello.txt").await?; -/// -/// let buf = vec![0; 4096]; -/// // Read some data, the buffer is passed by ownership and -/// // submitted to the kernel. When the operation completes, -/// // we get the buffer back. -/// let (res, buf) = file.read_at(buf, 0).await; -/// let n = res?; -/// -/// // Display the contents -/// println!("{:?}", &buf[..n]); -/// -/// Ok(()) -/// }) -/// } -/// ``` -pub type BufResult = (std::io::Result, B); - /// The simplest possible operation. Just posts a completion event, nothing else. /// /// This has a place in benchmarking and sanity checking uring. diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 2435c61b..6df26589 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -33,9 +33,9 @@ use std::{ /// let tx = TcpStream::connect("127.0.0.1:2345".parse().unwrap()).await.unwrap(); /// let rx = rx_ch.await.expect("The spawned task expected to send a TcpStream"); /// -/// tx.write(b"test" as &'static [u8]).submit().await.0.unwrap(); +/// tx.write(b"test" as &'static [u8]).submit().await.unwrap(); /// -/// let (_, buf) = rx.read(vec![0; 4]).await; +/// let (_, buf) = rx.read(vec![0; 4]).await.unwrap(); /// /// assert_eq!(buf, b"test"); /// }); diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 2450dcb9..f135618d 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -28,8 +28,7 @@ use crate::{ /// let mut stream = TcpStream::connect("127.0.0.1:8080".parse().unwrap()).await?; /// /// // Write some data. -/// let (result, _) = stream.write(b"hello world!".as_slice()).submit().await; -/// result.unwrap(); +/// stream.write(b"hello world!".as_slice()).submit().await.unwrap(); /// /// Ok(()) /// }) @@ -74,7 +73,7 @@ impl TcpStream { /// Read some data from the stream into the buffer. /// /// Returns the original buffer and quantity of data read. - pub async fn read(&self, buf: T) -> crate::BufResult { + pub async fn read(&self, buf: T) -> crate::Result { self.inner.read(buf).await } @@ -91,7 +90,7 @@ impl TcpStream { /// In addition to errors that can be reported by `read`, /// this operation fails if the buffer is not registered in the /// current `tokio-uring` runtime. - pub async fn read_fixed(&self, buf: T) -> crate::BufResult + pub async fn read_fixed(&self, buf: T) -> crate::Result where T: BoundedBufMut, { @@ -137,15 +136,13 @@ impl TcpStream { /// let mut n = 0; /// let mut buf = vec![0u8; 4096]; /// loop { - /// let (result, nbuf) = stream.read(buf).await; + /// let (read, nbuf) = stream.read(buf).await.unwrap(); /// buf = nbuf; - /// let read = result.unwrap(); /// if read == 0 { /// break; /// } /// - /// let (res, slice) = stream.write_all(buf.slice(..read)).await; - /// let _ = res.unwrap(); + /// let (_, slice) = stream.write_all(buf.slice(..read)).await.unwrap(); /// buf = slice.into_inner(); /// n += read; /// } @@ -155,7 +152,7 @@ impl TcpStream { /// ``` /// /// [`write`]: Self::write - pub async fn write_all(&self, buf: T) -> crate::BufResult<(), T> { + pub async fn write_all(&self, buf: T) -> crate::Result<(), T> { self.inner.write_all(buf).await } @@ -172,7 +169,7 @@ impl TcpStream { /// In addition to errors that can be reported by `write`, /// this operation fails if the buffer is not registered in the /// current `tokio-uring` runtime. - pub async fn write_fixed(&self, buf: T) -> crate::BufResult + pub async fn write_fixed(&self, buf: T) -> crate::Result where T: BoundedBuf, { @@ -192,7 +189,7 @@ impl TcpStream { /// This function will return the first error that [`write_fixed`] returns. /// /// [`write_fixed`]: Self::write_fixed - pub async fn write_fixed_all(&self, buf: T) -> crate::BufResult<(), T> + pub async fn write_fixed_all(&self, buf: T) -> crate::Result<(), T> where T: BoundedBuf, { @@ -222,7 +219,7 @@ impl TcpStream { /// written to this writer. /// /// [`Ok(n)`]: Ok - pub async fn writev(&self, buf: Vec) -> crate::BufResult> { + pub async fn writev(&self, buf: Vec) -> crate::Result> { self.inner.writev(buf).await } diff --git a/src/net/udp.rs b/src/net/udp.rs index cb0cef66..0714d88e 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -43,22 +43,18 @@ use std::{ /// let buf = vec![0; 32]; /// /// // write data -/// let (result, _) = socket.write(b"hello world".as_slice()).submit().await; -/// result.unwrap(); +/// socket.write(b"hello world".as_slice()).submit().await.unwrap(); /// /// // read data -/// let (result, buf) = other_socket.read(buf).await; -/// let n_bytes = result.unwrap(); +/// let (n_bytes, buf) = other_socket.read(buf).await.unwrap(); /// /// assert_eq!(b"hello world", &buf[..n_bytes]); /// /// // write data using send on connected socket -/// let (result, _) = socket.send(b"hello world via send".as_slice()).await; -/// result.unwrap(); +/// socket.send(b"hello world via send".as_slice()).await.unwrap(); /// /// // read data -/// let (result, buf) = other_socket.read(buf).await; -/// let n_bytes = result.unwrap(); +/// let (n_bytes, buf) = other_socket.read(buf).await.unwrap(); /// /// assert_eq!(b"hello world via send", &buf[..n_bytes]); /// @@ -83,12 +79,10 @@ use std::{ /// let buf = vec![0; 32]; /// /// // write data -/// let (result, _) = socket.send_to(b"hello world".as_slice(), second_addr).await; -/// result.unwrap(); +/// socket.send_to(b"hello world".as_slice(), second_addr).await.unwrap(); /// /// // read data -/// let (result, buf) = other_socket.recv_from(buf).await; -/// let (n_bytes, addr) = result.unwrap(); +/// let ((n_bytes, addr), buf) = other_socket.recv_from(buf).await.unwrap(); /// /// assert_eq!(addr, first_addr); /// assert_eq!(b"hello world", &buf[..n_bytes]); @@ -172,14 +166,13 @@ impl UdpSocket { /// let buf = vec![0; 32]; /// /// // write data - /// let (result, _) = std_socket + /// std_socket /// .send_to(b"hello world".as_slice(), second_addr) - /// .await; - /// result.unwrap(); + /// .await + /// .unwrap(); /// /// // read data - /// let (result, buf) = other_socket.recv_from(buf).await; - /// let (n_bytes, addr) = result.unwrap(); + /// let ((n_bytes, addr), buf) = other_socket.recv_from(buf).await.unwrap(); /// /// assert_eq!(addr, std_addr); /// assert_eq!(b"hello world", &buf[..n_bytes]); @@ -212,7 +205,7 @@ impl UdpSocket { /// Sends data on the connected socket /// /// On success, returns the number of bytes written. - pub async fn send(&self, buf: T) -> crate::BufResult { + pub async fn send(&self, buf: T) -> crate::Result { self.inner.send_to(buf, None).await } @@ -223,7 +216,7 @@ impl UdpSocket { &self, buf: T, socket_addr: SocketAddr, - ) -> crate::BufResult { + ) -> crate::Result { self.inner.send_to(buf, Some(socket_addr)).await } @@ -240,7 +233,7 @@ impl UdpSocket { /// > at writes over around 10 KB. /// /// Note: Using fixed buffers [#54](https://github.com/tokio-rs/tokio-uring/pull/54), avoids the page-pinning overhead - pub async fn send_zc(&self, buf: T) -> crate::BufResult { + pub async fn send_zc(&self, buf: T) -> crate::Result { self.inner.send_zc(buf).await } @@ -299,7 +292,7 @@ impl UdpSocket { pub async fn recv_from( &self, buf: T, - ) -> crate::BufResult<(usize, SocketAddr), T> { + ) -> crate::Result<(usize, SocketAddr), T> { self.inner.recv_from(buf).await } @@ -309,14 +302,14 @@ impl UdpSocket { pub async fn recvmsg( &self, buf: Vec, - ) -> crate::BufResult<(usize, SocketAddr), Vec> { + ) -> crate::Result<(usize, SocketAddr), Vec> { self.inner.recvmsg(buf).await } /// Reads a packet of data from the socket into the buffer. /// /// Returns the original buffer and quantity of data read. - pub async fn read(&self, buf: T) -> crate::BufResult { + pub async fn read(&self, buf: T) -> crate::Result { self.inner.read(buf).await } @@ -333,7 +326,7 @@ impl UdpSocket { /// In addition to errors that can be reported by `read`, /// this operation fails if the buffer is not registered in the /// current `tokio-uring` runtime. - pub async fn read_fixed(&self, buf: T) -> crate::BufResult + pub async fn read_fixed(&self, buf: T) -> crate::Result where T: BoundedBufMut, { @@ -360,7 +353,7 @@ impl UdpSocket { /// In addition to errors that can be reported by `write`, /// this operation fails if the buffer is not registered in the /// current `tokio-uring` runtime. - pub async fn write_fixed(&self, buf: T) -> crate::BufResult + pub async fn write_fixed(&self, buf: T) -> crate::Result where T: BoundedBuf, { diff --git a/src/net/unix/listener.rs b/src/net/unix/listener.rs index ffabb5d2..b56f3f77 100644 --- a/src/net/unix/listener.rs +++ b/src/net/unix/listener.rs @@ -30,9 +30,9 @@ use std::{io, path::Path}; /// let tx = UnixStream::connect(&sock_file).await.unwrap(); /// let rx = rx_ch.await.expect("The spawned task expected to send a UnixStream"); /// -/// tx.write(b"test" as &'static [u8]).submit().await.0.unwrap(); +/// tx.write(b"test" as &'static [u8]).submit().await.unwrap(); /// -/// let (_, buf) = rx.read(vec![0; 4]).await; +/// let (_, buf) = rx.read(vec![0; 4]).await.unwrap(); /// /// assert_eq!(buf, b"test"); /// }); diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs index 40e7ddc5..8234b479 100644 --- a/src/net/unix/stream.rs +++ b/src/net/unix/stream.rs @@ -28,8 +28,7 @@ use std::{ /// let mut stream = UnixStream::connect("/tmp/tokio-uring-unix-test.sock").await?; /// /// // Write some data. -/// let (result, _) = stream.write(b"hello world!".as_slice()).submit().await; -/// result.unwrap(); +/// stream.write(b"hello world!".as_slice()).submit().await.unwrap(); /// /// Ok(()) /// }) @@ -75,7 +74,7 @@ impl UnixStream { /// Read some data from the stream into the buffer, returning the original buffer and /// quantity of data read. - pub async fn read(&self, buf: T) -> crate::BufResult { + pub async fn read(&self, buf: T) -> crate::Result { self.inner.read(buf).await } @@ -90,7 +89,7 @@ impl UnixStream { /// In addition to errors that can be reported by `read`, /// this operation fails if the buffer is not registered in the /// current `tokio-uring` runtime. - pub async fn read_fixed(&self, buf: T) -> crate::BufResult + pub async fn read_fixed(&self, buf: T) -> crate::Result where T: BoundedBufMut, { @@ -116,7 +115,7 @@ impl UnixStream { /// This function will return the first error that [`write`] returns. /// /// [`write`]: Self::write - pub async fn write_all(&self, buf: T) -> crate::BufResult<(), T> { + pub async fn write_all(&self, buf: T) -> crate::Result<(), T> { self.inner.write_all(buf).await } @@ -131,7 +130,7 @@ impl UnixStream { /// In addition to errors that can be reported by `write`, /// this operation fails if the buffer is not registered in the /// current `tokio-uring` runtime. - pub async fn write_fixed(&self, buf: T) -> crate::BufResult + pub async fn write_fixed(&self, buf: T) -> crate::Result where T: BoundedBuf, { @@ -151,7 +150,7 @@ impl UnixStream { /// This function will return the first error that [`write_fixed`] returns. /// /// [`write_fixed`]: Self::write - pub async fn write_fixed_all(&self, buf: T) -> crate::BufResult<(), T> + pub async fn write_fixed_all(&self, buf: T) -> crate::Result<(), T> where T: BoundedBuf, { @@ -182,7 +181,7 @@ impl UnixStream { /// written to this writer. /// /// [`Ok(n)`]: Ok - pub async fn writev(&self, buf: Vec) -> crate::BufResult> { + pub async fn writev(&self, buf: Vec) -> crate::Result> { self.inner.writev(buf).await } diff --git a/src/runtime/driver/op/mod.rs b/src/runtime/driver/op/mod.rs index 32ba3e7a..7abc3030 100644 --- a/src/runtime/driver/op/mod.rs +++ b/src/runtime/driver/op/mod.rs @@ -165,6 +165,7 @@ pub(crate) enum Lifecycle { /// The submitter no longer has interest in the operation result. The state /// must be passed to the driver and held until the operation completes. + #[allow(dead_code)] Ignored(Box), /// The operation has completed with a single cqe result diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 369c060b..394c06b5 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -10,7 +10,7 @@ pub(crate) mod driver; pub(crate) use context::RuntimeContext; thread_local! { - pub(crate) static CONTEXT: RuntimeContext = RuntimeContext::new(); + pub(crate) static CONTEXT: RuntimeContext = const {RuntimeContext::new() }; } /// The Runtime Executor diff --git a/src/types.rs b/src/types.rs new file mode 100644 index 00000000..40742bd8 --- /dev/null +++ b/src/types.rs @@ -0,0 +1,104 @@ +use std::fmt::{Debug, Display}; + +/// A specialized `Result` type for `io-uring` operations with buffers. +/// +/// This type is used as a return value for asynchronous `io-uring` methods that +/// require passing ownership of a buffer to the runtime. When the operation +/// completes, the buffer is returned both in the success tuple and as part of the error. +/// +/// # Examples +/// +/// ```no_run +/// use tokio_uring::fs::File; +/// +/// fn main() -> Result<(), Box> { +/// tokio_uring::start(async { +/// // Open a file +/// let file = File::open("hello.txt").await?; +/// +/// let buf = vec![0; 4096]; +/// // Read some data, the buffer is passed by ownership and +/// // submitted to the kernel. When the operation completes, +/// // we get the buffer back. +/// let (n, buf) = file.read_at(buf, 0).await?; +/// +/// // Display the contents +/// println!("{:?}", &buf[..n]); +/// +/// Ok(()) +/// }) +/// } +/// ``` +pub type Result = std::result::Result<(T, B), Error>; + +/// A specialized `Error` type for `io-uring` operations with buffers. +pub struct Error(pub std::io::Error, pub B); +impl Debug for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Debug::fmt(&self.0, f) + } +} + +impl Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Display::fmt(&self.0, f) + } +} + +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(&self.0) + } +} + +impl Error { + /// Applies a function to the contained buffer, returning a new `BufError`. + pub fn map(self, f: F) -> Error + where + F: FnOnce(B) -> U, + { + Error(self.0, f(self.1)) + } +} + +mod private { + pub trait Sealed {} +} +impl private::Sealed for std::result::Result {} + +/// A Specialized trait for mapping over the buffer in both sides of a Result +pub trait MapResult: private::Sealed { + /// The result type after applying the map operation + type Output; + /// Apply a function over the buffer on both sides of the result + fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output; +} + +/// Adapter trait to convert result::Result to crate::Result where E can be +/// converted to std::io::Error. +pub trait WithBuffer: private::Sealed { + /// Insert a buffer into each side of the result + fn with_buffer(self, buf: B) -> T; +} +impl MapResult for Result { + type Output = Result; + fn map_buf(self, f: impl FnOnce(B) -> U) -> Self::Output { + match self { + Ok((r, b)) => Ok((r, f(b))), + Err(e) => Err(e.map(f)), + } + } +} + +/// Adaptor implementation for Result to Result. +impl WithBuffer, B> for std::result::Result +where + E: Into, +{ + fn with_buffer(self, buf: B) -> Result { + match self { + Ok(res) => Ok((res, buf)), + Err(e) => Err(crate::Error(e.into(), buf)), + } + } +} diff --git a/tests/driver.rs b/tests/driver.rs index f4381dd5..2daa288d 100644 --- a/tests/driver.rs +++ b/tests/driver.rs @@ -59,7 +59,6 @@ fn complete_ops_on_drop() { 25 * 1024 * 1024, ) .await - .0 .unwrap(); }) .await; @@ -86,7 +85,6 @@ fn too_many_submissions() { file.write_at(b"hello world".to_vec(), 0) .submit() .await - .0 .unwrap(); }) .await; diff --git a/tests/fixed_buf.rs b/tests/fixed_buf.rs index a5442217..3e0562e2 100644 --- a/tests/fixed_buf.rs +++ b/tests/fixed_buf.rs @@ -42,8 +42,7 @@ fn fixed_buf_turnaround() { // for another instance. assert!(buffers.check_out(0).is_none()); - let (res, buf) = op.await; - let n = res.unwrap(); + let (n, buf) = op.await.unwrap(); assert_eq!(n, HELLO.len()); // The buffer is owned by `buf`, can't check it out @@ -81,12 +80,11 @@ fn unregister_invalidates_checked_out_buffers() { // The old buffer's index no longer matches the memory area of the // currently registered buffer, so the read operation using the old // buffer's memory should fail. - let (res, _) = file.read_fixed_at(fixed_buf, 0).await; + let res = file.read_fixed_at(fixed_buf, 0).await; assert_err!(res); let fixed_buf = buffers.check_out(0).unwrap(); - let (res, buf) = file.read_fixed_at(fixed_buf, 0).await; - let n = res.unwrap(); + let (n, buf) = file.read_fixed_at(fixed_buf, 0).await.unwrap(); assert_eq!(n, HELLO.len()); assert_eq!(&buf[..], HELLO); }); @@ -112,18 +110,17 @@ fn slicing() { let fixed_buf = buffers.check_out(0).unwrap(); // Read no more than 8 bytes into the fixed buffer. - let (res, slice) = file.read_fixed_at(fixed_buf.slice(..8), 3).await; - let n = res.unwrap(); + let (n, slice) = file.read_fixed_at(fixed_buf.slice(..8), 3).await.unwrap(); assert_eq!(n, 8); assert_eq!(slice[..], HELLO[3..11]); let fixed_buf = slice.into_inner(); // Write from the fixed buffer, starting at offset 1, // up to the end of the initialized bytes in the buffer. - let (res, slice) = file + let (n, slice) = file .write_fixed_at(fixed_buf.slice(1..), HELLO.len() as u64) - .await; - let n = res.unwrap(); + .await + .unwrap(); assert_eq!(n, 7); assert_eq!(slice[..], HELLO[4..11]); let fixed_buf = slice.into_inner(); @@ -131,8 +128,7 @@ fn slicing() { // Read into the fixed buffer, overwriting bytes starting from offset 3 // and then extending the initialized part with as many bytes as // the operation can read. - let (res, slice) = file.read_fixed_at(fixed_buf.slice(3..), 0).await; - let n = res.unwrap(); + let (n, slice) = file.read_fixed_at(fixed_buf.slice(3..), 0).await.unwrap(); assert_eq!(n, HELLO.len() + 7); assert_eq!(slice[..HELLO.len()], HELLO[..]); assert_eq!(slice[HELLO.len()..], HELLO[4..11]); @@ -167,8 +163,10 @@ fn pool_next_as_concurrency_limit() { let file = File::from_std(cloned_file); let data = [b'0' + i as u8; BUF_SIZE]; buf.put_slice(&data); - let (res, buf) = file.write_fixed_all_at(buf, BUF_SIZE as u64 * i).await; - res.unwrap(); + let (_, buf) = file + .write_fixed_all_at(buf, BUF_SIZE as u64 * i) + .await + .unwrap(); println!("[worker {}]: dropping buffer {}", i, buf.buf_index()); }); diff --git a/tests/fs_file.rs b/tests/fs_file.rs index 6ec14d43..45edf0c7 100644 --- a/tests/fs_file.rs +++ b/tests/fs_file.rs @@ -19,9 +19,7 @@ const HELLO: &[u8] = b"hello world..."; async fn read_hello(file: &File) { let buf = Vec::with_capacity(1024); - let (res, buf) = file.read_at(buf, 0).await; - let n = res.unwrap(); - + let (n, buf) = file.read_at(buf, 0).await.unwrap(); assert_eq!(n, HELLO.len()); assert_eq!(&buf[..n], HELLO); } @@ -47,8 +45,7 @@ fn basic_read_exact() { tempfile.write_all(&data).unwrap(); let file = File::open(tempfile.path()).await.unwrap(); - let (res, buf) = file.read_exact_at(buf, 0).await; - res.unwrap(); + let (_, buf) = file.read_exact_at(buf, 0).await.unwrap(); assert_eq!(buf, data); }); } @@ -60,7 +57,7 @@ fn basic_write() { let file = File::create(tempfile.path()).await.unwrap(); - file.write_at(HELLO, 0).submit().await.0.unwrap(); + file.write_at(HELLO, 0).submit().await.unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); assert_eq!(file, HELLO); @@ -75,8 +72,7 @@ fn vectored_read() { let file = File::open(tempfile.path()).await.unwrap(); let bufs = vec![Vec::::with_capacity(5), Vec::::with_capacity(9)]; - let (res, bufs) = file.readv_at(bufs, 0).await; - let n = res.unwrap(); + let (n, bufs) = file.readv_at(bufs, 0).await.unwrap(); assert_eq!(n, HELLO.len()); assert_eq!(bufs[1][0], b' '); @@ -93,7 +89,7 @@ fn vectored_write() { let buf2 = " world...".to_owned().into_bytes(); let bufs = vec![buf1, buf2]; - file.writev_at(bufs, 0).await.0.unwrap(); + file.writev_at(bufs, 0).await.unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); assert_eq!(file, HELLO); @@ -108,8 +104,7 @@ fn basic_write_all() { let tempfile = tempfile(); let file = File::create(tempfile.path()).await.unwrap(); - let (ret, data) = file.write_all_at(data, 0).await; - ret.unwrap(); + let (_, data) = file.write_all_at(data, 0).await.unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); assert_eq!(file, data); @@ -155,7 +150,7 @@ fn drop_open() { // Do something else let file = File::create(tempfile.path()).await.unwrap(); - file.write_at(HELLO, 0).submit().await.0.unwrap(); + file.write_at(HELLO, 0).submit().await.unwrap(); let file = std::fs::read(tempfile.path()).unwrap(); assert_eq!(file, HELLO); @@ -183,7 +178,7 @@ fn sync_doesnt_kill_anything() { let file = File::create(tempfile.path()).await.unwrap(); file.sync_all().await.unwrap(); file.sync_data().await.unwrap(); - file.write_at(&b"foo"[..], 0).submit().await.0.unwrap(); + file.write_at(&b"foo"[..], 0).submit().await.unwrap(); file.sync_all().await.unwrap(); file.sync_data().await.unwrap(); }); @@ -236,16 +231,14 @@ fn read_fixed() { let fixed_buf = buffers.check_out(0).unwrap(); assert_eq!(fixed_buf.bytes_total(), 6); - let (res, buf) = file.read_fixed_at(fixed_buf.slice(..), 0).await; - let n = res.unwrap(); + let (n, buf) = file.read_fixed_at(fixed_buf.slice(..), 0).await.unwrap(); assert_eq!(n, 6); assert_eq!(&buf[..], &HELLO[..6]); let fixed_buf = buffers.check_out(1).unwrap(); assert_eq!(fixed_buf.bytes_total(), 1024); - let (res, buf) = file.read_fixed_at(fixed_buf.slice(..), 6).await; - let n = res.unwrap(); + let (n, buf) = file.read_fixed_at(fixed_buf.slice(..), 6).await.unwrap(); assert_eq!(n, HELLO.len() - 6); assert_eq!(&buf[..], &HELLO[6..]); @@ -266,16 +259,14 @@ fn write_fixed() { let mut buf = fixed_buf; buf.put_slice(&HELLO[..6]); - let (res, _) = file.write_fixed_at(buf, 0).await; - let n = res.unwrap(); + let (n, _) = file.write_fixed_at(buf, 0).await.unwrap(); assert_eq!(n, 6); let fixed_buf = buffers.check_out(1).unwrap(); let mut buf = fixed_buf; buf.put_slice(&HELLO[6..]); - let (res, _) = file.write_fixed_at(buf, 6).await; - let n = res.unwrap(); + let (n, _) = file.write_fixed_at(buf, 6).await.unwrap(); assert_eq!(n, HELLO.len() - 6); let file = std::fs::read(tempfile.path()).unwrap();