Skip to content

Commit

Permalink
wrote some tests for reading from stdin
Browse files Browse the repository at this point in the history
  • Loading branch information
ashfordneil committed Nov 15, 2019
1 parent f54d170 commit e1dbe92
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 11 deletions.
19 changes: 13 additions & 6 deletions src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ impl Executor {
// futures to remove
let mut completed = Vec::<usize>::new();

log::trace!("Initial poll to get everything warmed up");

// start by polling every future we have, just in case.
for (id, future) in self.tasks.iter_mut() {
if id == self.separate_task {
Expand Down Expand Up @@ -112,23 +114,24 @@ impl Executor {
});

let output = loop {
log::trace!("Looking in backlog for futures");
let future_to_poll = match self.to_do.try_recv() {
Ok(id) => id,
Err(TryRecvError::Disconnected) => {
unreachable!()
}
Err(TryRecvError::Disconnected) => unreachable!(),
Err(TryRecvError::Empty) => {
Reactor::spin()?;
continue;
}
};

if future_to_poll == self.separate_task {
log::trace!("Polling the main future");
let mut ctx = Context::from_waker(&waker);
if let Poll::Ready(result) = main_future.as_mut().poll(&mut ctx) {
break result;
}
} else {
log::trace!("Polling future {}", future_to_poll);
// we know that future is a valid value as long as the ID isn't
// self.separate_task, as that is the only ID in the slab associated
// with an uninitialised value.
Expand All @@ -146,7 +149,11 @@ impl Executor {

#[cfg(test)]
mod test {
use std::{future::Future, pin::Pin, task::{Context, Poll}};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

use crate::executor::Executor;

Expand All @@ -169,8 +176,8 @@ mod test {
ctx.waker().wake_by_ref();
self.0 = true;
Poll::Pending
},
true => Poll::Ready(())
}
true => Poll::Ready(()),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/executor/waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl Waker {
/// Drop the waker.
unsafe fn drop(raw: *const ()) {
let waker = Box::from_raw(raw as *mut Waker);
drop(waker);
}

/// The v table necessary for dynamic waker dispatch.
Expand All @@ -56,6 +57,7 @@ impl Waker {

/// Actually wake the waker.
fn do_wake(&self) {
log::trace!("Waking task {}", self.id);
self.sender.send(self.id).unwrap();
}
}
78 changes: 73 additions & 5 deletions src/reactor/stdin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ mod lock {
/// An asynchronous wrapper around stdin.
pub struct Stdin {
// only one handle can exist at a time
lock: lock::Guard,
_lock: lock::Guard,
// to reset stdin using fcntl when we are done
old_state: c_int,
// the stdin object itself for reading from
Expand All @@ -57,7 +57,7 @@ pub struct Stdin {
impl Stdin {
/// Create a new wrapper around stdin.
pub fn new() -> Result<Self, Error> {
let lock = lock::Guard::take()?;
let _lock = lock::Guard::take()?;

let old_state = unsafe {
let old_state = match libc::fcntl(libc::STDIN_FILENO, libc::F_GETFD) {
Expand Down Expand Up @@ -88,14 +88,22 @@ impl Stdin {
)?;

Ok(Stdin {
lock,
_lock,
old_state,
inner,
handle,
})
}
}

impl Drop for Stdin {
fn drop(&mut self) {
unsafe {
libc::fcntl(libc::STDIN_FILENO, libc::F_SETFD, self.old_state);
}
}
}

impl AsyncRead for Stdin {
fn poll_read(
mut self: Pin<&mut Self>,
Expand All @@ -118,12 +126,17 @@ mod test {
fs::File,
io::Write,
process::{Command, Stdio},
thread,
time::{Duration, Instant},
};

use futures::{executor, io::AsyncReadExt};
use futures::{
executor,
io::{AsyncBufReadExt, AsyncReadExt, BufReader},
};
use rusty_fork::{fork, rusty_fork_id, ChildWrapper};

use crate::reactor::{Reactor, Stdin};
use crate::{executor::Executor, reactor::Stdin};

fn pipe_stdin(cmd: &mut Command) {
cmd.stdin(Stdio::piped());
Expand Down Expand Up @@ -158,4 +171,59 @@ mod test {
)
.unwrap();
}

#[test]
fn run_in_executor() {
fn parent(child: &mut ChildWrapper, _: &mut File) {
let mut pipe = child.inner_mut().stdin.take().unwrap();

write!(pipe, "Hello, world\n").unwrap();

thread::sleep(Duration::from_secs(5));

write!(pipe, "Goodbye for now\n").unwrap();

drop(pipe);

let status = child.wait().unwrap();
assert!(status.success());
}

fn child() {
let future = async {
let input = Stdin::new().unwrap();
let mut input = BufReader::new(input);
let mut buffer = String::new();

let start = Instant::now();
input.read_line(&mut buffer).await.unwrap();
let time = start.elapsed();

// if it took more than 5 seconds to read, that means that we weren't able to
// separate the two read calls from eachother for some reason
assert!(time < Duration::from_secs(5));

assert_eq!("Hello, world\n", buffer);
buffer.clear();

input.read_line(&mut buffer).await.unwrap();
assert_eq!("Goodbye for now\n", buffer);
buffer.clear();

assert_eq!(0, input.read_to_string(&mut buffer).await.unwrap());
};

let mut executor = Executor::new();
executor.complete(future).unwrap();
}

fork(
"reactor::stdin::test::run_in_executor",
rusty_fork_id!(),
pipe_stdin,
parent,
child,
)
.unwrap();
}
}

0 comments on commit e1dbe92

Please sign in to comment.