From 462f3efcf1209605fb0960cfeced01860a7d87cb Mon Sep 17 00:00:00 2001
From: Edgar Ruiz
Date: Fri, 29 Sep 2023 19:06:04 -0500
Subject: [PATCH 1/3] Fixes #32

---
 R/connections-pane.R | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/R/connections-pane.R b/R/connections-pane.R
index 23ddcce..b2accca 100644
--- a/R/connections-pane.R
+++ b/R/connections-pane.R
@@ -10,6 +10,10 @@ spark_ide_objects.pyspark_connection <- function(
   df_tables <- data.frame()
   df_cat <- data.frame()
 
+  limit <- as.numeric(
+    Sys.getenv("SPARKLYR_CONNECTION_OBJECT_LIMIT", unset = 1000)
+    )
+
   sc_catalog <- python_conn(con)$catalog
   current_catalog <- sc_catalog$currentCatalog()
   if (is.null(catalog)) {
@@ -18,8 +22,7 @@ spark_ide_objects.pyspark_connection <- function(
     if (length(tables) > 0) {
       temps <- tables[map_lgl(tables, ~ .x$isTemporary)]
       df_tables <- temps %>%
-        rs_tables() %>%
-        rbind(df_cat)
+        rs_tables()
     }
 
     catalogs <- sc_catalog$listCatalogs()
@@ -27,15 +30,15 @@ spark_ide_objects.pyspark_connection <- function(
       df_catalogs <- data.frame(name = map_chr(catalogs, ~ .x$name))
       df_catalogs$type <- "catalog"
     }
-
-    out <- rbind(df_tables, df_catalogs)
+    comb <- rbind(df_tables, df_catalogs)
+    out <- head(comb, limit)
   } else {
     sc_catalog$setCurrentCatalog(catalog)
     if (is.null(schema)) {
       databases <- sc_catalog$listDatabases()
       df_databases <- data.frame(name = map_chr(databases, ~ .x$name))
       df_databases$type <- "schema"
-      out <- df_databases
+      out <- head(df_databases, limit)
     } else {
       tables <- sc_catalog$listTables(dbName = schema)
       if (length(tables) > 0) {
@@ -48,7 +51,7 @@ spark_ide_objects.pyspark_connection <- function(
         tables <- tables[schemas]
         df_tables <- rs_tables(tables)
       }
-      out <- df_tables
+      out <- head(df_tables, limit)
     }
   }
 
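Usage note: the hunks above cap how many objects spark_ide_objects() hands to the
RStudio Connections pane at each level. A minimal sketch of how a user would
override the default of 1000 — the variable name comes from the patch, the value
here is illustrative:

    # Cap the Connections pane at 200 objects per level; set before connecting.
    Sys.setenv(SPARKLYR_CONNECTION_OBJECT_LIMIT = 200)

    # This mirrors the lookup in the patch; when unset it falls back to 1000.
    as.numeric(Sys.getenv("SPARKLYR_CONNECTION_OBJECT_LIMIT", unset = 1000))
    #> [1] 200

Calling Sys.unsetenv("SPARKLYR_CONNECTION_OBJECT_LIMIT") restores the default.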
From 21287ebcb98124061599c408600c26df99a1c53e Mon Sep 17 00:00:00 2001
From: Edgar Ruiz
Date: Fri, 29 Sep 2023 19:17:00 -0500
Subject: [PATCH 2/3] Fixes #38

---
 R/spark-connect.R | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/R/spark-connect.R b/R/spark-connect.R
index ce69add..1b29db8 100644
--- a/R/spark-connect.R
+++ b/R/spark-connect.R
@@ -250,6 +250,11 @@ build_user_agent <- function() {
   in_rstudio <- FALSE
   in_connect <- FALSE
 
+  env_var <- Sys.getenv("SPARK_CONNECT_USER_AGENT", unset = NA)
+  if(!is.na(env_var)) {
+    return(env_var)
+  }
+
   if (Sys.getenv("RSTUDIO_PRODUCT") == "CONNECT") {
     product <- "posit-connect"
   }
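Usage note: with the early return above, a user-supplied string bypasses the
product/OS detection that build_user_agent() performs below it. A short sketch —
the variable name comes from the patch, the value is illustrative:

    # Report a custom user agent to the Spark Connect / Databricks endpoint;
    # set this before spark_connect() so build_user_agent() picks it up.
    Sys.setenv(SPARK_CONNECT_USER_AGENT = "my-etl-job/0.3.1")

When the variable is unset, Sys.getenv(..., unset = NA) yields NA and the
function falls through to its normal detection logic.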
From 63047dbcd5879176dbf9aed37a9df49acf4a97da Mon Sep 17 00:00:00 2001
From: Edgar Ruiz
Date: Fri, 29 Sep 2023 19:17:48 -0500
Subject: [PATCH 3/3] styler updates

---
 R/connections-pane.R   |  2 +-
 R/import-check.R       |  8 +++---
 R/install.R            | 13 ++++------
 R/local-connect.R      |  2 +-
 R/pyspark-connection.R |  2 +-
 R/spark-connect.R      | 55 +++++++++++++++++++++---------------------
 6 files changed, 39 insertions(+), 43 deletions(-)
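Formatting note: every hunk in this patch is whitespace-only churn of the kind
the styler package produces (space after `if`, re-indented closing parens,
blank lines dropped after function signatures). A sketch of the command that
generates such a commit, assuming the default tidyverse style guide:

    # install.packages("styler")
    styler::style_pkg()  # restyles every .R file in the package, in place

Re-running it is idempotent, which keeps later diffs free of formatting noise.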
diff --git a/R/connections-pane.R b/R/connections-pane.R
index b2accca..40aab3e 100644
--- a/R/connections-pane.R
+++ b/R/connections-pane.R
@@ -12,7 +12,7 @@ spark_ide_objects.pyspark_connection <- function(
 
   limit <- as.numeric(
     Sys.getenv("SPARKLYR_CONNECTION_OBJECT_LIMIT", unset = 1000)
-    )
+  )
 
   sc_catalog <- python_conn(con)$catalog
   current_catalog <- sc_catalog$currentCatalog()
diff --git a/R/import-check.R b/R/import-check.R
index 6d6e95c..cc42ba1 100644
--- a/R/import-check.R
+++ b/R/import-check.R
@@ -31,14 +31,14 @@ import_check <- function(x, envname) {
 
   inst <- NULL
 
-  if(substr(envname, 1, 22) == "r-sparklyr-databricks-") {
+  if (substr(envname, 1, 22) == "r-sparklyr-databricks-") {
     inst <- paste0(
       " {.run pysparklyr::install_databricks(",
       "envname = \"{envname}\")}"
     )
   }
 
-  if(substr(envname, 1, 19) == "r-sparklyr-pyspark-") {
+  if (substr(envname, 1, 19) == "r-sparklyr-pyspark-") {
     inst <- paste0(
       " {.run pysparklyr::install_pyspark(",
       "envname = \"{envname}\")}"
@@ -45,9 +45,9 @@ import_check <- function(x, envname) {
     )
   }
 
   msg_install <- NULL
   msg_restart <- NULL
-  if(!is.null(inst)) {
+  if (!is.null(inst)) {
     # msg_install <- paste("{.header - Use} ", inst, "{.header to install.}")
     # msg_restart <- paste("- Restart your R session, and run:", inst)
   }
@@ -115,7 +115,7 @@ env_type <- function(envname) {
   }
   if (is.na(ret)) {
     check_conda <- try(conda_python(envname), silent = TRUE)
-    if(!inherits(check_conda, "try-error")) {
+    if (!inherits(check_conda, "try-error")) {
       ret <- "conda"
     }
   }
diff --git a/R/install.R b/R/install.R
index 5e96520..fcb7972 100644
--- a/R/install.R
+++ b/R/install.R
@@ -20,7 +20,6 @@ install_pyspark <- function(
     new_env = TRUE,
     method = c("auto", "virtualenv", "conda"),
     ...) {
-
   install_environment(
     libs = "pyspark",
     version = version,
@@ -47,7 +46,6 @@ install_databricks <- function(
     new_env = TRUE,
     method = c("auto", "virtualenv", "conda"),
     ...) {
-
   if (!is.null(version) && !is.null(cluster_id)) {
     cli_div(theme = cli_colors())
     cli_alert_warning(
@@ -82,7 +80,6 @@ install_environment <- function(
     new_env = NULL,
     method = c("auto", "virtualenv", "conda"),
     ...) {
-
   if (is.null(version)) {
     cli_div(theme = cli_colors())
     cli_alert_success(
@@ -94,10 +91,10 @@ install_environment <- function(
     cli_end()
   } else {
     lib <- py_library_info(libs, version)
-    if(is.null(lib)) {
+    if (is.null(lib)) {
       cli_alert_success(
         "{.header Checking if provided version is valid against PyPi.org}"
-        )
+      )
       cli_abort(
         "{.header Version } {.emph '{version}' }{.header does not exist}"
       )
@@ -106,12 +103,12 @@ install_environment <- function(
 
   ver_name <- version_prep(version)
 
-  if(version == ver_name) {
+  if (version == ver_name) {
     version <- paste0(version, ".*")
   }
 
   if (is.null(envname)) {
-    if(libs == "databricks-connect") {
+    if (libs == "databricks-connect") {
       ln <- "databricks"
     } else {
       ln <- libs
@@ -234,7 +231,7 @@ require_python <- function(package, version) {
 
 py_library_info <- function(lib, ver = NULL) {
   url <- paste0("https://pypi.org/pypi/", lib)
-  if(!is.null(ver)) {
+  if (!is.null(ver)) {
     url <- paste0(url, "/", ver)
   }
   url <- paste0(url, "/json")
diff --git a/R/local-connect.R b/R/local-connect.R
index d5f71de..b27cc05 100644
--- a/R/local-connect.R
+++ b/R/local-connect.R
@@ -16,7 +16,7 @@ spark_connect_service_start <- function(version = "3.4",
     "--packages",
     glue("org.apache.spark:spark-connect_{scala_version}:{get_version$sparkVersion}")
   )
-  if(!include_args) {
+  if (!include_args) {
     args <- ""
   }
   prs <- process$new(
diff --git a/R/pyspark-connection.R b/R/pyspark-connection.R
index 02333f3..9563da0 100644
--- a/R/pyspark-connection.R
+++ b/R/pyspark-connection.R
@@ -35,7 +35,7 @@ spark_session.pyspark_connection <- function(sc) {
   # should be more flexible, taking row specs, multiple args, etc. matching
   # semantics of R dataframes and take advantage of
   # reticulate::`[.python.builtin.object` for constructing slices, etc.
-  if(is.null(i)) {
+  if (is.null(i)) {
     # special case, since pyspark has no "emptyDataFrame" method to invoke
     sc <- spark_connection(x)
 
diff --git a/R/spark-connect.R b/R/spark-connect.R
index 1b29db8..fb01faa 100644
--- a/R/spark-connect.R
+++ b/R/spark-connect.R
@@ -33,7 +33,6 @@ spark_connect_method.spark_method_databricks_connect <- function(
     extensions,
     scala_version,
     ...) {
-
   py_spark_connect(
     master = master,
     method = method,
@@ -56,42 +55,45 @@ py_spark_connect <- function(master,
   conn <- NULL
 
   if (method == "spark_connect") {
-
-    if(is.null(envname)) {
+    if (is.null(envname)) {
       env_base <- "r-sparklyr-pyspark-"
       envs <- find_environments(env_base)
-      if(length(envs) == 0) {
+      if (length(envs) == 0) {
         cli_div(theme = cli_colors())
         cli_abort(c(
-          paste0("{.header No environment name provided, and no environment was }",
-            "{.header automatically identified.}"),
+          paste0(
+            "{.header No environment name provided, and no environment was }",
+            "{.header automatically identified.}"
+          ),
           "Run {.run pysparklyr::install_pyspark()} to install."
         ), call = NULL)
         cli_end()
       } else {
-        if(!is.null(spark_version)) {
+        if (!is.null(spark_version)) {
           sp_version <- version_prep(spark_version)
           envname <- glue("{env_base}{sp_version}")
           matched <- envs[envs == envname]
-          if(length(matched) == 0) {
+          if (length(matched) == 0) {
             envname <- envs[[1]]
             cli_div(theme = cli_colors())
             cli_alert_warning(paste(
               "{.header A Python environment with a matching version was not found}",
               "* {.header Will attempt connecting using }{.emph '{envname}'}",
-              paste0("* {.header To install the proper Python environment use:}",
-                " {.run pysparklyr::install_pyspark(version = \"{sp_version}\")}"
+              paste0(
+                "* {.header To install the proper Python environment use:}",
+                " {.run pysparklyr::install_pyspark(version = \"{sp_version}\")}"
               ),
               sep = "\n"
             ))
             cli_end()
           } else {
-            envname <- matched
-          }
+          envname <- matched
+        }
         } else {
           envname <- envs[[1]]
+        }
       }
-  }}
+  }
 
   pyspark <- import_check("pyspark", envname)
   pyspark_sql <- pyspark$sql
@@ -103,30 +105,31 @@ py_spark_connect <- function(master,
   if (method == "databricks_connect") {
     cluster_id <- cluster_id %||% Sys.getenv("DATABRICKS_CLUSTER_ID")
     master <- master %||% Sys.getenv("DATABRICKS_HOST")
-    if(is.null(dbr_version)) {
+    if (is.null(dbr_version)) {
       dbr <- cluster_dbr_version(
         cluster_id = cluster_id,
         host = master,
         token = token
-        )
+      )
     } else {
       dbr <- version_prep(dbr_version)
     }
 
     env_base <- "r-sparklyr-databricks-"
     envname <- glue("{env_base}{dbr}")
     envs <- find_environments(env_base)
     matched <- envs[envs == envname]
-    if(length(matched) == 0) {
+    if (length(matched) == 0) {
       envname <- envs[[1]]
       cli_div(theme = cli_colors())
       cli_alert_warning(paste(
         "{.header A Python environment with a matching version was not found}",
         "* {.header Will attempt connecting using }{.emph '{envname}'}",
-        paste0("* {.header To install the proper Python environment use:}",
-          " {.run pysparklyr::install_databricks(version = \"{dbr}\")}"
-        ),
+        paste0(
+          "* {.header To install the proper Python environment use:}",
+          " {.run pysparklyr::install_databricks(version = \"{dbr}\")}"
+        ),
         sep = "\n"
-    ))
+      ))
       cli_end()
     }
@@ -251,7 +254,7 @@ build_user_agent <- function() {
   in_connect <- FALSE
 
   env_var <- Sys.getenv("SPARK_CONNECT_USER_AGENT", unset = NA)
-  if(!is.na(env_var)) {
+  if (!is.na(env_var)) {
     return(env_var)
   }
 
@@ -297,9 +300,7 @@
 
 cluster_dbr_version <- function(cluster_id,
                                 host = Sys.getenv("DATABRICKS_HOST"),
-                                token = Sys.getenv("DATABRICKS_TOKEN")
-  ) {
-
+                                token = Sys.getenv("DATABRICKS_TOKEN")) {
   cli_div(theme = cli_colors())
   cli_alert_warning(
     "{.header Retrieving version from cluster }{.emph '{cluster_id}'}"
@@ -325,13 +326,11 @@
 
 cluster_dbr_info <- function(cluster_id,
                              host = Sys.getenv("DATABRICKS_HOST"),
-                             token = Sys.getenv("DATABRICKS_TOKEN")
-) {
-
+                             token = Sys.getenv("DATABRICKS_TOKEN")) {
   paste0(
     host,
     "/api/2.0/clusters/get"
-    ) %>%
+  ) %>%
     request() %>%
     req_auth_bearer_token(token) %>%
     req_body_json(list(cluster_id = cluster_id)) %>%
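Context note: the final hunk is cut off above, ending mid-pipeline. For
orientation only, a sketch of how an httr2 request like the one
cluster_dbr_info() builds is typically completed — the req_perform() /
resp_body_json() tail is an assumption, not part of the patch:

    library(httr2)
    library(magrittr)

    # Hypothetical completion: fetch cluster metadata from the Databricks REST
    # API, then read the runtime version that cluster_dbr_version() reports.
    info <- paste0(host, "/api/2.0/clusters/get") %>%
      request() %>%
      req_auth_bearer_token(token) %>%
      req_body_json(list(cluster_id = cluster_id)) %>%
      req_perform() %>%
      resp_body_json()

    info$spark_version
    #> e.g. "13.2.x-scala2.12"

Here host, token, and cluster_id stand in for the arguments of
cluster_dbr_info() itself.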