Skip to content

Commit

Permalink
Merge pull request #91 from r-lib/feature/compression-level
Browse files Browse the repository at this point in the history
Support compression levels in `write_parquet()`
  • Loading branch information
gaborcsardi authored Sep 14, 2024
2 parents 94d536f + e312d98 commit d6f9984
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 15 deletions.
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
* `write_parquet(file = ":raw:")` now works correctly for larger data
frames (#77).

* New `compression_level` option to select the compression level
manually. See `?parquet_options` for details. (#91).

* `read_parquet()` can now read from an R connection (#71).

* `read_parquet()` now reads `DECIMAL` values correctly from `INT32`
Expand Down
22 changes: 22 additions & 0 deletions R/options.R
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@
#' in [read_parquet()]. By default nanoparquet adds the `"tbl"` class,
#' so data frames are printed differently if the pillar package is
#' loaded.
#' @param compression_level The compression level in [write_parquet()].
#' `NA` is the default, and it specifies the default
#' compression level of each method. `Inf` always selects the highest
#' possible compression level. More details:
#' * Snappy does not support compression levels currently.
#' * GZIP supports levels from 0 (uncompressed), 1 (fastest), to 9 (best).
#' The default is 6.
#' * ZSTD allows positive levels up to 22 currently. 20 and above require
#' more memory. Negative levels are also allowed, the lower the level,
#' the faster the speed, at the cost of compression. Currently the
#' smallest level is -131072. The default level is 3.
#' @param num_rows_per_row_group The number of rows to put into a row
#' group, if row groups are not specified explicitly. It should be
#' an integer scalar. Defaults to 10 million.
Expand Down Expand Up @@ -40,6 +51,7 @@

parquet_options <- function(
class = getOption("nanoparquet.class", "tbl"),
compression_level = getOption("nanoparquet.compression_level", NA_integer_),
num_rows_per_row_group = getOption("nanoparquet.num_rows_per_row_group", 10000000L),
use_arrow_metadata = getOption("nanoparquet.use_arrow_metadata", TRUE),
write_arrow_metadata = getOption("nanoparquet.write_arrow_metadata", TRUE),
Expand All @@ -58,9 +70,19 @@ parquet_options <- function(
num_rows_per_row_group,
"num_rows_per_row_group"
)
if (identical(compression_level, Inf)) {
compression_level <- 100000L
} else if (identical(compression_level, NA) ||
identical(compression_level, NA_integer_) ||
identical(compression_level, NA_real_)) {
compression_level <- NA_integer_
} else {
compression_level <- as_integer_scalar(compression_level, "compression_level")
}

list(
class = class,
compression_level = compression_level,
num_rows_per_row_group = num_rows_per_row_group,
use_arrow_metadata = use_arrow_metadata,
write_arrow_metadata = write_arrow_metadata,
Expand Down
23 changes: 16 additions & 7 deletions R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,30 @@ is_string <- function(x) {
is.character(x) && length(x) == 1 && !is.na(x)
}

is_icount <- function(x) {
is.integer(x) && length(x) == 1 && !is.na(x) && x >= 1L
is_icount <- function(x, zero = FALSE) {
is.integer(x) && length(x) == 1 && !is.na(x) &&
((zero && x >= 0L) || (!zero && x >= 1L))
}

is_dcount <- function(x) {
is_dcount <- function(x, zero = FALSE) {
is.double(x) && length(x) == 1 && !is.na(x) && as.integer(x) == x &&
x >= 1
((zero && x >= 0) || (!zero && x >= 1))
}

as_count <- function(x, name = "x") {
if (is_icount(x)) return(x)
if (is_dcount(x)) return(as.integer(x))
as_count <- function(x, name = "x", zero = FALSE) {
if (is_icount(x, zero)) return(x)
if (is_dcount(x, zero)) return(as.integer(x))
stop(name, " must be a count, i.e. an integer scalar")
}

as_integer_scalar <- function(x, name = "x") {
if (is.integer(x) && length(x) == 1 && !is.na(x)) return(x)
if (is.double(x) && length(x) == 1 && !is.na(x) && as.integer(x) == x) {
return(as.integer(x))
}
stop(name, " must be an integer scalar")
}

is_uint32 <- function(x) {
is.numeric(x) && length(x) == 1 && !is.na(x) &&
round(x) == x && x >= 0 && x <= 4294967295
Expand Down
10 changes: 10 additions & 0 deletions R/write-parquet.R
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,18 @@ write_parquet <- function(
options = parquet_options()) {

file <- path.expand(file)

codecs <- c("uncompressed" = 0L, "snappy" = 1L, "gzip" = 2L, "zstd" = 6L)
compression <- codecs[match.arg(compression)]
if (is.na(options[["compression_level"]])) {
# -1 is an allowed value for zstd, so we set the default here
if (compression == "zstd") {
options[["compression_level"]] <- 3L
} else {
options[["compression_level"]] <- -1L
}
}

dim <- as.integer(dim(x))

schema <- map_schema_to_df(schema, x, options)
Expand Down
15 changes: 15 additions & 0 deletions man/parquet_options.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 24 additions & 3 deletions src/lib/ParquetOutFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ static string type_to_string(Type::type t) {
ParquetOutFile::ParquetOutFile(
std::string filename,
parquet::CompressionCodec::type codec,
int compression_level,
vector<int64_t> &row_group_starts) :
pfile(pfile_), num_rows(0), num_cols(0), num_rows_set(false),
codec(codec), row_group_starts(row_group_starts),
codec(codec), compression_level(compression_level),
row_group_starts(row_group_starts),
mem_buffer(new TMemoryBuffer(1024 * 1024)), // 1MB, what if not enough?
tproto(tproto_factory.getProtocol(mem_buffer)) {

Expand All @@ -54,9 +56,11 @@ ParquetOutFile::ParquetOutFile(
ParquetOutFile::ParquetOutFile(
std::ostream &stream,
parquet::CompressionCodec::type codec,
int compression_level,
vector<int64_t> &row_group_starts) :
pfile(stream), num_rows(0), num_cols(0), num_rows_set(false),
codec(codec), row_group_starts(row_group_starts),
codec(codec), compression_level(compression_level),
row_group_starts(row_group_starts),
mem_buffer(new TMemoryBuffer(1024 * 1024)), // 1MB, what if not enough?
tproto(tproto_factory.getProtocol(mem_buffer)) {

Expand Down Expand Up @@ -408,6 +412,13 @@ size_t ParquetOutFile::compress(

} else if (codec == CompressionCodec::GZIP) {
miniz::MiniZStream mzs;
if (compression_level < 0) {
mzs.compression_level = miniz::MZ_DEFAULT_LEVEL;
} else if (compression_level >= 9) {
mzs.compression_level = 9;
} else {
mzs.compression_level = compression_level;
}
size_t tgt_size_est = mzs.MaxCompressedLength(src_size - skip);
tgt.reset(tgt_size_est + skip);
if (skip > 0) memcpy(tgt.ptr, src.ptr, skip);
Expand All @@ -420,12 +431,22 @@ size_t ParquetOutFile::compress(
size_t tgt_size_est = zstd::ZSTD_compressBound(src_size - skip);
tgt.reset(tgt_size_est);
if (skip > 0) memcpy(tgt.ptr, src.ptr, skip);
int level;
int minlevel = zstd::ZSTD_minCLevel();
int maxlevel = zstd::ZSTD_maxCLevel();
if (compression_level < minlevel) {
level = minlevel;
} else if (compression_level > maxlevel) {
level = maxlevel;
} else {
level = compression_level;
}
size_t tgt_size = zstd::ZSTD_compress(
tgt.ptr + skip,
tgt_size_est,
src.ptr + skip,
src_size - skip,
ZSTD_CLEVEL_DEFAULT
level
);
if (zstd::ZSTD_isError(tgt_size)) {
std::stringstream ss;
Expand Down
3 changes: 3 additions & 0 deletions src/lib/ParquetOutFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ class ParquetOutFile {
ParquetOutFile(
std::string filename,
parquet::CompressionCodec::type codec,
int compression_level,
std::vector<int64_t> &row_group_starts
);
ParquetOutFile(
std::ostream &stream,
parquet::CompressionCodec::type codec,
int compression_level,
std::vector<int64_t> &row_group_starts
);
void set_num_rows(uint32_t nr);
Expand Down Expand Up @@ -93,6 +95,7 @@ class ParquetOutFile {
uint32_t num_rows, num_cols;
bool num_rows_set;
parquet::CompressionCodec::type codec;
int compression_level;

std::vector<parquet::Encoding::type> encodings;
std::vector<parquet::SchemaElement> schemas;
Expand Down
4 changes: 3 additions & 1 deletion src/miniz/miniz_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ struct MiniZStream {
break;
}
}
int compression_level = miniz::MZ_DEFAULT_LEVEL;

void FormatException(std::string error_msg) {
throw std::runtime_error(error_msg);
}
Expand Down Expand Up @@ -104,7 +106,7 @@ struct MiniZStream {
}

void Compress(const char *uncompressed_data, size_t uncompressed_size, char *out_data, size_t *out_size) {
auto mz_ret = mz_deflateInit2(&stream, miniz::MZ_DEFAULT_LEVEL, MZ_DEFLATED, -MZ_DEFAULT_WINDOW_BITS, 1, 0);
auto mz_ret = mz_deflateInit2(&stream, compression_level, MZ_DEFLATED, -MZ_DEFAULT_WINDOW_BITS, 1, 0);
if (mz_ret != miniz::MZ_OK) {
FormatException("Failed to initialize miniz", mz_ret);
}
Expand Down
13 changes: 9 additions & 4 deletions src/write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,13 @@ class RParquetOutFile : public ParquetOutFile {
RParquetOutFile(
std::string filename,
parquet::CompressionCodec::type codec,
int compression_level,
std::vector<int64_t> &row_groups
);
RParquetOutFile(
std::ostream &stream,
parquet::CompressionCodec::type codec,
int compsession_level,
std::vector<int64_t> &row_groups
);
void write_int32(std::ostream &file, uint32_t idx, uint64_t from,
Expand Down Expand Up @@ -173,15 +175,17 @@ class RParquetOutFile : public ParquetOutFile {
RParquetOutFile::RParquetOutFile(
std::string filename,
parquet::CompressionCodec::type codec,
int compression_level,
std::vector<int64_t> &row_groups) :
ParquetOutFile(filename, codec, row_groups) {
ParquetOutFile(filename, codec, compression_level, row_groups) {
}

RParquetOutFile::RParquetOutFile(
std::ostream &stream,
parquet::CompressionCodec::type codec,
int compression_level,
std::vector<int64_t> &row_groups) :
ParquetOutFile(stream, codec, row_groups) {
ParquetOutFile(stream, codec, compression_level, row_groups) {
}

void RParquetOutFile::create_dictionary(uint32_t idx, int64_t from,
Expand Down Expand Up @@ -2519,6 +2523,7 @@ SEXP nanoparquet_write_(SEXP dfsxp, SEXP filesxp, SEXP dim, SEXP compression,
}

int dp_ver = INTEGER(get_list_element(options, "write_data_page_version"))[0];
int comp_level = INTEGER(get_list_element(options, "compression_level"))[0];

R_xlen_t nrg = Rf_xlength(row_group_starts);
std::vector<int64_t> row_groups(nrg);
Expand All @@ -2531,15 +2536,15 @@ SEXP nanoparquet_write_(SEXP dfsxp, SEXP filesxp, SEXP dim, SEXP compression,
if (fname == ":raw:") {
MemStream ms;
std::ostream &os = ms.stream();
RParquetOutFile of(os, codec, row_groups);
RParquetOutFile of(os, codec, comp_level, row_groups);
of.data_page_version = dp_ver;
of.write(dfsxp, dim, metadata, required, options, schema, encoding);
R_xlen_t bufsize = ms.size();
SEXP res = Rf_allocVector(RAWSXP, bufsize);
ms.copy(RAW(res), bufsize);
return res;
} else {
RParquetOutFile of(fname, codec, row_groups);
RParquetOutFile of(fname, codec, comp_level, row_groups);
of.data_page_version = dp_ver;
of.write(dfsxp, dim, metadata, required, options, schema, encoding);
return R_NilValue;
Expand Down
21 changes: 21 additions & 0 deletions tests/testthat/test-write-parquet-compression.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
test_that("gzip compression levels", {
tmp <- tempfile(fileext = ".parquet")
on.exit(unlink(tmp), add = TRUE)

df <- test_df()
for (level in c(NA_integer_, 0:11)) {
write_parquet(df, tmp, compression = "gzip", options = parquet_options(compression_level = level))
expect_equal(as.data.frame(read_parquet(tmp)), as.data.frame(df))
}
})

test_that("zstd compression levels", {
tmp <- tempfile(fileext = ".parquet")
on.exit(unlink(tmp), add = TRUE)

df <- test_df()
for (level in c(NA_integer_, (-22):(22))) {
write_parquet(df, tmp, compression = "zstd", options = parquet_options(compression_level = level))
expect_equal(as.data.frame(read_parquet(tmp)), as.data.frame(df))
}
})

0 comments on commit d6f9984

Please sign in to comment.