---
output:
  md_document:
    variant: gfm
---
```{r, include=FALSE}
knitr::opts_chunk$set(cache = TRUE)
# knitr hook function to allow an output.lines option
# e.g.,
# output.lines=12 prints lines 1:12 ...
# output.lines=1:12 does the same
# output.lines=3:15 prints lines ... 3:15 ...
# output.lines=-(1:8) removes lines 1:8 and prints ... 9:n ...
# No allowance for anything but a consecutive range of lines
#
# adapted from https://stackoverflow.com/a/23205752
create_output_hook <- function(type) {
  hook_output <- knitr::knit_hooks$get(type)
  function(x, options) {
    lines <- options$output.lines
    if (is.null(lines)) {
      return(hook_output(x, options)) # pass to default hook
    }
    x <- unlist(strsplit(x, "\n"))
    more <- "..."
    if (length(lines) == 1) { # first n lines
      if (length(x) > lines) {
        # truncate the output, but add ...
        x <- c(head(x, lines), more)
      }
    } else {
      x <- c(if (abs(lines[1]) > 1) more else NULL,
             x[lines],
             if (length(x) > lines[abs(length(lines))]) more else NULL
      )
    }
    # paste these lines together
    x <- paste(c(x, ""), collapse = "\n")
    hook_output(x, options)
  }
}
knitr::knit_hooks$set(output = create_output_hook("output"))
knitr::knit_hooks$set(error = create_output_hook("error"))
knitr::knit_hooks$set(warning = create_output_hook("warning"))
knitr::knit_hooks$set(message = create_output_hook("message"))
```
# bettermc
[![CRAN version](https://www.r-pkg.org/badges/version/bettermc)](https://cran.r-project.org/package=bettermc)
[![CRAN status](https://cranchecks.info/badges/flavor/release/bettermc)](https://cran.r-project.org/web/checks/check_results_bettermc.html)
[![downloads](https://cranlogs.r-pkg.org/badges/grand-total/bettermc)](https://cran.r-project.org/package=bettermc)
[![R build status](https://github.com/gfkse/bettermc/workflows/R-CMD-check/badge.svg)](https://github.com/gfkse/bettermc/actions?workflow=R-CMD-check)
[![codecov](https://codecov.io/gh/gfkse/bettermc/branch/master/graph/badge.svg?token=FYYM156COF)](https://codecov.io/gh/gfkse/bettermc)
[![rchk](https://github.com/gfkse/bettermc/workflows/rchk/badge.svg)](https://github.com/gfkse/bettermc/actions?workflow=rchk)
The `bettermc` package provides a wrapper around the `parallel::mclapply` function for better performance, error handling, seeding and UX.
## Installation of the Development Version
```{r, eval = FALSE}
# install.packages("devtools")
devtools::install_github("gfkse/bettermc")
```
## Supported Platforms
`bettermc` was originally developed for 64-bit Linux.
By now it should also compile and run on 32-bit systems, and on macOS and Solaris.
However, as stated in the respective help pages, not all features are supported on macOS.
Porting to other POSIX-compliant Unix flavors should be fairly straightforward.
Windows support is very limited and provided mainly for compatibility, i.e. to allow the *serial* execution of code that uses `bettermc::mclapply` and was originally developed for Linux or macOS.
## Features
Here is a short overview of the main features.
### Progress Bar
![progress bar](progress.png)
### Error Handling, Tracebacks and Crashdumps
By default, crashdumps and full tracebacks are generated on errors in child processes:
```{r traceback, error=TRUE, output.lines=10}
g <- function(x) x + 1
f <- function(x) g(as.character(x))
bettermc::mclapply(1:2, f)
```
```{r crashdump}
# in a non-interactive session a file "last.dump.rds" is created
last.dump <- readRDS("last.dump.rds")
# in an interactive session use debugger() instead of print() for actual debugging
print(attr(last.dump[[1L]], "dump.frames"))
```
As shown in the example above, `bettermc` by default fails if there are errors in child processes.
This behavior can be changed to merely warn about both fatal and non-fatal errors:
```{r allow_errors, output.lines=10}
ret <- bettermc::mclapply(1:4, function(i) {
  if (i == 1L)
    stop(i)
  else if (i == 4L)
    system(paste0("kill ", Sys.getpid()))
  NULL
}, mc.allow.fatal = TRUE, mc.allow.error = TRUE, mc.preschedule = FALSE)
```
In this case, too, full tracebacks and crashdumps are available:
```{r crashdump_allow_errors}
stopifnot(inherits(ret[[1]], "try-error"))
names(attributes(ret[[1L]]))
```
Additionally, results affected by fatal errors are clearly indicated and can be differentiated from legitimate `NULL` values:
```{r fatal-error}
lapply(ret, class)
```
Use `mc.allow.fatal = NULL` to return `NULL` on fatal errors instead, as `parallel::mclapply` does.
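To give an intuition for how a traceback can travel together with a `try-error` result, here is a minimal base R sketch. It is not `bettermc`'s implementation: `capture_error()` and the `"calls"` attribute are hypothetical names chosen for illustration, and the sketch assumes that recording `sys.calls()` in a calling handler is a reasonable stand-in for the package's crashdump mechanism.

```r
# Hypothetical helper, not part of bettermc: evaluate `expr` and, on error,
# return a "try-error" object that carries the call stack as an attribute.
capture_error <- function(expr) {
  calls <- NULL
  res <- try(
    withCallingHandlers(
      expr,
      # a calling handler runs while the failing frames are still on the stack
      error = function(e) {
        calls <<- sys.calls()
      }
    ),
    silent = TRUE
  )
  if (inherits(res, "try-error")) attr(res, "calls") <- calls
  res
}

g <- function(x) x + 1
f <- function(x) g(as.character(x))
res <- capture_error(f(1))

# deparse the recorded calls to get a readable traceback
tb <- vapply(attr(res, "calls"), function(cl) deparse(cl)[1], character(1))
print(tb)
```

Because the handler runs before the stack is unwound, the recorded calls include both `f(1)` and `g(as.character(x))`, which is the information a plain `try()` result does not carry.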
### Output, Messages and Warnings
In contrast to `parallel::mclapply`, neither output nor messages nor warnings from the child processes are lost.
All of these can be forwarded to the parent process and are prefixed with the index of the function invocation from which they originate:
```{r output}
f <- function(i) {
  if (i == 1) message(letters[i])
  else if (i == 2) warning(letters[i])
  else print(letters[i])
  i
}
ret <- bettermc::mclapply(1:4, f)
```
Similarly, other conditions can also be re-signaled in the parent process.
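The idea of catching conditions and tagging them with the index of the originating call can be sketched in base R with `withCallingHandlers()`. This is only an illustration under simplifying assumptions: it runs serially, `collect_conditions()` is a hypothetical helper, and the `"<index>: <text>"` format is made up rather than taken from `bettermc`.

```r
# Hypothetical helper, not part of bettermc: run FUN over X, intercept
# messages and warnings, and record them prefixed with the element index.
collect_conditions <- function(X, FUN) {
  forwarded <- character()
  record <- function(k, cond) {
    text <- sub("\n$", "", conditionMessage(cond))  # message() appends "\n"
    forwarded <<- c(forwarded, sprintf("%d: %s", k, text))
  }
  value <- lapply(seq_along(X), function(k) {
    withCallingHandlers(
      FUN(X[[k]]),
      message = function(m) {
        record(k, m)
        invokeRestart("muffleMessage")  # resume FUN after recording
      },
      warning = function(w) {
        record(k, w)
        invokeRestart("muffleWarning")
      }
    )
  })
  list(value = value, forwarded = forwarded)
}

f <- function(i) {
  if (i == 1) message(letters[i])
  else if (i == 2) warning(letters[i])
  else print(letters[i])
  i
}
out <- collect_conditions(1:3, f)
out$forwarded
```

Muffling the condition after recording it lets the function continue and return its value, so the conditions are collected without affecting the results.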
### Reproducible Seeding
By default, `bettermc` reproducibly seeds all function calls:
```{r seeding}
set.seed(538)
a <- bettermc::mclapply(1:4, function(i) runif(1), mc.cores = 3)
set.seed(538)
b <- bettermc::mclapply(1:4, function(i) runif(1), mc.cores = 1)
a
stopifnot(identical(a, b))
```
Compare with `parallel`:
```{r seeding_parallel, error=TRUE}
set.seed(594)
a <- parallel::mclapply(1:4, function(i) runif(1), mc.cores = 3)
set.seed(594)
b <- parallel::mclapply(1:4, function(i) runif(1), mc.cores = 3)
stopifnot(identical(a, b))
```
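One way to see why seeding each function call, rather than each worker process, makes results independent of scheduling is the following serial sketch. It assumes a deliberately naive seed derivation (`seed + k`) and a hypothetical helper `seeded_lapply()`; it does not reproduce how `bettermc` actually derives its seeds.

```r
# Hypothetical helper, not part of bettermc: seed the RNG separately for
# every element, so the result does not depend on processing order.
seeded_lapply <- function(X, FUN, seed, order = seq_along(X)) {
  res <- vector("list", length(X))
  for (k in order) {
    set.seed(seed + k)  # naive per-call seed, for illustration only
    res[[k]] <- FUN(X[[k]])
  }
  res
}

# same results whether elements are processed front to back or back to front
a <- seeded_lapply(1:4, function(i) runif(1), seed = 538)
b <- seeded_lapply(1:4, function(i) runif(1), seed = 538, order = 4:1)
stopifnot(identical(a, b))
```

With a single seed per worker instead, the value drawn for an element would depend on which other elements the same worker handled before it.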
### POSIX Shared Memory
Many types of objects can be returned from the child processes using POSIX shared memory.
This includes e.g. logical, numeric, complex and raw vectors and arrays as well as factors.
In doing so, the overhead of getting larger results back into the parent R process is reduced:
```{r shm_performance}
X <- data.frame(
  x = runif(3e7),
  y = sample(c(TRUE, FALSE), 3e7, TRUE),
  z = 1:3e7
)
f <- function(i) X
microbenchmark::microbenchmark(
  bettermc1 = bettermc::mclapply(1:2, f, mc.share.copy = FALSE),
  bettermc2 = bettermc::mclapply(1:2, f),
  bettermc3 = bettermc::mclapply(1:2, f, mc.share.vectors = FALSE),
  bettermc4 = bettermc::mclapply(1:2, f, mc.share.vectors = FALSE, mc.shm.ipc = FALSE),
  parallel = parallel::mclapply(1:2, f),
  times = 10, setup = gc()
)
```
In examples `bettermc1` and `bettermc2`, the child processes place the columns of the return value `X` in shared memory.
The object which needs to be serialized for transfer from child to parent processes hence becomes:
```{r vectors2shm}
X_shm <- bettermc:::vectors2shm(X, name_prefix = "/bettermc_README_")
str(X_shm)
```
Column `z` is an `ALTREP` and, because it can be serialized efficiently, is left alone by default.
The parent process can recover the original object from `X_shm`:
```{r shm2vectors}
Y <- bettermc:::shm2vectors(X_shm)
stopifnot(identical(X, Y))
```
The internal functions `vectors2shm()` and `shm2vectors()` recursively walk the return value and apply the exported functions `copy2shm()` and `allocate_from_shm()`, respectively.
In `bettermc1`, the shared memory objects are used directly by the parent process.
In `bettermc2`, which is the default, new vectors are allocated in the parent process and the data is merely copied from the shared memory objects, which are freed afterwards. See `?copy2shm` for more details on this topic and why the slower `mc.share.copy = TRUE` might be a sensible default.
In `bettermc3`, the original `X` is serialized and the resulting raw vector is placed in shared memory, from where it is deserialized in the parent process.
`bettermc4` does not involve any POSIX shared memory and hence is equivalent to `parallel`, i.e. the original `X` is serialized and transferred to the parent process using pipes.
### Character Compression
In practice, character vectors often contain a substantial amount of duplicated values.
This is exploited by `bettermc` to speed up the returning of larger character vectors from child processes:
```{r char_compression}
X <- rep(as.character(runif(1e6)), 30)
f <- function(i) X
microbenchmark::microbenchmark(
  bettermc1 = bettermc::mclapply(1:2, f),
  parallel = parallel::mclapply(1:2, f),
  times = 1, setup = gc()
)
```
By default, `bettermc` replaces character vectors with objects of type `char_map` before returning them to the parent process:
```{r compress_chars}
X_comp <- bettermc::compress_chars(X)
str(X_comp)
```
The important detail here is the length of the `chars` vector, which just contains the unique elements of `X` and hence is significantly faster to (de)serialize than the original vector. The parent process can recover the original character vectors:
```{r uncompress_chars}
Y <- bettermc::uncompress_chars(X_comp)
stopifnot(identical(X, Y))
```
The functions `compress_chars()` and `uncompress_chars()` recursively walk the return value and apply the functions `char_map()` and `map2char()`, respectively.
`char_map()` is implemented using a radix sort, which makes it very efficient:
```{r char_map}
microbenchmark::microbenchmark(
  char_map = bettermc::char_map(X),
  match = {chars <- unique(X); idx <- match(X, chars)},
  char_map = bettermc::char_map(X),
  times = 3, setup = gc()
)
```
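The compression idea itself can be reproduced with the base R `unique()`/`match()` pair used as the baseline above. The sketch below uses a small toy input and makes no claim about the internal layout of a `char_map` object; it only shows that unique strings plus an integer index are enough to restore the vector and that they serialize to fewer bytes.

```r
# Toy input: 1000 distinct strings, each repeated 30 times
X <- rep(as.character(1:1000), 30)

# "compress": keep every distinct string once plus an integer index into them
chars <- unique(X)
idx <- match(X, chars)

# "uncompress": look the strings up again
Y <- chars[idx]
stopifnot(identical(X, Y))

# compare the sizes of the serialized representations (in bytes)
size_plain <- length(serialize(X, NULL))
size_mapped <- length(serialize(list(chars = chars, idx = idx), NULL))
c(plain = size_plain, mapped = size_mapped)
```

The saving grows with the share of duplicated values; for a vector of entirely distinct strings the index is pure overhead.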
### Retries
`bettermc` supports automatic retries on both fatal and non-fatal errors.
`mc.force.fork` ensures that `FUN` is called in a child process, even if `X` is of length 1.
This is useful if `FUN` might encounter a fatal error and we want to protect the parent process against it.
With retries, `length(X)` might drop to 1 if all other values could already be processed.
This is also why we need `mc.force.fork` in the following example:
```{r retry, output.lines=10}
set.seed(456)
res <-
  bettermc::mclapply(1:20, function(i) {
    r <- runif(1)
    if (r < 0.25)
      system(paste0("kill ", Sys.getpid()))
    else if (r < 0.5)
      stop(i)
    else
      i
  }, mc.retry = 50, mc.cores = 10, mc.force.fork = TRUE)
stopifnot(identical(res, as.list(1:20)))
```
Additionally, it is possible to automatically decrease the number of cores with every retry by specifying a negative value for `mc.retry`.
This is useful if we expect failures to be caused simply by too many concurrent processes, e.g. if system load or the size of input data is unpredictable and might lead to the Linux Out Of Memory Killer stepping in.
In such a case it makes sense to retry using fewer concurrent processes:
```{r neg_retry}
ppid <- Sys.getpid()
res <-
  bettermc::mclapply(1:20, function(i) {
    Sys.sleep(0.25) # wait for the other child processes
    number_of_child_processes <- length(system(paste0("pgrep -P ", ppid), intern = TRUE))
    if (number_of_child_processes >= 5) system(paste0("kill ", Sys.getpid()))
    i
  }, mc.retry = -3, mc.cores = 10, mc.force.fork = TRUE)
stopifnot(identical(res, as.list(1:20)))
```
If errors remain after all retries, the call fails as usual:
```{r retry_failing, error=TRUE, output.lines=10}
set.seed(123)
res <-
  bettermc::mclapply(1:20, function(i) {
    r <- runif(1)
    if (r < 0.25)
      system(paste0("kill ", Sys.getpid()))
    else if (r < 0.5)
      stop(i)
    else
      i
  }, mc.retry = 1, mc.cores = 10, mc.force.fork = TRUE)
```
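The retry behavior described in this section can be pictured as a loop that re-runs only the elements that have not yet succeeded. The following serial sketch is an assumption-laden illustration, not the package's code: `retry_lapply()` and `flaky()` are hypothetical, only non-fatal errors are covered, and neither forking nor the reduction of cores for negative `mc.retry` values is modeled.

```r
# Hypothetical helper, not part of bettermc: retry failed elements up to
# `retries` times and fail if any element still errors afterwards.
retry_lapply <- function(X, FUN, retries = 3L) {
  res <- vector("list", length(X))
  todo <- seq_along(X)
  for (attempt in 0:retries) {
    out <- lapply(X[todo], function(x) try(FUN(x), silent = TRUE))
    ok <- !vapply(out, inherits, logical(1), what = "try-error")
    res[todo[ok]] <- out[ok]  # keep successful results
    todo <- todo[!ok]         # only failed elements are retried
    if (length(todo) == 0L) break
  }
  if (length(todo) > 0L) {
    stop("still failing after retries: ", paste(todo, collapse = ", "))
  }
  res
}

# element i fails on its first i - 1 calls and succeeds afterwards
calls <- integer(3)
flaky <- function(i) {
  calls[i] <<- calls[i] + 1L
  if (calls[i] < i) stop("transient failure for ", i)
  i * 10
}
res <- retry_lapply(1:3, flaky, retries = 2L)
stopifnot(identical(res, list(10, 20, 30)))
```

Note how the set of pending elements shrinks with every attempt; in the parallel setting this is the situation in which `length(X)` can drop to 1.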