Skip to content

Commit

Permalink
Support writing to stdout
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborcsardi committed Oct 2, 2024
1 parent 495c0c1 commit e4e3ace
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 4 deletions.
5 changes: 4 additions & 1 deletion R/write-parquet.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
#' @param x Data frame to write.
#' @param file Path to the output file. If this is the string `":raw:"`,
#' then the data frame is written to a memory buffer, and the memory
#' buffer is returned as a raw vector.
#' buffer is returned as a raw vector. If `":stdout:"`, then it is
#' written to the standard output. (When writing to the standard output,
#' special care is needed to make sure no regular R output gets mixed
#' up with the Parquet bytes!)
#' @param schema Parquet schema. Specify a schema to tweak the default
#' nanoparquet R -> Parquet type mappings. Use [parquet_schema()] to
#' create a schema that you can use here, or [read_parquet_schema()] to
Expand Down
5 changes: 4 additions & 1 deletion man/write_parquet.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/lib/memstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <memory>
#include <cstring>
#include <vector>
#include <unistd.h>

// A growing buffer that can be used as an output stream.
// It stores that data in an array of buffers, each bigger than
Expand Down
10 changes: 8 additions & 2 deletions src/write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2926,7 +2926,7 @@ SEXP nanoparquet_write_(SEXP dfsxp, SEXP filesxp, SEXP dim, SEXP compression,
}

std::string fname = (char *)CHAR(STRING_ELT(filesxp, 0));
if (fname == ":raw:") {
if (fname == ":raw:" || fname == ":stdout:") {
MemStream ms;
std::ostream &os = ms.stream();
RParquetOutFile of(os, codec, comp_level, row_groups);
Expand All @@ -2935,7 +2935,13 @@ SEXP nanoparquet_write_(SEXP dfsxp, SEXP filesxp, SEXP dim, SEXP compression,
R_xlen_t bufsize = ms.size();
SEXP res = Rf_allocVector(RAWSXP, bufsize);
ms.copy(RAW(res), bufsize);
return res;
if (fname == ":raw:") {
return res;
} else {
std::cout.write((const char*) RAW(res), bufsize);
std::cout << std::flush;
return R_NilValue;
}
} else {
RParquetOutFile of(fname, codec, comp_level, row_groups);
of.data_page_version = dp_ver;
Expand Down
8 changes: 8 additions & 0 deletions tests/testthat/helper.R
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,11 @@ test_write <- function(d, schema = NULL, encoding = NULL) {
as.data.frame(read_parquet(tmp))
})
}

rscript <- function() {
if (.Platform$OS.type == "windows") {
file.path(R.home("bin"), "Rscript.exe")
} else {
file.path(R.home("bin"), "Rscript")
}
}
20 changes: 20 additions & 0 deletions tests/testthat/test-write-parquet-2.R
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,26 @@ test_that("write_parquet() to memory 2", {
expect_equal(r1, r2)
})

test_that("write_parquet() to stdout", {
skip_on_cran()
tmp1 <- tempfile(fileext = ".parquet")
tmp2 <- tempfile(fileext = ".parquet")
script <- tempfile(fileext = ".R")
on.exit(unlink(c(tmp1, tmp2, script)), add = TRUE)

txt <- c(
if (!is_rcmd_check()) "pkgload::load_all()",
"nanoparquet::write_parquet(mtcars, \":stdout:\")"
)
writeLines(txt, script)
processx::run(rscript(), script, stdout = tmp1, stderr = NULL)
r1 <- readBin(tmp1, "raw", file.size(tmp1))
write_parquet(mtcars, tmp2)
r2 <- readBin(tmp2, "raw", file.size(tmp2))
expect_equal(file.size(tmp1), file.size(tmp2))
expect_equal(r1, r2)
})

test_that("gzip compression", {
d <- test_df(missing = TRUE)
tmp <- tempfile(fileext = ".parquet")
Expand Down

0 comments on commit e4e3ace

Please sign in to comment.