diff --git a/NEWS.md b/NEWS.md index af0f771..cd1148c 100644 --- a/NEWS.md +++ b/NEWS.md @@ -53,6 +53,9 @@ * `write_parquet(file = ":raw:")` now works correctly for larger data frames (#77). +* New `compression_level` option to select the compression level + manually. See `?parquet_options` for details. (#91). + * `read_parquet()` can now read from an R connection (#71). * `read_parquet()` now reads `DECIMAL` values correctly from `INT32` diff --git a/R/options.R b/R/options.R index bff9c57..275d17c 100644 --- a/R/options.R +++ b/R/options.R @@ -5,6 +5,17 @@ #' in [read_parquet()]. By default nanoparquet adds the `"tbl"` class, #' so data frames are printed differently if the pillar package is #' loaded. +#' @param compression_level The compression level in [write_parquet()]. +#' `NA` is the default, and it specifies the default +#' compression level of each method. `Inf` always selects the highest +#' possible compression level. More details: +#' * Snappy does not support compression levels currently. +#' * GZIP supports levels from 0 (uncompressed), 1 (fastest), to 9 (best). +#' The default is 6. +#' * ZSTD allows positive levels up to 22 currently. 20 and above require +#' more memory. Negative levels are also allowed, the lower the level, +#' the faster the speed, at the cost of compression. Currently the +#' smallest level is -131072. The default level is 3. #' @param num_rows_per_row_group The number of rows to put into a row #' group, if row groups are not specified explicitly. It should be #' an integer scalar. Defaults to 10 million. @@ -40,6 +51,7 @@ parquet_options <- function( class = getOption("nanoparquet.class", "tbl"), + compression_level = getOption("nanoparquet.compression_level", NA_integer_), num_rows_per_row_group = getOption("nanoparquet.num_rows_per_row_group", 10000000L), use_arrow_metadata = getOption("nanoparquet.use_arrow_metadata", TRUE), write_arrow_metadata = getOption("nanoparquet.write_arrow_metadata", TRUE), @@ -58,9 +70,19 @@ parquet_options <- function( num_rows_per_row_group, "num_rows_per_row_group" ) + if (identical(compression_level, Inf)) { + compression_level <- 100000L + } else if (identical(compression_level, NA) || + identical(compression_level, NA_integer_) || + identical(compression_level, NA_real_)) { + compression_level <- NA_integer_ + } else { + compression_level <- as_integer_scalar(compression_level, "compression_level") + } list( class = class, + compression_level = compression_level, num_rows_per_row_group = num_rows_per_row_group, use_arrow_metadata = use_arrow_metadata, write_arrow_metadata = write_arrow_metadata, diff --git a/R/utils.R b/R/utils.R index 6b7bb20..af2be45 100644 --- a/R/utils.R +++ b/R/utils.R @@ -33,21 +33,30 @@ is_string <- function(x) { is.character(x) && length(x) == 1 && !is.na(x) } -is_icount <- function(x) { - is.integer(x) && length(x) == 1 && !is.na(x) && x >= 1L +is_icount <- function(x, zero = FALSE) { + is.integer(x) && length(x) == 1 && !is.na(x) && + ((zero && x >= 0L) || (!zero && x >= 1L)) } -is_dcount <- function(x) { +is_dcount <- function(x, zero = FALSE) { is.double(x) && length(x) == 1 && !is.na(x) && as.integer(x) == x && - x >= 1 + ((zero && x >= 0) || (!zero && x >= 1)) } -as_count <- function(x, name = "x") { - if (is_icount(x)) return(x) - if (is_dcount(x)) return(as.integer(x)) +as_count <- function(x, name = "x", zero = FALSE) { + if (is_icount(x, zero)) return(x) + if (is_dcount(x, zero)) return(as.integer(x)) stop(name, " must be a count, i.e. an integer scalar") } +as_integer_scalar <- function(x, name = "x") { + if (is.integer(x) && length(x) == 1 && !is.na(x)) return(x) + if (is.double(x) && length(x) == 1 && !is.na(x) && as.integer(x) == x) { + return(as.integer(x)) + } + stop(name, " must be an integer scalar") +} + is_uint32 <- function(x) { is.numeric(x) && length(x) == 1 && !is.na(x) && round(x) == x && x >= 0 && x <= 4294967295 diff --git a/R/write-parquet.R b/R/write-parquet.R index 7410656..b60a403 100644 --- a/R/write-parquet.R +++ b/R/write-parquet.R @@ -72,8 +72,18 @@ write_parquet <- function( options = parquet_options()) { file <- path.expand(file) + codecs <- c("uncompressed" = 0L, "snappy" = 1L, "gzip" = 2L, "zstd" = 6L) compression <- codecs[match.arg(compression)] + if (is.na(options[["compression_level"]])) { + # -1 is an allowed value for zstd, so we set the default here + if (compression == "zstd") { + options[["compression_level"]] <- 3L + } else { + options[["compression_level"]] <- -1L + } + } + dim <- as.integer(dim(x)) schema <- map_schema_to_df(schema, x, options) diff --git a/man/parquet_options.Rd b/man/parquet_options.Rd index f79a011..b0c0a1c 100644 --- a/man/parquet_options.Rd +++ b/man/parquet_options.Rd @@ -6,6 +6,7 @@ \usage{ parquet_options( class = getOption("nanoparquet.class", "tbl"), + compression_level = getOption("nanoparquet.compression_level", NA_integer_), num_rows_per_row_group = getOption("nanoparquet.num_rows_per_row_group", 10000000L), use_arrow_metadata = getOption("nanoparquet.use_arrow_metadata", TRUE), write_arrow_metadata = getOption("nanoparquet.write_arrow_metadata", TRUE), @@ -18,6 +19,20 @@ in \code{\link[=read_parquet]{read_parquet()}}. By default nanoparquet adds the so data frames are printed differently if the pillar package is loaded.} +\item{compression_level}{The compression level in \code{\link[=write_parquet]{write_parquet()}}. +\code{NA} is the default, and it specifies the default +compression level of each method. \code{Inf} always selects the highest +possible compression level. More details: +\itemize{ +\item Snappy does not support compression levels currently. +\item GZIP supports levels from 0 (uncompressed), 1 (fastest), to 9 (best). +The default is 6. +\item ZSTD allows positive levels up to 22 currently. 20 and above require +more memory. Negative levels are also allowed, the lower the level, +the faster the speed, at the cost of compression. Currently the +smallest level is -131072. The default level is 3. +}} + \item{num_rows_per_row_group}{The number of rows to put into a row group, if row groups are not specified explicitly. It should be an integer scalar. Defaults to 10 million.} diff --git a/src/lib/ParquetOutFile.cpp b/src/lib/ParquetOutFile.cpp index be9006e..51f9bb3 100644 --- a/src/lib/ParquetOutFile.cpp +++ b/src/lib/ParquetOutFile.cpp @@ -35,9 +35,11 @@ static string type_to_string(Type::type t) { ParquetOutFile::ParquetOutFile( std::string filename, parquet::CompressionCodec::type codec, + int compression_level, vector &row_group_starts) : pfile(pfile_), num_rows(0), num_cols(0), num_rows_set(false), - codec(codec), row_group_starts(row_group_starts), + codec(codec), compression_level(compression_level), + row_group_starts(row_group_starts), mem_buffer(new TMemoryBuffer(1024 * 1024)), // 1MB, what if not enough? tproto(tproto_factory.getProtocol(mem_buffer)) { @@ -54,9 +56,11 @@ ParquetOutFile::ParquetOutFile( ParquetOutFile::ParquetOutFile( std::ostream &stream, parquet::CompressionCodec::type codec, + int compression_level, vector &row_group_starts) : pfile(stream), num_rows(0), num_cols(0), num_rows_set(false), - codec(codec), row_group_starts(row_group_starts), + codec(codec), compression_level(compression_level), + row_group_starts(row_group_starts), mem_buffer(new TMemoryBuffer(1024 * 1024)), // 1MB, what if not enough? tproto(tproto_factory.getProtocol(mem_buffer)) { @@ -408,6 +412,13 @@ size_t ParquetOutFile::compress( } else if (codec == CompressionCodec::GZIP) { miniz::MiniZStream mzs; + if (compression_level < 0) { + mzs.compression_level = miniz::MZ_DEFAULT_LEVEL; + } else if (compression_level >= 9) { + mzs.compression_level = 9; + } else { + mzs.compression_level = compression_level; + } size_t tgt_size_est = mzs.MaxCompressedLength(src_size - skip); tgt.reset(tgt_size_est + skip); if (skip > 0) memcpy(tgt.ptr, src.ptr, skip); @@ -420,12 +431,22 @@ size_t ParquetOutFile::compress( size_t tgt_size_est = zstd::ZSTD_compressBound(src_size - skip); tgt.reset(tgt_size_est); if (skip > 0) memcpy(tgt.ptr, src.ptr, skip); + int level; + int minlevel = zstd::ZSTD_minCLevel(); + int maxlevel = zstd::ZSTD_maxCLevel(); + if (compression_level < minlevel) { + level = minlevel; + } else if (compression_level > maxlevel) { + level = maxlevel; + } else { + level = compression_level; + } size_t tgt_size = zstd::ZSTD_compress( tgt.ptr + skip, tgt_size_est, src.ptr + skip, src_size - skip, - ZSTD_CLEVEL_DEFAULT + level ); if (zstd::ZSTD_isError(tgt_size)) { std::stringstream ss; diff --git a/src/lib/ParquetOutFile.h b/src/lib/ParquetOutFile.h index 494bc6f..4adce01 100644 --- a/src/lib/ParquetOutFile.h +++ b/src/lib/ParquetOutFile.h @@ -16,11 +16,13 @@ class ParquetOutFile { ParquetOutFile( std::string filename, parquet::CompressionCodec::type codec, + int compression_level, std::vector &row_group_starts ); ParquetOutFile( std::ostream &stream, parquet::CompressionCodec::type codec, + int compression_level, std::vector &row_group_starts ); void set_num_rows(uint32_t nr); @@ -93,6 +95,7 @@ class ParquetOutFile { uint32_t num_rows, num_cols; bool num_rows_set; parquet::CompressionCodec::type codec; + int compression_level; std::vector encodings; std::vector schemas; diff --git a/src/miniz/miniz_wrapper.hpp b/src/miniz/miniz_wrapper.hpp index 670b631..f4a1985 100644 --- a/src/miniz/miniz_wrapper.hpp +++ b/src/miniz/miniz_wrapper.hpp @@ -42,6 +42,8 @@ struct MiniZStream { break; } } + int compression_level = miniz::MZ_DEFAULT_LEVEL; + void FormatException(std::string error_msg) { throw std::runtime_error(error_msg); } @@ -104,7 +106,7 @@ struct MiniZStream { } void Compress(const char *uncompressed_data, size_t uncompressed_size, char *out_data, size_t *out_size) { - auto mz_ret = mz_deflateInit2(&stream, miniz::MZ_DEFAULT_LEVEL, MZ_DEFLATED, -MZ_DEFAULT_WINDOW_BITS, 1, 0); + auto mz_ret = mz_deflateInit2(&stream, compression_level, MZ_DEFLATED, -MZ_DEFAULT_WINDOW_BITS, 1, 0); if (mz_ret != miniz::MZ_OK) { FormatException("Failed to initialize miniz", mz_ret); } diff --git a/src/write.cpp b/src/write.cpp index ce3616f..71eb8c9 100644 --- a/src/write.cpp +++ b/src/write.cpp @@ -96,11 +96,13 @@ class RParquetOutFile : public ParquetOutFile { RParquetOutFile( std::string filename, parquet::CompressionCodec::type codec, + int compression_level, std::vector &row_groups ); RParquetOutFile( std::ostream &stream, parquet::CompressionCodec::type codec, + int compsession_level, std::vector &row_groups ); void write_int32(std::ostream &file, uint32_t idx, uint64_t from, @@ -173,15 +175,17 @@ class RParquetOutFile : public ParquetOutFile { RParquetOutFile::RParquetOutFile( std::string filename, parquet::CompressionCodec::type codec, + int compression_level, std::vector &row_groups) : - ParquetOutFile(filename, codec, row_groups) { + ParquetOutFile(filename, codec, compression_level, row_groups) { } RParquetOutFile::RParquetOutFile( std::ostream &stream, parquet::CompressionCodec::type codec, + int compression_level, std::vector &row_groups) : - ParquetOutFile(stream, codec, row_groups) { + ParquetOutFile(stream, codec, compression_level, row_groups) { } void RParquetOutFile::create_dictionary(uint32_t idx, int64_t from, @@ -2519,6 +2523,7 @@ SEXP nanoparquet_write_(SEXP dfsxp, SEXP filesxp, SEXP dim, SEXP compression, } int dp_ver = INTEGER(get_list_element(options, "write_data_page_version"))[0]; + int comp_level = INTEGER(get_list_element(options, "compression_level"))[0]; R_xlen_t nrg = Rf_xlength(row_group_starts); std::vector row_groups(nrg); @@ -2531,7 +2536,7 @@ SEXP nanoparquet_write_(SEXP dfsxp, SEXP filesxp, SEXP dim, SEXP compression, if (fname == ":raw:") { MemStream ms; std::ostream &os = ms.stream(); - RParquetOutFile of(os, codec, row_groups); + RParquetOutFile of(os, codec, comp_level, row_groups); of.data_page_version = dp_ver; of.write(dfsxp, dim, metadata, required, options, schema, encoding); R_xlen_t bufsize = ms.size(); @@ -2539,7 +2544,7 @@ SEXP nanoparquet_write_(SEXP dfsxp, SEXP filesxp, SEXP dim, SEXP compression, ms.copy(RAW(res), bufsize); return res; } else { - RParquetOutFile of(fname, codec, row_groups); + RParquetOutFile of(fname, codec, comp_level, row_groups); of.data_page_version = dp_ver; of.write(dfsxp, dim, metadata, required, options, schema, encoding); return R_NilValue; diff --git a/tests/testthat/test-write-parquet-compression.R b/tests/testthat/test-write-parquet-compression.R new file mode 100644 index 0000000..1f9fdac --- /dev/null +++ b/tests/testthat/test-write-parquet-compression.R @@ -0,0 +1,21 @@ +test_that("gzip compression levels", { + tmp <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp), add = TRUE) + + df <- test_df() + for (level in c(NA_integer_, 0:11)) { + write_parquet(df, tmp, compression = "gzip", options = parquet_options(compression_level = level)) + expect_equal(as.data.frame(read_parquet(tmp)), as.data.frame(df)) + } +}) + +test_that("zstd compression levels", { + tmp <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp), add = TRUE) + + df <- test_df() + for (level in c(NA_integer_, (-22):(22))) { + write_parquet(df, tmp, compression = "zstd", options = parquet_options(compression_level = level)) + expect_equal(as.data.frame(read_parquet(tmp)), as.data.frame(df)) + } +})