Skip to content

Commit

Permalink
Use poll() instead of select()
Browse files Browse the repository at this point in the history
* on *nix systems use poll() instead of select(); select has limitation on FD value to 1024
* test-utf8 is skipped if R does not support mbcs
  • Loading branch information
lbartnik authored Aug 6, 2018
1 parent 9ab368a commit 2b5d32a
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 102 deletions.
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

* fixes building under Solaris

* replace `select()` with `poll()`

# subprocess 0.8.2

* fixes in test cases for `testthat` 2.0
Expand Down
152 changes: 61 additions & 91 deletions src/sub-linux.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/poll.h>
#include <dlfcn.h>

#include <fcntl.h> /* Obtain O_* constant definitions */
Expand All @@ -31,7 +32,6 @@
* this is redundant in non-Solaris builds but fixes Solaris */
extern char ** environ;


#ifdef TRUE
#undef TRUE
#endif
Expand All @@ -56,7 +56,7 @@ string strerror (int _code, const string & _message)
message << _message << ": " << buffer.data();
}
else {
message << _message << ": system error message could not be fetched";
message << _message << ": system error message could not be fetched, errno = " << _code;
}

return message.str();
Expand Down Expand Up @@ -96,7 +96,7 @@ static void exit_on_failure ()
{
void * process_handle = dlopen(NULL, RTLD_NOW);
void * exit_handle = dlsym(process_handle, "exit");

// it's hard to imagine a situation where this symbol would not be
// present; regardless, we cause a SEGMENTATION error because the
// child needs to die;
Expand All @@ -107,7 +107,7 @@ static void exit_on_failure ()
*(int*)exit_handle = 0;
++ret; // hide compiler warning
}

typedef void (* exit_t)(int);
exit_t exit_fun = (exit_t)exit_handle;
exit_fun(EXIT_FAILURE);
Expand All @@ -118,7 +118,7 @@ static void exit_on_failure ()


/*
* Duplicate handle and zero the original
* Duplicate handle and zero the original
*/
inline void dup2 (int _from, int _to) {
if (::dup2(_from, _to) < 0) {
Expand Down Expand Up @@ -215,6 +215,7 @@ void process_handle_t::spawn (const char * _command, char *const _arguments[],
throw subprocess_exception(EALREADY, "process already started");
}

int rc = 0;
// can be addressed with PIPE_STDIN, PIPE_STDOUT, PIPE_STDERR
pipe_holder pipes[3];

Expand Down Expand Up @@ -247,7 +248,7 @@ void process_handle_t::spawn (const char * _command, char *const _arguments[],
if (_termination_mode == TERMINATION_GROUP) {
setsid();
}

/* if environment is empty, use parent's environment */
if (!_environment) {
_environment = environ;
Expand Down Expand Up @@ -284,6 +285,9 @@ void process_handle_t::spawn (const char * _command, char *const _arguments[],
pipes[PIPE_STDIN][pipe_holder::WRITE] = HANDLE_CLOSED;
pipes[PIPE_STDOUT][pipe_holder::READ] = HANDLE_CLOSED;
pipes[PIPE_STDERR][pipe_holder::READ] = HANDLE_CLOSED;

// update process state
wait(TIMEOUT_IMMEDIATE);
}


Expand Down Expand Up @@ -346,75 +350,57 @@ struct enable_block_mode {
enable_block_mode (int _fd) : fd (_fd) {
set_block(fd);
}

~enable_block_mode () {
set_non_block(fd);
}
};


struct select_reader {

fd_set set;
int max_fd;

select_reader () : max_fd(0) { }

void put_fd (int _fd) {
FD_SET(_fd, &set);
max_fd = std::max(max_fd, _fd);
}

ssize_t timed_read (process_handle_t & _handle, pipe_type _pipe, int _timeout)
{
// this should never be called with "infinite" timeout
if (_timeout < 0)
return -1;

FD_ZERO(&set);
if (_pipe & PIPE_STDOUT) {
put_fd(_handle.pipe_stdout);
_handle.stdout_.clear();
}
if (_pipe & PIPE_STDERR) {
put_fd(_handle.pipe_stderr);
_handle.stderr_.clear();
}

struct timeval timeout;
int start = clock_millisec(), timediff;
ssize_t rc;

do {
timediff = _timeout - (clock_millisec() - start);

// use max so that _timeout can be TIMEOUT_IMMEDIATE and yet
// select can be tried at least once
timeout.tv_sec = std::max(0, timediff/1000);
timeout.tv_usec = std::max(0, (timediff % 1000) * 1000);

rc = select(max_fd + 1, &set, NULL, NULL, &timeout);
if (rc == -1 && errno != EINTR && errno != EAGAIN)
return -1;

} while(rc == 0 && timediff > 0);

// nothing to read; if errno == EINTR try reading one last time
if (rc == 0 || errno == EAGAIN)
return 0;

if (FD_ISSET(_handle.pipe_stdout, &set)) {
rc = std::min(rc, (ssize_t)_handle.stdout_.read(_handle.pipe_stdout, mbcslocale));
}
if (FD_ISSET(_handle.pipe_stderr, &set)) {
rc = std::min(rc, (ssize_t)_handle.stderr_.read(_handle.pipe_stderr, mbcslocale));
ssize_t timed_read (process_handle_t & _handle, pipe_type _pipe, int _timeout)
{
struct pollfd fds[2] { { .fd = -1 }, { .fd = -1 } };

if (_pipe & PIPE_STDOUT) {
fds[0].fd = _handle.pipe_stdout;
fds[0].events = POLLIN;
_handle.stdout_.clear();
}

if (_pipe & PIPE_STDERR) {
fds[1].fd = _handle.pipe_stderr;
fds[1].events = POLLIN;
_handle.stderr_.clear();
}

time_t start = clock_millisec(), timediff = _timeout;
ssize_t rc;

do {
rc = poll(fds, 2, timediff);
timediff = _timeout - (clock_millisec() - start);

// interrupted or kernel failed to allocate internal resources
if (rc < 0 && (errno == EINTR || errno == EAGAIN)) {
rc = 0;
}

return rc;
} while (rc == 0 && timediff > 0);

// nothing to read
if (rc == 0) {
return 0;
}
};

// TODO if an error occurs in the first read() it will be lost
if (fds[0].fd != -1 && fds[0].revents == POLLIN) {
rc = std::min(rc, (ssize_t)_handle.stdout_.read(_handle.pipe_stdout, mbcslocale));
}
if (fds[1].fd != -1 && fds[1].revents == POLLIN) {
rc = std::min(rc, (ssize_t)_handle.stderr_.read(_handle.pipe_stderr, mbcslocale));
}

return rc;
}


size_t process_handle_t::read (pipe_type _pipe, int _timeout)
Expand All @@ -423,25 +409,7 @@ size_t process_handle_t::read (pipe_type _pipe, int _timeout)
throw subprocess_exception(ECHILD, "child does not exist");
}

ssize_t rc;
select_reader reader;

// infinite timeout
if (_timeout == TIMEOUT_INFINITE) {
enable_block_mode blocker_out(pipe_stdout);
enable_block_mode blocker_err(pipe_stderr);
rc = reader.timed_read(*this, _pipe, 1000);
}
// finite or no timeout
else {
rc = reader.timed_read(*this, _pipe, _timeout);

if (rc < 0 && errno == EAGAIN) {
/* stdin pipe is opened with O_NONBLOCK, so this means "would block" */
errno = 0;
return 0;
}
}
ssize_t rc = timed_read(*this, _pipe, _timeout);

if (rc < 0) {
throw subprocess_exception(errno, "could not read from child process");
Expand Down Expand Up @@ -476,16 +444,17 @@ void process_handle_t::wait (int _timeout)
return;
}

/* to wait or not to wait? */
/* to wait or not to wait? */
int options = 0;
if (_timeout >= 0)
if (_timeout >= 0) {
options = WNOHANG;

}

/* make the actual system call */
int start = clock_millisec(), rc;
do {
rc = waitpid(child_id, &return_code, options);

// there's been an error (<0)
if (rc < 0) {
throw subprocess_exception(errno, "waitpid() failed");
Expand All @@ -495,7 +464,9 @@ void process_handle_t::wait (int _timeout)
} while (rc == 0 && _timeout > 0);

// the child is still running
if (rc == 0) return;
if (rc == 0) {
return;
}

// the child has exited or has been terminated
if (WIFEXITED(return_code)) {
Expand All @@ -520,7 +491,6 @@ void process_handle_t::send_signal(int _signal)
if (!child_id) {
throw subprocess_exception(ECHILD, "child does not exist");
}

int rc = ::kill(child_id, _signal);

if (rc < 0) {
Expand Down Expand Up @@ -580,7 +550,7 @@ int main (int argc, char ** argv)
handle.spawn("/bin/bash", args, env, NULL, process_handle_t::TERMINATION_GROUP);

process_write(&handle, "echo A\n", 7);

/* read is non-blocking so the child needs time to produce output */
sleep(1);
process_read(handle, PIPE_STDOUT, TIMEOUT_INFINITE);
Expand Down
3 changes: 3 additions & 0 deletions subprocess.Rproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ Encoding: UTF-8
RnwWeave: knitr
LaTeX: pdfLaTeX

AutoAppendNewline: Yes
StripTrailingWhitespace: Yes

BuildType: Package
PackageUseDevtools: Yes
PackageInstallArgs: --no-multiarch --with-keep.source --debug
Expand Down
6 changes: 5 additions & 1 deletion tests/testthat/helper-processes.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ is_mac <- function ()
identical(tolower(Sys.info()[["sysname"]]), 'darwin')
}

is_solaris <- function()
{
identical(tolower(Sys.info()[["sysname"]]), 'sunos')
}

# --- R child ----------------------------------------------------------

Expand Down Expand Up @@ -37,7 +41,7 @@ process_exists <- function (handle)
return(length(grep(pid, output, fixed = TRUE)) > 0)
}
else {
flag <- ifelse(is_mac(), "-p", "--pid")
flag <- ifelse(is_mac() || is_solaris(), "-p", "--pid")
rc <- system2("ps", c(flag, pid), stdout = NULL, stderr = NULL)
return(rc == 0)
}
Expand Down
16 changes: 9 additions & 7 deletions tests/testthat/test-signals.R
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
context("signals")


test_that("sending signal in Linux", {
skip_if_not(is_linux())
test_that("sending signal in Linux/MacOS/Solaris", {
skip_if_not(is_linux() || is_mac() || is_solaris())

script_path <- file.path(getwd(), 'signal-trap.sh')
expect_true(file.exists(script_path))

bash_path <- "/bin/bash"
expect_true(file.exists(bash_path))

handle <- spawn_process(bash_path, c("-e", script_path))
expect_true(process_exists(handle))

# excluded signals kill or stop the child
for (signal in setdiff(signals, c(1, 9, 17, 19))) {
# exclude signals to kill or stop the child
skip <- c(SIGHUP, SIGKILL, SIGCHLD, SIGSTOP, if (is_solaris()) SIGQUIT)

for (signal in setdiff(signals, skip)) {
process_send_signal(handle, signal)
output <- process_read(handle, PIPE_STDOUT, TIMEOUT_INFINITE)

i <- which(signals == signal)
expect_equal(output, names(signals)[[i]])
expect_equal(output, names(signals)[[i]], info = names(signals)[[i]])
}
})

Expand Down Expand Up @@ -53,3 +54,4 @@ test_that("sending signal in Windows", {
expect_equal(process_wait(handle, TIMEOUT_INFINITE), 1)
expect_false(process_exists(handle))
})

4 changes: 2 additions & 2 deletions tests/testthat/test-termination.R
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ test_that("child process is terminated in Windows", {
#
# This test will, however, fail in plain R if termination_mode is
# set to "child_only".
test_that("child process is terminated in Linux", {
skip_if_not(is_linux())
test_that("child process is terminated in Linux/MacOS/SunOS", {
skip_if_not(is_linux() || is_mac() || is_solaris())

# the parent shell script will start "sleep" and print its PID
shell <- Sys.getenv("SHELL", '/bin/sh')
Expand Down
3 changes: 2 additions & 1 deletion tests/testthat/test-utf8.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ test_that("C tests pass", {


test_that("multi-byte can come in parts", {
skip_if_not(is_linux() || is_mac())
skip_if_not(is_linux() || is_mac() || is_solaris())
skip_if_not(l10n_info()$MBCS)

print_in_R <- function (handle, text) {
process_write(handle, paste0("cat('", text, "')\n"))
Expand Down

0 comments on commit 2b5d32a

Please sign in to comment.