From 91336cd4eaadbec212fd5a191329d173cf2dc3ea Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Wed, 24 Jan 2024 12:47:29 -0600 Subject: [PATCH 01/28] Adds Arrow conf sets --- R/sparklyr-spark-connect.R | 2 ++ 1 file changed, 2 insertions(+) diff --git a/R/sparklyr-spark-connect.R b/R/sparklyr-spark-connect.R index 41847ae..34a1c47 100644 --- a/R/sparklyr-spark-connect.R +++ b/R/sparklyr-spark-connect.R @@ -173,6 +173,8 @@ initialize_connection <- function( get_version <- try(session$version, silent = TRUE) if (inherits(get_version, "try-error")) databricks_dbr_error(get_version) session$conf$set("spark.sql.session.localRelationCacheThreshold", 1048576L) + session$conf$set("spark.sql.execution.arrow.pyspark.enabled", "true") + session$conf$set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false") # do we need this `spark_context` object? spark_context <- list(spark_context = session) From 83b21bba60a9fd2401ccecfffbc8c467ad93ed44 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Wed, 31 Jan 2024 08:54:20 -0600 Subject: [PATCH 02/28] Adds spark_apply method and copies initial code --- NAMESPACE | 2 + R/package.R | 2 +- R/sparklyr-spark-apply.R | 79 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 1 deletion(-) create mode 100644 R/sparklyr-spark-apply.R diff --git a/NAMESPACE b/NAMESPACE index 65e278f..8bb4839 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -40,6 +40,7 @@ S3method(sdf_copy_to,pyspark_connection) S3method(sdf_read_column,spark_pyjobj) S3method(sdf_register,spark_pyobj) S3method(sdf_schema,tbl_pyspark) +S3method(spark_apply,tbl_pyspark) S3method(spark_connect_method,spark_method_databricks_connect) S3method(spark_connect_method,spark_method_spark_connect) S3method(spark_connection,pyspark_connection) @@ -157,6 +158,7 @@ importFrom(sparklyr,sdf_copy_to) importFrom(sparklyr,sdf_read_column) importFrom(sparklyr,sdf_register) importFrom(sparklyr,sdf_schema) +importFrom(sparklyr,spark_apply) importFrom(sparklyr,spark_connect_method) importFrom(sparklyr,spark_connection) importFrom(sparklyr,spark_dataframe) diff --git a/R/package.R b/R/package.R index bd0fadf..b6bb07f 100644 --- a/R/package.R +++ b/R/package.R @@ -11,7 +11,7 @@ #' @importFrom sparklyr spark_write_orc spark_write_json spark_write_table #' @importFrom sparklyr ml_pipeline ml_predict ml_transform ml_fit #' @importFrom sparklyr ml_logistic_regression ft_standard_scaler ft_max_abs_scaler -#' @importFrom sparklyr ml_save ml_load spark_jobj spark_install_find +#' @importFrom sparklyr ml_save ml_load spark_jobj spark_install_find spark_apply #' @importFrom tidyselect tidyselect_data_has_predicates #' @importFrom dplyr tbl collect tibble same_src compute as_tibble group_vars #' @importFrom dplyr sample_n sample_frac slice_sample select tbl_ptype group_by diff --git a/R/sparklyr-spark-apply.R b/R/sparklyr-spark-apply.R new file mode 100644 index 0000000..ae61933 --- /dev/null +++ b/R/sparklyr-spark-apply.R @@ -0,0 +1,79 @@ +#' @export +spark_apply.tbl_pyspark <- function( + x, + f, + columns = NULL, + memory = TRUE, + group_by = NULL, + packages = NULL, + context = NULL, + name = NULL, + barrier = NULL, + fetch_result_as_sdf = TRUE, + partition_index_param = "", + arrow_max_records_per_batch = NULL, + auto_deps = FALSE, + ...) { + sa_in_pandas( + x = x, + .f = f, + .schema = columns, + .group_by = group_by + ) +} + +sa_in_pandas <- function(x, .f, ..., .schema = "x double", .group_by = NULL) { + fn <- sa_function_to_string(.f = .f, .group_by = .group_by, ... = ...) 
+ py_run_string(fn) + main <- reticulate::import_main() + df <- x[[1]]$session + if (!is.null(.group_by)) { + # TODO: Add support for multiple grouping columns + renamed_gp <- paste0("_", .group_by) + w_gp <- df$withColumn(colName = renamed_gp, col = df[.group_by]) + tbl_gp <- w_gp$groupby(renamed_gp) + ret <- tbl_gp$applyInPandas(main$r_apply, schema = .schema)$toPandas() + } else { + ret <- df$mapInPandas(main$r_apply, schema = .schema)$toPandas() + } + ret +} + +sa_function_to_string <- function(.f, .group_by = NULL, ...) { + path_scripts <- system.file("udf", package = "pysparklyr") + if (!is.null(.group_by)) { + udf_r <- "udf-apply.R" + udf_py <- "udf-apply.py" + } else { + udf_r <- "udf-map.R" + udf_py <- "udf-map.py" + } + fn_r <- paste0( + readLines(path(path_scripts, udf_r)), + collapse = "" + ) + fn_python <- paste0( + readLines(path(path_scripts, udf_py)), + collapse = "\n" + ) + if (!is.null(.group_by)) { + fn_r <- gsub( + "gp_field <- 'am'", + paste0("gp_field <- '", .group_by, "'"), + fn_r + ) + } + fn <- purrr::as_mapper(.f = .f, ... = ...) + fn_str <- paste0(deparse(fn), collapse = "") + if (inherits(fn, "rlang_lambda_function")) { + fn_str <- paste0( + "function(...) {x <- (", + fn_str, + "); x(...)}" + ) + } + fn_str <- gsub("\"", "'", fn_str) + fn_rep <- "function\\(\\.\\.\\.\\) 1" + fn_r_new <- gsub(fn_rep, fn_str, fn_r) + gsub(fn_rep, fn_r_new, fn_python) +} From d2e4f7a087427f2700d10cdde666d3ceb7b39e4c Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Wed, 31 Jan 2024 08:54:30 -0600 Subject: [PATCH 03/28] Adds udf scripts to inst --- inst/udf/udf-apply.R | 14 ++++++++++++++ inst/udf/udf-apply.py | 8 ++++++++ inst/udf/udf-map.R | 11 +++++++++++ inst/udf/udf-map.py | 8 ++++++++ 4 files changed, 41 insertions(+) create mode 100644 inst/udf/udf-apply.R create mode 100644 inst/udf/udf-apply.py create mode 100644 inst/udf/udf-map.R create mode 100644 inst/udf/udf-map.py diff --git a/inst/udf/udf-apply.R b/inst/udf/udf-apply.R new file mode 100644 index 0000000..c8acdb4 --- /dev/null +++ b/inst/udf/udf-apply.R @@ -0,0 +1,14 @@ +function(df = mtcars) { + library(arrow); + fn <- function(...) 1; + fn_run <- fn(df); + gp_field <- 'am'; + gp <- df[1, gp_field]; + if(is.vector(fn_run)) { + ret <- data.frame(x = fn_run); + } else { + ret <- as.data.frame(fn_run); + }; + ret[gp_field] <- gp; + ret +} diff --git a/inst/udf/udf-apply.py b/inst/udf/udf-apply.py new file mode 100644 index 0000000..50d4648 --- /dev/null +++ b/inst/udf/udf-apply.py @@ -0,0 +1,8 @@ +import pandas as pd +import rpy2.robjects as robjects +from rpy2.robjects import pandas2ri +def r_apply(pdf: pd.DataFrame) -> pd.DataFrame: + pandas2ri.activate() + r_func =robjects.r("function(...) 1") + ret = r_func(pdf) + return pandas2ri.rpy2py_dataframe(ret) diff --git a/inst/udf/udf-map.R b/inst/udf/udf-map.R new file mode 100644 index 0000000..2838bc8 --- /dev/null +++ b/inst/udf/udf-map.R @@ -0,0 +1,11 @@ +function(df = mtcars) { + library(arrow); + fn <- function(...) 1; + fn_run <- fn(df); + if(is.vector(fn_run)) { + ret <- data.frame(x = fn_run); + } else { + ret <- as.data.frame(fn_run); + }; + ret +} diff --git a/inst/udf/udf-map.py b/inst/udf/udf-map.py new file mode 100644 index 0000000..47c02e2 --- /dev/null +++ b/inst/udf/udf-map.py @@ -0,0 +1,8 @@ +import rpy2.robjects as robjects +from rpy2.robjects import pandas2ri +def r_apply(iterator): + for pdf in iterator: + pandas2ri.activate() + r_func = robjects.r("function(...) 
1") + ret = r_func(pdf) + yield pandas2ri.rpy2py_dataframe(ret) From 91ca9fd5c60de8b71f27bb48740c03a128e7b1a2 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Wed, 31 Jan 2024 09:39:23 -0600 Subject: [PATCH 04/28] Passes default schema if not is passed --- R/sparklyr-spark-apply.R | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/R/sparklyr-spark-apply.R b/R/sparklyr-spark-apply.R index ae61933..b956bbd 100644 --- a/R/sparklyr-spark-apply.R +++ b/R/sparklyr-spark-apply.R @@ -17,14 +17,16 @@ spark_apply.tbl_pyspark <- function( sa_in_pandas( x = x, .f = f, - .schema = columns, - .group_by = group_by + .schema = columns %||% "x double", + .group_by = group_by, + ... = ... ) } sa_in_pandas <- function(x, .f, ..., .schema = "x double", .group_by = NULL) { - fn <- sa_function_to_string(.f = .f, .group_by = .group_by, ... = ...) - py_run_string(fn) + .f %>% + sa_function_to_string(.group_by = .group_by, ... = ...) %>% + py_run_string() main <- reticulate::import_main() df <- x[[1]]$session if (!is.null(.group_by)) { From aec4e152d9f70bda4b19fe968e827285317d8210 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Wed, 31 Jan 2024 10:19:56 -0600 Subject: [PATCH 05/28] Adds aborts for some unsopported args --- R/sparklyr-spark-apply.R | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/R/sparklyr-spark-apply.R b/R/sparklyr-spark-apply.R index b956bbd..2304937 100644 --- a/R/sparklyr-spark-apply.R +++ b/R/sparklyr-spark-apply.R @@ -14,6 +14,14 @@ spark_apply.tbl_pyspark <- function( arrow_max_records_per_batch = NULL, auto_deps = FALSE, ...) { + cli_div(theme = cli_colors()) + args <- as.list(environment()) + if(!is.null(args$packages)) { + cli_abort("`packages` is not yet supported for this backend") + } + if(!is.null(args$context)) { + cli_abort("`context` is not supported for this backend") + } sa_in_pandas( x = x, .f = f, @@ -21,6 +29,7 @@ spark_apply.tbl_pyspark <- function( .group_by = group_by, ... = ... ) + cli_end() } sa_in_pandas <- function(x, .f, ..., .schema = "x double", .group_by = NULL) { From 4875d7730fa839422aaded200948f033a37145a2 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Wed, 31 Jan 2024 10:42:48 -0600 Subject: [PATCH 06/28] Adds support to as_sdf and naming --- R/sparklyr-spark-apply.R | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/R/sparklyr-spark-apply.R b/R/sparklyr-spark-apply.R index 2304937..dab30ca 100644 --- a/R/sparklyr-spark-apply.R +++ b/R/sparklyr-spark-apply.R @@ -22,30 +22,49 @@ spark_apply.tbl_pyspark <- function( if(!is.null(args$context)) { cli_abort("`context` is not supported for this backend") } + cli_end() sa_in_pandas( x = x, .f = f, .schema = columns %||% "x double", .group_by = group_by, + .as_sdf = fetch_result_as_sdf, + .name = name, ... = ... ) - cli_end() } -sa_in_pandas <- function(x, .f, ..., .schema = "x double", .group_by = NULL) { +sa_in_pandas <- function( + x, + .f, + ..., + .schema = "x double", + .group_by = NULL, + .as_sdf = TRUE, + .name = NULL + ) { .f %>% sa_function_to_string(.group_by = .group_by, ... = ...) 
%>% py_run_string() main <- reticulate::import_main() - df <- x[[1]]$session + df <- python_sdf(x) if (!is.null(.group_by)) { # TODO: Add support for multiple grouping columns renamed_gp <- paste0("_", .group_by) w_gp <- df$withColumn(colName = renamed_gp, col = df[.group_by]) tbl_gp <- w_gp$groupby(renamed_gp) - ret <- tbl_gp$applyInPandas(main$r_apply, schema = .schema)$toPandas() + p_df <- tbl_gp$applyInPandas(main$r_apply, schema = .schema) + } else { + p_df <- df$mapInPandas(main$r_apply, schema = .schema) + } + if(.as_sdf) { + ret <- tbl_pyspark_temp( + x = p_df, + conn = spark_connection(x), + tmp_name = .name + ) } else { - ret <- df$mapInPandas(main$r_apply, schema = .schema)$toPandas() + ret <- to_pandas_cleaned(p_df) } ret } From f9261f729aeb647ccc5362166fcfd82de375db52 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Wed, 31 Jan 2024 11:32:36 -0600 Subject: [PATCH 07/28] simplifies script selection --- R/sparklyr-spark-apply.R | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/R/sparklyr-spark-apply.R b/R/sparklyr-spark-apply.R index dab30ca..3e19491 100644 --- a/R/sparklyr-spark-apply.R +++ b/R/sparklyr-spark-apply.R @@ -71,19 +71,13 @@ sa_in_pandas <- function( sa_function_to_string <- function(.f, .group_by = NULL, ...) { path_scripts <- system.file("udf", package = "pysparklyr") - if (!is.null(.group_by)) { - udf_r <- "udf-apply.R" - udf_py <- "udf-apply.py" - } else { - udf_r <- "udf-map.R" - udf_py <- "udf-map.py" - } + udf_fn <- ifelse(is.null(.group_by), "map", "apply") fn_r <- paste0( - readLines(path(path_scripts, udf_r)), + readLines(path(path_scripts, glue("udf-{udf_fn}.R"))), collapse = "" ) fn_python <- paste0( - readLines(path(path_scripts, udf_py)), + readLines(path(path_scripts, glue("udf-{udf_fn}.py"))), collapse = "\n" ) if (!is.null(.group_by)) { From 1b295460d0bffb5242d336adf941b1181f70c4b8 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Wed, 31 Jan 2024 11:59:46 -0600 Subject: [PATCH 08/28] Supresses yet another warning --- R/sparklyr-spark-connect.R | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/R/sparklyr-spark-connect.R b/R/sparklyr-spark-connect.R index 34a1c47..74c97a5 100644 --- a/R/sparklyr-spark-connect.R +++ b/R/sparklyr-spark-connect.R @@ -169,6 +169,10 @@ initialize_connection <- function( message = "'SparkSession' object has no attribute 'setLocalProperty'", module = "pyspark" ) + warnings$filterwarnings( + "ignore", + message = "Index.format is deprecated and will be removed in a future version" + ) session <- conn$getOrCreate() get_version <- try(session$version, silent = TRUE) if (inherits(get_version, "try-error")) databricks_dbr_error(get_version) From b4346c8727879e269cb41831f291f7c7b14d3300 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Wed, 31 Jan 2024 12:01:48 -0600 Subject: [PATCH 09/28] Converts to pandas if df is a lazy table --- R/sparklyr-spark-apply.R | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/R/sparklyr-spark-apply.R b/R/sparklyr-spark-apply.R index 3e19491..cdfa340 100644 --- a/R/sparklyr-spark-apply.R +++ b/R/sparklyr-spark-apply.R @@ -48,6 +48,11 @@ sa_in_pandas <- function( py_run_string() main <- reticulate::import_main() df <- python_sdf(x) + if(is.null(df)) { + df <- x %>% + compute() %>% + python_sdf() + } if (!is.null(.group_by)) { # TODO: Add support for multiple grouping columns renamed_gp <- paste0("_", .group_by) From cd5586e6aab5345b077e88aa138f2b8d5f229255 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Wed, 31 Jan 2024 12:26:22 -0600 Subject: [PATCH 10/28] Adds 
support for barrier --- R/sparklyr-spark-apply.R | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/R/sparklyr-spark-apply.R b/R/sparklyr-spark-apply.R index cdfa340..2129f51 100644 --- a/R/sparklyr-spark-apply.R +++ b/R/sparklyr-spark-apply.R @@ -30,6 +30,7 @@ spark_apply.tbl_pyspark <- function( .group_by = group_by, .as_sdf = fetch_result_as_sdf, .name = name, + .barrier = barrier, ... = ... ) } @@ -41,7 +42,8 @@ sa_in_pandas <- function( .schema = "x double", .group_by = NULL, .as_sdf = TRUE, - .name = NULL + .name = NULL, + .barrier = NULL ) { .f %>% sa_function_to_string(.group_by = .group_by, ... = ...) %>% @@ -58,9 +60,16 @@ sa_in_pandas <- function( renamed_gp <- paste0("_", .group_by) w_gp <- df$withColumn(colName = renamed_gp, col = df[.group_by]) tbl_gp <- w_gp$groupby(renamed_gp) - p_df <- tbl_gp$applyInPandas(main$r_apply, schema = .schema) + p_df <- tbl_gp$applyInPandas( + main$r_apply, + schema = .schema + ) } else { - p_df <- df$mapInPandas(main$r_apply, schema = .schema) + p_df <- df$mapInPandas( + main$r_apply, + schema = .schema, + barrier = .barrier %||% FALSE + ) } if(.as_sdf) { ret <- tbl_pyspark_temp( From 4efa31677f555a91f9f9484131a52355e4b48c8d Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Wed, 31 Jan 2024 13:14:03 -0600 Subject: [PATCH 11/28] Adds auto_deps and partition_index_param failure if used --- R/sparklyr-spark-apply.R | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/R/sparklyr-spark-apply.R b/R/sparklyr-spark-apply.R index 2129f51..bdf86f6 100644 --- a/R/sparklyr-spark-apply.R +++ b/R/sparklyr-spark-apply.R @@ -15,13 +15,18 @@ spark_apply.tbl_pyspark <- function( auto_deps = FALSE, ...) { cli_div(theme = cli_colors()) - args <- as.list(environment()) - if(!is.null(args$packages)) { + if(!is.null(packages)) { cli_abort("`packages` is not yet supported for this backend") } - if(!is.null(args$context)) { + if(!is.null(context)) { cli_abort("`context` is not supported for this backend") } + if(auto_deps) { + cli_abort("`auto_deps` is not supported for this backend") + } + if(partition_index_param != "") { + cli_abort("`partition_index_param` is not supported for this backend") + } cli_end() sa_in_pandas( x = x, From fd8cc2fbc5d58f12d34932663fd37fb2ba2399f7 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Wed, 31 Jan 2024 13:31:42 -0600 Subject: [PATCH 12/28] Failure if batch arrow size is passed --- R/sparklyr-spark-apply.R | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/R/sparklyr-spark-apply.R b/R/sparklyr-spark-apply.R index bdf86f6..eed1973 100644 --- a/R/sparklyr-spark-apply.R +++ b/R/sparklyr-spark-apply.R @@ -27,6 +27,11 @@ spark_apply.tbl_pyspark <- function( if(partition_index_param != "") { cli_abort("`partition_index_param` is not supported for this backend") } + if(i!s.null(arrow_max_records_per_batch)) { + cli_abort( + "`arrow_max_records_per_batch` is not yet supported for this backend" + ) + } cli_end() sa_in_pandas( x = x, From 2bf086cdc02ffad268246c4a2ba9bc2e9b652cb1 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Wed, 31 Jan 2024 13:54:58 -0600 Subject: [PATCH 13/28] Ability to pull resulting R function only --- R/sparklyr-spark-apply.R | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/R/sparklyr-spark-apply.R b/R/sparklyr-spark-apply.R index eed1973..2a4b6c5 100644 --- a/R/sparklyr-spark-apply.R +++ b/R/sparklyr-spark-apply.R @@ -27,7 +27,7 @@ spark_apply.tbl_pyspark <- function( if(partition_index_param != "") { 
cli_abort("`partition_index_param` is not supported for this backend") } - if(i!s.null(arrow_max_records_per_batch)) { + if(!is.null(arrow_max_records_per_batch)) { cli_abort( "`arrow_max_records_per_batch` is not yet supported for this backend" ) @@ -93,7 +93,7 @@ sa_in_pandas <- function( ret } -sa_function_to_string <- function(.f, .group_by = NULL, ...) { +sa_function_to_string <- function(.f, .group_by = NULL, .r_only = FALSE, ...) { path_scripts <- system.file("udf", package = "pysparklyr") udf_fn <- ifelse(is.null(.group_by), "map", "apply") fn_r <- paste0( @@ -123,5 +123,10 @@ sa_function_to_string <- function(.f, .group_by = NULL, ...) { fn_str <- gsub("\"", "'", fn_str) fn_rep <- "function\\(\\.\\.\\.\\) 1" fn_r_new <- gsub(fn_rep, fn_str, fn_r) - gsub(fn_rep, fn_r_new, fn_python) + if(.r_only) { + ret <- fn_r_new + } else { + ret <- gsub(fn_rep, fn_r_new, fn_python) + } + ret } From 67b59ed1deb6b654c2e8e81cd59001161874fc10 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Wed, 31 Jan 2024 18:30:56 -0600 Subject: [PATCH 14/28] Makes sure that grouping column is first on R UDF --- inst/udf/udf-apply.R | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/inst/udf/udf-apply.R b/inst/udf/udf-apply.R index c8acdb4..dff682e 100644 --- a/inst/udf/udf-apply.R +++ b/inst/udf/udf-apply.R @@ -3,12 +3,15 @@ function(df = mtcars) { fn <- function(...) 1; fn_run <- fn(df); gp_field <- 'am'; - gp <- df[1, gp_field]; + gp <- df[1, gp_field]; if(is.vector(fn_run)) { ret <- data.frame(x = fn_run); } else { ret <- as.data.frame(fn_run); }; ret[gp_field] <- gp; - ret + cols <- colnames(ret); + gp_cols <- cols == gp_field; + new_cols <- c(cols[gp_cols], cols[!gp_cols]); + ret[, new_cols] } From 04e67dd4555f403a73491b7e6a18d673914b1fc8 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Wed, 31 Jan 2024 18:31:24 -0600 Subject: [PATCH 15/28] Adds ability to automatically create the .schema --- R/sparklyr-spark-apply.R | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/R/sparklyr-spark-apply.R b/R/sparklyr-spark-apply.R index 2a4b6c5..8b21e54 100644 --- a/R/sparklyr-spark-apply.R +++ b/R/sparklyr-spark-apply.R @@ -36,7 +36,7 @@ spark_apply.tbl_pyspark <- function( sa_in_pandas( x = x, .f = f, - .schema = columns %||% "x double", + .schema = columns, .group_by = group_by, .as_sdf = fetch_result_as_sdf, .name = name, @@ -49,12 +49,36 @@ sa_in_pandas <- function( x, .f, ..., - .schema = "x double", + .schema = NULL, .group_by = NULL, .as_sdf = TRUE, .name = NULL, .barrier = NULL ) { + if(is.null(.schema)) { + r_fn <- .f %>% + sa_function_to_string( + .r_only = TRUE, + .group_by = .group_by, + ... = ... + ) %>% + rlang::parse_expr() %>% + eval() + r_df <- x %>% + head(10) %>% + collect() + r_exec <- r_fn(r_df) + .schema <- r_exec %>% + imap(~ { + x_class <- class(.x) + if("POSIXt" %in% x_class) x_class <- "timestamp" + if(x_class == "character") x_class <- "string" + if(x_class == "numeric") x_class <- "double" + if(x_class == "integer") x_class <- "long" + paste0(.y, " ", x_class) + }) %>% + paste0(collapse = ", ") + } .f %>% sa_function_to_string(.group_by = .group_by, ... = ...) 
%>% py_run_string() From ccb9b9c1db302080c9d528429946801bbe8a4a97 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Thu, 1 Feb 2024 08:42:49 -0600 Subject: [PATCH 16/28] Adds message with schema if one is not passed, adds timestamp field conversion --- R/sparklyr-spark-apply.R | 54 ++++++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/R/sparklyr-spark-apply.R b/R/sparklyr-spark-apply.R index 8b21e54..b3dfe2d 100644 --- a/R/sparklyr-spark-apply.R +++ b/R/sparklyr-spark-apply.R @@ -15,23 +15,23 @@ spark_apply.tbl_pyspark <- function( auto_deps = FALSE, ...) { cli_div(theme = cli_colors()) - if(!is.null(packages)) { + if (!is.null(packages)) { cli_abort("`packages` is not yet supported for this backend") } - if(!is.null(context)) { + if (!is.null(context)) { cli_abort("`context` is not supported for this backend") } - if(auto_deps) { + if (auto_deps) { cli_abort("`auto_deps` is not supported for this backend") } - if(partition_index_param != "") { + if (partition_index_param != "") { cli_abort("`partition_index_param` is not supported for this backend") } - if(!is.null(arrow_max_records_per_batch)) { + if (!is.null(arrow_max_records_per_batch)) { cli_abort( "`arrow_max_records_per_batch` is not yet supported for this backend" ) - } + } cli_end() sa_in_pandas( x = x, @@ -42,7 +42,7 @@ spark_apply.tbl_pyspark <- function( .name = name, .barrier = barrier, ... = ... - ) + ) } sa_in_pandas <- function( @@ -50,18 +50,19 @@ sa_in_pandas <- function( .f, ..., .schema = NULL, + .schema_arg = "columns", .group_by = NULL, .as_sdf = TRUE, .name = NULL, - .barrier = NULL - ) { - if(is.null(.schema)) { + .barrier = NULL) { + schema_msg <- FALSE + if (is.null(.schema)) { r_fn <- .f %>% sa_function_to_string( .r_only = TRUE, .group_by = .group_by, ... = ... - ) %>% + ) %>% rlang::parse_expr() %>% eval() r_df <- x %>% @@ -71,20 +72,21 @@ sa_in_pandas <- function( .schema <- r_exec %>% imap(~ { x_class <- class(.x) - if("POSIXt" %in% x_class) x_class <- "timestamp" - if(x_class == "character") x_class <- "string" - if(x_class == "numeric") x_class <- "double" - if(x_class == "integer") x_class <- "long" + if ("POSIXt" %in% x_class) x_class <- "timestamp" + if (x_class == "character") x_class <- "string" + if (x_class == "numeric") x_class <- "double" + if (x_class == "integer") x_class <- "long" paste0(.y, " ", x_class) }) %>% paste0(collapse = ", ") + schema_msg <- TRUE } .f %>% sa_function_to_string(.group_by = .group_by, ... = ...) %>% py_run_string() main <- reticulate::import_main() df <- python_sdf(x) - if(is.null(df)) { + if (is.null(df)) { df <- x %>% compute() %>% python_sdf() @@ -97,23 +99,33 @@ sa_in_pandas <- function( p_df <- tbl_gp$applyInPandas( main$r_apply, schema = .schema - ) + ) } else { p_df <- df$mapInPandas( main$r_apply, schema = .schema, barrier = .barrier %||% FALSE - ) + ) } - if(.as_sdf) { + if (.as_sdf) { ret <- tbl_pyspark_temp( x = p_df, conn = spark_connection(x), tmp_name = .name - ) + ) } else { ret <- to_pandas_cleaned(p_df) } + if(schema_msg) { + schema_arg <- .schema_arg + schema <- .schema + cli_div(theme = cli_colors()) + cli_inform(c( + "{.header To increase performance, use the following schema:}", + "{.emph {schema_arg} = \"{schema}\" }" + )) + cli_end() + } ret } @@ -147,7 +159,7 @@ sa_function_to_string <- function(.f, .group_by = NULL, .r_only = FALSE, ...) 
{ fn_str <- gsub("\"", "'", fn_str) fn_rep <- "function\\(\\.\\.\\.\\) 1" fn_r_new <- gsub(fn_rep, fn_str, fn_r) - if(.r_only) { + if (.r_only) { ret <- fn_r_new } else { ret <- gsub(fn_rep, fn_r_new, fn_python) From 4409e4b10bb91f4a4d9b79ca24733b4e3bd9c412 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Thu, 1 Feb 2024 10:09:44 -0600 Subject: [PATCH 17/28] Adds support for maxRecordsPerBatch for Arrow --- R/sparklyr-spark-apply.R | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/R/sparklyr-spark-apply.R b/R/sparklyr-spark-apply.R index b3dfe2d..5306685 100644 --- a/R/sparklyr-spark-apply.R +++ b/R/sparklyr-spark-apply.R @@ -28,9 +28,16 @@ spark_apply.tbl_pyspark <- function( cli_abort("`partition_index_param` is not supported for this backend") } if (!is.null(arrow_max_records_per_batch)) { - cli_abort( - "`arrow_max_records_per_batch` is not yet supported for this backend" - ) + sc <- python_sdf(x)$sparkSession + conf_name <- "spark.sql.execution.arrow.maxRecordsPerBatch" + conf_curr <- sc$conf$get(conf_name) + conf_req <- as.character(arrow_max_records_per_batch) + if(conf_curr != conf_req) { + cli_div(theme = cli_colors()) + cli_inform("{.header Changing {conf_name} to: {conf_req}}") + cli_end() + sc$conf$set(conf_name, conf_req) + } } cli_end() sa_in_pandas( From 16456fbde829acd77135ead678cdf5d9ff1c99c8 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Thu, 1 Feb 2024 10:18:13 -0600 Subject: [PATCH 18/28] Msg improvements --- R/sparklyr-spark-apply.R | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/R/sparklyr-spark-apply.R b/R/sparklyr-spark-apply.R index 5306685..5671bc9 100644 --- a/R/sparklyr-spark-apply.R +++ b/R/sparklyr-spark-apply.R @@ -34,7 +34,9 @@ spark_apply.tbl_pyspark <- function( conf_req <- as.character(arrow_max_records_per_batch) if(conf_curr != conf_req) { cli_div(theme = cli_colors()) - cli_inform("{.header Changing {conf_name} to: {conf_req}}") + cli_inform( + "{.header Changing {.emph {conf_name}} to: {prettyNum(conf_req, big.mark = ',')}}" + ) cli_end() sc$conf$set(conf_name, conf_req) } From 83078fdc54d3ec06eb1c4aa892662ac4891f70fc Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Thu, 1 Feb 2024 10:35:01 -0600 Subject: [PATCH 19/28] Updates DESCRIPTION and NEWS --- DESCRIPTION | 2 +- NEWS.md | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/DESCRIPTION b/DESCRIPTION index 5fcc279..4f9243f 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: pysparklyr Title: Provides a 'PySpark' Back-End for the 'sparklyr' Package -Version: 0.1.3 +Version: 0.1.3.9000 Authors@R: c( person("Edgar", "Ruiz", , "edgar@posit.co", role = c("aut", "cre")), person(given = "Posit Software, PBC", role = c("cph", "fnd")) diff --git a/NEWS.md b/NEWS.md index 52eafe4..47310e0 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,9 @@ +# pysparklyr dev + +### New + +* Adds support for `spark_apply()` via the `rpy2` Python library. 
+ # pysparklyr 0.1.3 ### New From 538f95660505a78363c3f8cd04493a8e0798f937 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Thu, 1 Feb 2024 11:26:47 -0600 Subject: [PATCH 20/28] Adds rpy2 to installation list --- R/python-install.R | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/R/python-install.R b/R/python-install.R index 5bdc4f4..05286ae 100644 --- a/R/python-install.R +++ b/R/python-install.R @@ -217,7 +217,8 @@ install_environment <- function( "PyArrow", "grpcio", "google-api-python-client", - "grpcio_status" + "grpcio_status", + "rpy2" ) if (add_torch && install_ml) { From 666d1d8e5ce8d003b30372911addd9649e01b338 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Thu, 1 Feb 2024 11:27:19 -0600 Subject: [PATCH 21/28] Makes the ML library checker into a generic one, and uses it to install `rpy2` on the fly --- R/ml-utils.R | 30 +++++------------------------- R/sparklyr-spark-apply.R | 4 ++++ R/utils.R | 30 ++++++++++++++++++++++++++++++ 3 files changed, 39 insertions(+), 25 deletions(-) diff --git a/R/ml-utils.R b/R/ml-utils.R index 3fb5ba8..e16a9ad 100644 --- a/R/ml-utils.R +++ b/R/ml-utils.R @@ -130,29 +130,9 @@ get_params <- function(x) { } ml_installed <- function(envname = NULL) { - ml_libraries <- pysparklyr_env$ml_libraries - installed_libraries <- py_list_packages(envname = envname)$package - find_ml <- map_lgl(ml_libraries, ~ .x %in% installed_libraries) - if (!all(find_ml)) { - cli_div(theme = cli_colors()) - msg1 <- "Required Python libraries to run ML functions are missing" - if (check_interactive()) { - missing_ml <- ml_libraries[!find_ml] - cli_alert_warning(msg1) - cli_bullets(c( - " " = "{.header Could not find: {missing_ml}}", - " " = "Do you wish to install? {.class (This will be a one time operation)}" - )) - choice <- menu(choices = c("Yes", "Cancel")) - if (choice == 1) { - py_install(missing_ml) - } - if (choice == 2) { - stop_quietly() - } - } else { - cli_abort(msg1) - } - cli_end() - } + py_check_installed( + envname = envname, + libraries = pysparklyr_env$ml_libraries, + msg = "Required Python libraries to run ML functions are missing" + ) } diff --git a/R/sparklyr-spark-apply.R b/R/sparklyr-spark-apply.R index 5671bc9..fc4d1ee 100644 --- a/R/sparklyr-spark-apply.R +++ b/R/sparklyr-spark-apply.R @@ -14,6 +14,10 @@ spark_apply.tbl_pyspark <- function( arrow_max_records_per_batch = NULL, auto_deps = FALSE, ...) { + py_check_installed( + libraries = "rpy2", + msg = "Requires an additional Python library" + ) cli_div(theme = cli_colors()) if (!is.null(packages)) { cli_abort("`packages` is not yet supported for this backend") diff --git a/R/utils.R b/R/utils.R index 57de702..50a72aa 100644 --- a/R/utils.R +++ b/R/utils.R @@ -93,6 +93,36 @@ current_product_connect <- function() { out } +py_check_installed <- function( + envname = NULL, + libraries = "", + msg = "" + ) { + installed_libraries <- py_list_packages(envname = envname)$package + find_libs <- map_lgl(libraries, ~ .x %in% installed_libraries) + if (!all(find_libs)) { + cli_div(theme = cli_colors()) + if (check_interactive()) { + missing_lib <- libraries[!find_libs] + cli_alert_warning(msg) + cli_bullets(c( + " " = "{.header Could not find: {missing_lib}}", + " " = "Do you wish to install? 
{.class (This will be a one time operation)}" + )) + choice <- menu(choices = c("Yes", "Cancel")) + if (choice == 1) { + py_install(missing_lib) + } + if (choice == 2) { + stop_quietly() + } + } else { + cli_abort(msg) + } + cli_end() + } +} + stop_quietly <- function() { opt <- options(show.error.messages = FALSE) on.exit(options(opt)) From dad56db8d93b2ab4bdea3a5a9c8bc82fc76cf6f3 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Thu, 1 Feb 2024 14:56:23 -0600 Subject: [PATCH 22/28] Converts periods to underscores in colnames --- R/sparklyr-spark-apply.R | 35 +++++++++++++++++++++++++++++++++-- inst/udf/udf-apply.R | 5 ++++- inst/udf/udf-map.R | 2 ++ 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/R/sparklyr-spark-apply.R b/R/sparklyr-spark-apply.R index fc4d1ee..264e545 100644 --- a/R/sparklyr-spark-apply.R +++ b/R/sparklyr-spark-apply.R @@ -74,6 +74,7 @@ sa_in_pandas <- function( sa_function_to_string( .r_only = TRUE, .group_by = .group_by, + .colnames = NULL, ... = ... ) %>% rlang::parse_expr() %>% @@ -82,6 +83,9 @@ sa_in_pandas <- function( head(10) %>% collect() r_exec <- r_fn(r_df) + col_names <- colnames(r_exec) + col_names <- gsub("\\.", "_", col_names) + colnames(r_exec) <- col_names .schema <- r_exec %>% imap(~ { x_class <- class(.x) @@ -93,9 +97,17 @@ sa_in_pandas <- function( }) %>% paste0(collapse = ", ") schema_msg <- TRUE + } else { + fields <- unlist(strsplit(.schema, ",")) + col_names <- map_chr(fields, ~ unlist(strsplit(trimws(.x), " "))[[1]]) + col_names <- gsub("\\.", "_", col_names) } .f %>% - sa_function_to_string(.group_by = .group_by, ... = ...) %>% + sa_function_to_string( + .group_by = .group_by, + .colnames = col_names, + ... = ... + ) %>% py_run_string() main <- reticulate::import_main() df <- python_sdf(x) @@ -142,8 +154,17 @@ sa_in_pandas <- function( ret } -sa_function_to_string <- function(.f, .group_by = NULL, .r_only = FALSE, ...) { +sa_function_to_string <- function( + .f, + .group_by = NULL, + .r_only = FALSE, + .colnames = NULL, + ... + ) { path_scripts <- system.file("udf", package = "pysparklyr") + if(dir_exists("inst/udf")) { + path_scripts <- path_expand("inst/udf") + } udf_fn <- ifelse(is.null(.group_by), "map", "apply") fn_r <- paste0( readLines(path(path_scripts, glue("udf-{udf_fn}.R"))), @@ -160,6 +181,16 @@ sa_function_to_string <- function(.f, .group_by = NULL, .r_only = FALSE, ...) { fn_r ) } + if(is.null(.colnames)) { + .colnames <- "NULL" + } else { + .colnames <- paste0("'", .colnames, "'", collapse = ", ") + } + fn_r <- gsub( + "col_names <- c\\('am', 'x'\\)", + paste0("col_names <- c(", .colnames, ")"), + fn_r + ) fn <- purrr::as_mapper(.f = .f, ... = ...) fn_str <- paste0(deparse(fn), collapse = "") if (inherits(fn, "rlang_lambda_function")) { diff --git a/inst/udf/udf-apply.R b/inst/udf/udf-apply.R index dff682e..ea6a62b 100644 --- a/inst/udf/udf-apply.R +++ b/inst/udf/udf-apply.R @@ -3,6 +3,7 @@ function(df = mtcars) { fn <- function(...) 
1; fn_run <- fn(df); gp_field <- 'am'; + col_names <- c('am', 'x'); gp <- df[1, gp_field]; if(is.vector(fn_run)) { ret <- data.frame(x = fn_run); @@ -13,5 +14,7 @@ function(df = mtcars) { cols <- colnames(ret); gp_cols <- cols == gp_field; new_cols <- c(cols[gp_cols], cols[!gp_cols]); - ret[, new_cols] + ret <- ret[, new_cols]; + if(!is.null(col_names)) colnames(ret) <- col_names; + ret } diff --git a/inst/udf/udf-map.R b/inst/udf/udf-map.R index 2838bc8..1638d27 100644 --- a/inst/udf/udf-map.R +++ b/inst/udf/udf-map.R @@ -1,11 +1,13 @@ function(df = mtcars) { library(arrow); fn <- function(...) 1; + col_names <- c('am', 'x'); fn_run <- fn(df); if(is.vector(fn_run)) { ret <- data.frame(x = fn_run); } else { ret <- as.data.frame(fn_run); }; + if(!is.null(col_names)) colnames(ret) <- col_names; ret } From 6b2110cb4c87c649b4dc99c9a4d2cb78a75296dd Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Fri, 2 Feb 2024 11:26:30 -0600 Subject: [PATCH 23/28] Initial tests --- tests/testthat/test-sparklyr-spark-apply.R | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 tests/testthat/test-sparklyr-spark-apply.R diff --git a/tests/testthat/test-sparklyr-spark-apply.R b/tests/testthat/test-sparklyr-spark-apply.R new file mode 100644 index 0000000..879321c --- /dev/null +++ b/tests/testthat/test-sparklyr-spark-apply.R @@ -0,0 +1,15 @@ +test_that("spark_apply() works", { + tbl_mtcars <- use_test_table_mtcars() + expect_s3_class( + spark_apply(tbl_mtcars, nrow, group_by = "am"), + "tbl_spark" + ) + expect_s3_class( + spark_apply(tbl_mtcars, function(x) x), + "tbl_spark" + ) + expect_s3_class( + spark_apply(tbl_mtcars, function(x) x, fetch_result_as_sdf = FALSE), + "data.frame" + ) +}) From b725615c19ce3a97fce71206b123e07f0fdadefd Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Fri, 2 Feb 2024 12:12:08 -0600 Subject: [PATCH 24/28] Adds more tests --- tests/testthat/test-sparklyr-spark-apply.R | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/tests/testthat/test-sparklyr-spark-apply.R b/tests/testthat/test-sparklyr-spark-apply.R index 879321c..be14e26 100644 --- a/tests/testthat/test-sparklyr-spark-apply.R +++ b/tests/testthat/test-sparklyr-spark-apply.R @@ -1,7 +1,7 @@ test_that("spark_apply() works", { tbl_mtcars <- use_test_table_mtcars() expect_s3_class( - spark_apply(tbl_mtcars, nrow, group_by = "am"), + spark_apply(tbl_mtcars, nrow, group_by = "am", columns = "am double, x long"), "tbl_spark" ) expect_s3_class( @@ -12,4 +12,23 @@ test_that("spark_apply() works", { spark_apply(tbl_mtcars, function(x) x, fetch_result_as_sdf = FALSE), "data.frame" ) + expect_s3_class( + spark_apply(tbl_mtcars, function(x) x, arrow_max_records_per_batch = 5000), + "tbl_spark" + ) + expect_s3_class( + spark_apply(tbl_mtcars, ~ .x), + "tbl_spark" + ) + expect_s3_class( + spark_apply(dplyr::filter(tbl_mtcars, am == 0), ~ .x), + "tbl_spark" + ) +}) + +test_that("Errors are output by specific params", { + expect_error(spark_apply(tbl_mtcars, nrow, packages = "test")) + expect_error(spark_apply(tbl_mtcars, nrow, context = "")) + expect_error(spark_apply(tbl_mtcars, nrow, auto_deps = TRUE)) + expect_error(spark_apply(tbl_mtcars, nrow, partition_index_param = "test")) }) From 55c19f1fed96fc1cfe62df3834ecb492023764d6 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Fri, 2 Feb 2024 13:25:20 -0600 Subject: [PATCH 25/28] Updates installation tests snapshots, changes sparklyr dep to use dev version --- DESCRIPTION | 4 +++- tests/testthat/_snaps/python-install.md | 11 ++++++----- 2 
files changed, 9 insertions(+), 6 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 4f9243f..e499b23 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -22,7 +22,7 @@ Imports: reticulate (>= 1.33), methods, rlang, - sparklyr (>= 1.8.4), + sparklyr (>= 1.8.4.9004), tidyselect, fs, magrittr, @@ -41,3 +41,5 @@ Suggests: tibble, withr Config/testthat/edition: 3 +Remotes: + sparklyr/sparklyr diff --git a/tests/testthat/_snaps/python-install.md b/tests/testthat/_snaps/python-install.md index f0c1aec..7cad6c0 100644 --- a/tests/testthat/_snaps/python-install.md +++ b/tests/testthat/_snaps/python-install.md @@ -4,11 +4,11 @@ x Output $packages - [1] "pyspark==3.5.0" "pandas!=2.1.0" - [3] "PyArrow" "grpcio" - [5] "google-api-python-client" "grpcio_status" - [7] "torch" "torcheval" - [9] "scikit-learn" + [1] "pyspark==3.5.0" "pandas!=2.1.0" + [3] "PyArrow" "grpcio" + [5] "google-api-python-client" "grpcio_status" + [7] "rpy2" "torch" + [9] "torcheval" "scikit-learn" $envname unavailable @@ -37,6 +37,7 @@ [1] "pyspark==3.5.*" "pandas!=2.1.0" [3] "PyArrow" "grpcio" [5] "google-api-python-client" "grpcio_status" + [7] "rpy2" $envname unavailable From 8ed2e18633e473da55ba0b8f37e46db26a96f8c3 Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Fri, 2 Feb 2024 13:44:43 -0600 Subject: [PATCH 26/28] Updates data-write and dplyr snapshots. Needed after updating to sparklyr dev --- tests/testthat/_snaps/data-write.md | 18 ++++++++++++------ tests/testthat/_snaps/dplyr.md | 14 +++++++++----- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/tests/testthat/_snaps/data-write.md b/tests/testthat/_snaps/data-write.md index 6777464..d8cc2e1 100644 --- a/tests/testthat/_snaps/data-write.md +++ b/tests/testthat/_snaps/data-write.md @@ -4,7 +4,8 @@ spark_read_csv(sc = sc, name = "csv_1", path = file_name, overwrite = TRUE, repartition = 2) Output - # Source: spark [?? x 11] + # Source: table [?? x 11] + # Database: spark_connection mpg cyl disp hp drat wt qsec vs am gear carb 1 21 6 160 110 3.9 2.62 16.5 0 1 4 4 @@ -25,7 +26,8 @@ spark_read_csv(sc = sc, name = "csv_2", path = file_name, overwrite = TRUE, columns = paste0(names(mtcars), "t")) Output - # Source: spark [?? x 11] + # Source: table [?? x 11] + # Database: spark_connection mpgt cylt dispt hpt dratt wtt qsect vst amt geart carbt 1 21.0 6.0 160.0 110.0 3.9 2.62 16.46 0.0 1.0 4.0 4.0 @@ -46,7 +48,8 @@ spark_read_csv(sc = sc, name = "csv_3", path = file_name, overwrite = TRUE, memory = TRUE) Output - # Source: spark [?? x 11] + # Source: table [?? x 11] + # Database: spark_connection mpg cyl disp hp drat wt qsec vs am gear carb 1 21 6 160 110 3.9 2.62 16.5 0 1 4 4 @@ -66,7 +69,8 @@ Code spark_read_parquet(sc, "csv_1", file_name, overwrite = TRUE) Output - # Source: spark [?? x 11] + # Source: table [?? x 11] + # Database: spark_connection mpg cyl disp hp drat wt qsec vs am gear carb 1 21 6 160 110 3.9 2.62 16.5 0 1 4 4 @@ -86,7 +90,8 @@ Code spark_read_orc(sc, "csv_1", file_name, overwrite = TRUE) Output - # Source: spark [?? x 11] + # Source: table [?? x 11] + # Database: spark_connection mpg cyl disp hp drat wt qsec vs am gear carb 1 21 6 160 110 3.9 2.62 16.5 0 1 4 4 @@ -106,7 +111,8 @@ Code spark_read_json(sc, "csv_1", file_name, overwrite = TRUE) Output - # Source: spark [?? x 11] + # Source: table [?? 
x 11] + # Database: spark_connection am carb cyl disp drat gear hp mpg qsec vs wt 1 1 4 6 160 3.9 4 110 21 16.5 0 2.62 diff --git a/tests/testthat/_snaps/dplyr.md b/tests/testthat/_snaps/dplyr.md index 736d66e..ab91029 100644 --- a/tests/testthat/_snaps/dplyr.md +++ b/tests/testthat/_snaps/dplyr.md @@ -3,7 +3,8 @@ Code tbl_ordered Output - # Source: spark [?? x 11] + # Source: SQL [?? x 11] + # Database: spark_connection # Ordered by: mpg, qsec, hp mpg cyl disp hp drat wt qsec vs am gear carb @@ -24,7 +25,8 @@ Code print(head(tbl_ordered)) Output - # Source: spark [?? x 11] + # Source: SQL [6 x 11] + # Database: spark_connection # Ordered by: mpg, qsec, hp mpg cyl disp hp drat wt qsec vs am gear carb @@ -40,8 +42,9 @@ Code tbl_am[1] Output - # Source: spark [?? x 1] - # Groups: am + # Source: SQL [2 x 1] + # Database: spark_connection + # Groups: am am 1 0 @@ -52,7 +55,8 @@ Code tbl_join Output - # Source: spark [?? x 11] + # Source: SQL [?? x 11] + # Database: spark_connection # Ordered by: mpg, qsec, hp mpg cyl disp hp drat wt qsec vs am gear carb From 48c471545a6aa8d080c2afc8d438be28ca429c3e Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Fri, 2 Feb 2024 14:12:19 -0600 Subject: [PATCH 27/28] Updates pivot_longer tests snapshot, adds arrow package to CI --- .github/workflows/spark-tests.yaml | 4 +++- .github/workflows/test-coverage.yaml | 1 + tests/testthat/_snaps/pivot-longer.md | 3 ++- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/spark-tests.yaml b/.github/workflows/spark-tests.yaml index a726221..6cd45cf 100644 --- a/.github/workflows/spark-tests.yaml +++ b/.github/workflows/spark-tests.yaml @@ -38,7 +38,9 @@ jobs: - uses: r-lib/actions/setup-r-dependencies@v2 with: - extra-packages: any::devtools + extra-packages: | + any::devtools + any::arrow needs: check - name: Set up Python 3.10 diff --git a/.github/workflows/test-coverage.yaml b/.github/workflows/test-coverage.yaml index dd88f03..a45c4f8 100644 --- a/.github/workflows/test-coverage.yaml +++ b/.github/workflows/test-coverage.yaml @@ -33,6 +33,7 @@ jobs: extra-packages: | any::covr any::devtools + any::arrow needs: coverage - name: Cache Spark diff --git a/tests/testthat/_snaps/pivot-longer.md b/tests/testthat/_snaps/pivot-longer.md index 7fe0842..7c47f0d 100644 --- a/tests/testthat/_snaps/pivot-longer.md +++ b/tests/testthat/_snaps/pivot-longer.md @@ -3,7 +3,8 @@ Code tbl_pivot %>% tidyr::pivot_longer(-id, names_to = c(".value", "n"), names_sep = "_") Output - # Source: spark [?? 
x 5] + # Source: table [4 x 5] + # Database: spark_connection id n z y x 1 A 1 1 2 3 From a3005101b99813bab59d7e936de2559e884b159a Mon Sep 17 00:00:00 2001 From: Edgar Ruiz Date: Fri, 2 Feb 2024 14:55:07 -0600 Subject: [PATCH 28/28] Accounts for new Spark dataframe header output, skips spark_apply() with barrier if < Spark 3.5 --- tests/testthat/_snaps/pivot-longer.md | 6 +++--- tests/testthat/test-pivot-longer.R | 3 ++- tests/testthat/test-sparklyr-spark-apply.R | 1 + 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/testthat/_snaps/pivot-longer.md b/tests/testthat/_snaps/pivot-longer.md index 7c47f0d..775f2e5 100644 --- a/tests/testthat/_snaps/pivot-longer.md +++ b/tests/testthat/_snaps/pivot-longer.md @@ -1,10 +1,10 @@ # Pivot longer Code - tbl_pivot %>% tidyr::pivot_longer(-id, names_to = c(".value", "n"), names_sep = "_") + tbl_pivot %>% tidyr::pivot_longer(-id, names_to = c(".value", "n"), names_sep = "_") %>% + collect() Output - # Source: table [4 x 5] - # Database: spark_connection + # A tibble: 4 x 5 id n z y x 1 A 1 1 2 3 diff --git a/tests/testthat/test-pivot-longer.R b/tests/testthat/test-pivot-longer.R index 91f9937..31a049a 100644 --- a/tests/testthat/test-pivot-longer.R +++ b/tests/testthat/test-pivot-longer.R @@ -12,7 +12,8 @@ test_that("Pivot longer", { tidyr::pivot_longer( -id, names_to = c(".value", "n"), names_sep = "_" - ) + ) %>% + collect() ) }) diff --git a/tests/testthat/test-sparklyr-spark-apply.R b/tests/testthat/test-sparklyr-spark-apply.R index be14e26..3784239 100644 --- a/tests/testthat/test-sparklyr-spark-apply.R +++ b/tests/testthat/test-sparklyr-spark-apply.R @@ -4,6 +4,7 @@ test_that("spark_apply() works", { spark_apply(tbl_mtcars, nrow, group_by = "am", columns = "am double, x long"), "tbl_spark" ) + skip_spark_min_version(3.5) expect_s3_class( spark_apply(tbl_mtcars, function(x) x), "tbl_spark"
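
Usage note (not part of the patch series): the tests added in patches 23-24 show the call pattern this new backend targets. Below is a minimal, hedged sketch in R of how the feature would be exercised end to end; the connection arguments are placeholders, and it assumes a Spark Connect session created through pysparklyr with the `rpy2` Python package available in the active environment (patch 21 installs it on the fly if missing).

library(sparklyr)
library(pysparklyr)
library(dplyr)

# Placeholder endpoint -- adjust master/version for your own cluster
sc <- spark_connect(
  master = "sc://localhost:15002",
  method = "spark_connect",
  version = "3.5"
)

tbl_mtcars <- copy_to(sc, mtcars, overwrite = TRUE)

# Grouped apply; passing `columns` skips the sampling-based schema inference
# added in patch 15 and the "To increase performance" hint from patch 16
tbl_mtcars %>%
  spark_apply(nrow, group_by = "am", columns = "am double, x long")

# Formula-style function, collected straight to an R data frame instead of
# being registered as a temporary Spark DataFrame
tbl_mtcars %>%
  spark_apply(~ .x, fetch_result_as_sdf = FALSE)

spark_disconnect(sc)

When `columns` is omitted, the backend collects the first 10 rows, runs the function locally to derive a schema string (for example "am double, x long"), and prints that string so it can be passed explicitly on subsequent calls for better performance.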