Merge pull request #40 from mlverse/updates
Updates
edgararuiz authored Sep 30, 2023
commit d70a151 (2 parents: 45fd217 + 63047db)
Showing 6 changed files with 51 additions and 47 deletions.
15 changes: 9 additions & 6 deletions R/connections-pane.R
@@ -10,6 +10,10 @@ spark_ide_objects.pyspark_connection <- function(
df_tables <- data.frame()
df_cat <- data.frame()

limit <- as.numeric(
Sys.getenv("SPARKLYR_CONNECTION_OBJECT_LIMIT", unset = 1000)
)

sc_catalog <- python_conn(con)$catalog
current_catalog <- sc_catalog$currentCatalog()
if (is.null(catalog)) {
@@ -18,24 +22,23 @@ spark_ide_objects.pyspark_connection <- function(
if (length(tables) > 0) {
temps <- tables[map_lgl(tables, ~ .x$isTemporary)]
df_tables <- temps %>%
rs_tables() %>%
rbind(df_cat)
rs_tables()
}

catalogs <- sc_catalog$listCatalogs()
if (length(catalogs) > 0) {
df_catalogs <- data.frame(name = map_chr(catalogs, ~ .x$name))
df_catalogs$type <- "catalog"
}

out <- rbind(df_tables, df_catalogs)
comb <- rbind(df_tables, df_catalogs)
out <- head(comb, limit)
} else {
sc_catalog$setCurrentCatalog(catalog)
if (is.null(schema)) {
databases <- sc_catalog$listDatabases()
df_databases <- data.frame(name = map_chr(databases, ~ .x$name))
df_databases$type <- "schema"
out <- df_databases
out <- head(df_databases, limit)
} else {
tables <- sc_catalog$listTables(dbName = schema)
if (length(tables) > 0) {
@@ -48,7 +51,7 @@ spark_ide_objects.pyspark_connection <- function(
tables <- tables[schemas]
df_tables <- rs_tables(tables)
}
out <- df_tables
out <- head(df_tables, limit)
}
}

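The hunk above caps how many objects the RStudio Connections pane lists per level, reading SPARKLYR_CONNECTION_OBJECT_LIMIT and falling back to 1000. A minimal sketch of overriding that cap before opening a connection; the variable name comes straight from the diff, while the value 500 and the connection call are only illustrative:

# Illustrative only: list at most 500 tables/catalogs/schemas per level
# in the Connections pane. The value is read with as.numeric(), so pass
# a numeric string; unset the variable to return to the default of 1000.
Sys.setenv(SPARKLYR_CONNECTION_OBJECT_LIMIT = "500")
library(sparklyr)
sc <- spark_connect(master = "sc://localhost", method = "spark_connect")
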
8 changes: 4 additions & 4 deletions R/import-check.R
@@ -31,14 +31,14 @@ import_check <- function(x, envname) {

inst <- NULL

if(substr(envname, 1, 22) == "r-sparklyr-databricks-") {
if (substr(envname, 1, 22) == "r-sparklyr-databricks-") {
inst <- paste0(
" {.run pysparklyr::install_databricks(",
"envname = \"{envname}\")}"
)
}

if(substr(envname, 1, 19) == "r-sparklyr-pyspark-") {
if (substr(envname, 1, 19) == "r-sparklyr-pyspark-") {
inst <- paste0(
" {.run pysparklyr::install_pyspark(",
"envname = \"{envname}\")}"
@@ -47,7 +47,7 @@ import_check <- function(x, envname) {

msg_install <- NULL
msg_restart <- NULL
if(!is.null(inst)) {
if (!is.null(inst)) {
# msg_install <- paste("{.header - Use} ", inst, "{.header to install.}")
# msg_restart <- paste("- Restart your R session, and run:", inst)
}
@@ -115,7 +115,7 @@ env_type <- function(envname) {
}
if (is.na(ret)) {
check_conda <- try(conda_python(envname), silent = TRUE)
if(!inherits(check_conda, "try-error")) {
if (!inherits(check_conda, "try-error")) {
ret <- "conda"
}
}
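For reference, the messages assembled above point users at the package's install helpers when the named Python environment is missing. A short hedged sketch; the function names match the .run hints in the diff, but the version numbers are made up for illustration:

# Sketch only: create the Python environments the connection code expects.
# Environment names follow the prefixes checked above
# ("r-sparklyr-pyspark-" and "r-sparklyr-databricks-").
pysparklyr::install_pyspark(version = "3.4")       # for Spark Connect
pysparklyr::install_databricks(version = "13.3")   # for Databricks Connect
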
13 changes: 5 additions & 8 deletions R/install.R
@@ -20,7 +20,6 @@ install_pyspark <- function(
new_env = TRUE,
method = c("auto", "virtualenv", "conda"),
...) {

install_environment(
libs = "pyspark",
version = version,
@@ -47,7 +46,6 @@ install_databricks <- function(
new_env = TRUE,
method = c("auto", "virtualenv", "conda"),
...) {

if (!is.null(version) && !is.null(cluster_id)) {
cli_div(theme = cli_colors())
cli_alert_warning(
@@ -82,7 +80,6 @@ install_environment <- function(
new_env = NULL,
method = c("auto", "virtualenv", "conda"),
...) {

if (is.null(version)) {
cli_div(theme = cli_colors())
cli_alert_success(
@@ -94,10 +91,10 @@ install_environment <- function(
cli_end()
} else {
lib <- py_library_info(libs, version)
if(is.null(lib)) {
if (is.null(lib)) {
cli_alert_success(
"{.header Checking if provided version is valid against PyPi.org}"
)
)
cli_abort(
"{.header Version } {.emph '{version}' }{.header does not exist}"
)
@@ -106,12 +103,12 @@ install_environment <- function(

ver_name <- version_prep(version)

if(version == ver_name) {
if (version == ver_name) {
version <- paste0(version, ".*")
}

if (is.null(envname)) {
if(libs == "databricks-connect") {
if (libs == "databricks-connect") {
ln <- "databricks"
} else {
ln <- libs
@@ -234,7 +231,7 @@ require_python <- function(package, version) {

py_library_info <- function(lib, ver = NULL) {
url <- paste0("https://pypi.org/pypi/", lib)
if(!is.null(ver)) {
if (!is.null(ver)) {
url <- paste0(url, "/", ver)
}
url <- paste0(url, "/json")
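The py_library_info() helper touched above validates a requested version against PyPI before an environment is created. A rough sketch of the lookup it performs; the URL shape (https://pypi.org/pypi/<lib>/<ver>/json) is taken from the diff, while the use of httr2 and the example version are assumptions for illustration:

# Assumption: an httr2-based lookup equivalent to py_library_info("pyspark", "3.5.0").
# A failed request here is what makes install_environment() abort with
# "Version ... does not exist".
library(httr2)
resp <- req_perform(request("https://pypi.org/pypi/pyspark/3.5.0/json"))
info <- resp_body_json(resp)
info$info$version   # echoes the release back if it exists on PyPI
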
2 changes: 1 addition & 1 deletion R/local-connect.R
@@ -16,7 +16,7 @@ spark_connect_service_start <- function(version = "3.4",
"--packages",
glue("org.apache.spark:spark-connect_{scala_version}:{get_version$sparkVersion}")
)
if(!include_args) {
if (!include_args) {
args <- ""
}
prs <- process$new(
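spark_connect_service_start(), patched above, launches a local Spark Connect service, passing the matching org.apache.spark:spark-connect artifact via --packages. A hedged usage sketch; the function name and its version default appear in the diff, but the master string and the companion stop helper are assumptions:

# Sketch only: start a local Spark Connect service and attach to it.
pysparklyr::spark_connect_service_start(version = "3.4")
sc <- sparklyr::spark_connect(master = "sc://localhost", method = "spark_connect")
# ... work with sc ...
pysparklyr::spark_connect_service_stop()   # assumed companion helper
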
2 changes: 1 addition & 1 deletion R/pyspark-connection.R
@@ -35,7 +35,7 @@ spark_session.pyspark_connection <- function(sc) {
# should be more flexible, taking row specs, multiple args, etc. matching
# semantics of R dataframes and take advantage of
# reticulate::`[.python.builtin.object` for constructing slices, etc.
if(is.null(i)) {
if (is.null(i)) {
# special case, since pyspark has no "emptyDataFrame" method to invoke
sc <- spark_connection(x)

58 changes: 31 additions & 27 deletions R/spark-connect.R
@@ -33,7 +33,6 @@ spark_connect_method.spark_method_databricks_connect <- function(
extensions,
scala_version,
...) {

py_spark_connect(
master = master,
method = method,
@@ -56,42 +55,45 @@ py_spark_connect <- function(master,
conn <- NULL

if (method == "spark_connect") {

if(is.null(envname)) {
if (is.null(envname)) {
env_base <- "r-sparklyr-pyspark-"
envs <- find_environments(env_base)
if(length(envs) == 0) {
if (length(envs) == 0) {
cli_div(theme = cli_colors())
cli_abort(c(
paste0("{.header No environment name provided, and no environment was }",
"{.header automatically identified.}"),
paste0(
"{.header No environment name provided, and no environment was }",
"{.header automatically identified.}"
),
"Run {.run pysparklyr::install_pyspark()} to install."
), call = NULL)
cli_end()
} else {
if(!is.null(spark_version)) {
if (!is.null(spark_version)) {
sp_version <- version_prep(spark_version)
envname <- glue("{env_base}{sp_version}")
matched <- envs[envs == envname]
if(length(matched) == 0) {
if (length(matched) == 0) {
envname <- envs[[1]]
cli_div(theme = cli_colors())
cli_alert_warning(paste(
"{.header A Python environment with a matching version was not found}",
"* {.header Will attempt connecting using }{.emph '{envname}'}",
paste0("* {.header To install the proper Python environment use:}",
" {.run pysparklyr::install_pyspark(version = \"{sp_version}\")}"
paste0(
"* {.header To install the proper Python environment use:}",
" {.run pysparklyr::install_pyspark(version = \"{sp_version}\")}"
),
sep = "\n"
))
cli_end()
} else {
envname <- matched
}
envname <- matched
}
} else {
envname <- envs[[1]]
}
}
}}
}

pyspark <- import_check("pyspark", envname)
pyspark_sql <- pyspark$sql
@@ -103,12 +105,12 @@ py_spark_connect <- function(master,
if (method == "databricks_connect") {
cluster_id <- cluster_id %||% Sys.getenv("DATABRICKS_CLUSTER_ID")
master <- master %||% Sys.getenv("DATABRICKS_HOST")
if(is.null(dbr_version)) {
if (is.null(dbr_version)) {
dbr <- cluster_dbr_version(
cluster_id = cluster_id,
host = master,
token = token
)
)
} else {
dbr <- version_prep(dbr_version)
}
@@ -117,17 +119,18 @@ py_spark_connect <- function(master,
envname <- glue("{env_base}{dbr}")
envs <- find_environments(env_base)
matched <- envs[envs == envname]
if(length(matched) == 0) {
if (length(matched) == 0) {
envname <- envs[[1]]
cli_div(theme = cli_colors())
cli_alert_warning(paste(
"{.header A Python environment with a matching version was not found}",
"* {.header Will attempt connecting using }{.emph '{envname}'}",
paste0("* {.header To install the proper Python environment use:}",
" {.run pysparklyr::install_databricks(version = \"{dbr}\")}"
),
paste0(
"* {.header To install the proper Python environment use:}",
" {.run pysparklyr::install_databricks(version = \"{dbr}\")}"
),
sep = "\n"
))
))
cli_end()
}

@@ -250,6 +253,11 @@ build_user_agent <- function() {
in_rstudio <- FALSE
in_connect <- FALSE

env_var <- Sys.getenv("SPARK_CONNECT_USER_AGENT", unset = NA)
if (!is.na(env_var)) {
return(env_var)
}

if (Sys.getenv("RSTUDIO_PRODUCT") == "CONNECT") {
product <- "posit-connect"
}
@@ -292,9 +300,7 @@ build_user_agent <- function() {

cluster_dbr_version <- function(cluster_id,
host = Sys.getenv("DATABRICKS_HOST"),
token = Sys.getenv("DATABRICKS_TOKEN")
) {

token = Sys.getenv("DATABRICKS_TOKEN")) {
cli_div(theme = cli_colors())
cli_alert_warning(
"{.header Retrieving version from cluster }{.emph '{cluster_id}'}"
@@ -320,13 +326,11 @@ cluster_dbr_version <- function(cluster_id,

cluster_dbr_info <- function(cluster_id,
host = Sys.getenv("DATABRICKS_HOST"),
token = Sys.getenv("DATABRICKS_TOKEN")
) {

token = Sys.getenv("DATABRICKS_TOKEN")) {
paste0(
host,
"/api/2.0/clusters/get"
) %>%
) %>%
request() %>%
req_auth_bearer_token(token) %>%
req_body_json(list(cluster_id = cluster_id)) %>%
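Beyond the spacing fixes, the build_user_agent() hunk above adds a behavior change: callers can now replace the generated user agent entirely through an environment variable. A minimal sketch of the override; the variable name is from the diff, the agent string is only an example:

# Example only: force a fixed user-agent string for Spark Connect /
# Databricks Connect sessions. When SPARK_CONNECT_USER_AGENT is set,
# build_user_agent() returns it as-is and skips the RStudio / Posit
# Connect detection that follows in the function.
Sys.setenv(SPARK_CONNECT_USER_AGENT = "my-team-dashboard/0.1.0")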
