Adds support for Apache Hive LLAP #107

Merged
merged 29 commits, Feb 7, 2020
Changes from all commits
29 commits
ad77353
added Hive support
aklochkova Aug 26, 2019
5f5b8a9
updated ReadMe
aklochkova Sep 26, 2019
ba90108
changed jdbc pattern
aklochkova Oct 1, 2019
75d5a83
changed jdbc pattern
aklochkova Oct 1, 2019
f6809f7
debug info
aklochkova Oct 1, 2019
8bb5dec
removed debug info
aklochkova Oct 1, 2019
603342e
added non-batch insert
aklochkova Oct 23, 2019
8ebf250
added missing jar
aklochkova Oct 23, 2019
ed859b2
added missing jar
aklochkova Oct 23, 2019
06c283d
reverted non-batch insert
aklochkova Oct 24, 2019
a571505
Merge branch 'master' into issue-1168-hive-support
aklochkova Oct 24, 2019
b251bbb
Merge remote-tracking branch 'upstream/develop' into issue-1168-hive-…
aklochkova Oct 24, 2019
04ffdeb
Merge branch 'develop' into issue-1168-hive-support
wivern Oct 28, 2019
c99cb95
fixes hive batched insert
wivern Oct 29, 2019
ff765e6
Merge remote-tracking branch 'origin/issue-1168-hive-support' into is…
aklochkova Oct 29, 2019
2415e04
reduces batch size for hive
wivern Oct 30, 2019
02c4a79
Merge remote-tracking branch 'origin/issue-1168-hive-support' into is…
aklochkova Oct 30, 2019
2092911
adds bulk load for hive
wivern Nov 1, 2019
0fe3236
Merge remote-tracking branch 'origin/issue-1168-hive-support' into is…
aklochkova Nov 5, 2019
4194020
improved condition for hive
aklochkova Nov 11, 2019
2084d46
changed table to internal
aklochkova Nov 13, 2019
0611b13
made equality comparison case-insensitive
aklochkova Nov 14, 2019
b1af1c8
made ssh work via system commands
aklochkova Nov 18, 2019
843289e
added missed parenthesis
aklochkova Nov 18, 2019
c2acdc1
added windows functionality for copying file to hadoop
aklochkova Nov 19, 2019
956eb3f
added windows functionality for copying file to hadoop
aklochkova Nov 19, 2019
2e7c7db
added windows functionality for copying file to hadoop
aklochkova Nov 19, 2019
adb3727
improved condition for package 'ssh'
aklochkova Nov 19, 2019
a502b75
after review fix: improved readability
wivern Feb 7, 2020
2 changes: 2 additions & 0 deletions .gitignore
@@ -3,3 +3,5 @@
.RData
R/LocalEnvironment.R
/bin/
/DatabaseConnector.iml
.idea/
20 changes: 20 additions & 0 deletions R/Connect.R
@@ -491,6 +491,26 @@ connect <- function(connectionDetails = NULL,
attr(connection, "dbms") <- dbms
return(connection)
}
if (dbms == "hive") {
writeLines("Connecting using Hive driver")
jarPath <- findPathToJar("^hive-jdbc-standalone\\.jar$", pathToDriver)
driver <- getJbcDriverSingleton("org.apache.hive.jdbc.HiveDriver", jarPath)

if (missing(connectionString) || is.null(connectionString)) {
connectionString <- paste0("jdbc:hive2://", server, ":", port, "/", schema)
if (!missing(extraSettings) && !is.null(extraSettings)) {
connectionString <- paste0(connectionString, ";", extraSettings)
}
}
connection <- connectUsingJdbcDriver(driver,
connectionString,
user = user,
password = password,
dbms = dbms)

attr(connection, "dbms") <- dbms
return(connection)
}
if (dbms == "bigquery") {
writeLines("Connecting using BigQuery driver")

130 changes: 115 additions & 15 deletions R/InsertTable.R
@@ -91,15 +91,19 @@ castValues <- function(values, fts) {
sep = "")
}

formatRow <- function(data, aliases = c(), castValues, fts) {
if (castValues) {
data <- castValues(data, fts)
}
return(paste(data, aliases, collapse = ","))
}

ctasHack <- function(connection, qname, tempTable, varNames, fts, data, progressBar, oracleTempSchema) {
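# Use a smaller batch size for Hive (cf. commit "reduces batch size for hive").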
if (attr(connection, "dbms") == "hive") {
batchSize <- 750
} else {
batchSize <- 1000
}
mergeSize <- 300

if (any(tolower(names(data)) == "subject_id")) {
@@ -125,16 +129,18 @@ ctasHack <- function(connection, qname, tempTable, varNames, fts, data, progress
tempNames <- c(mergedName)
}
end <- min(start + batchSize - 1, nrow(data))
batch <- toStrings(data[start:end, , drop = FALSE], fts)

varAliases <- strsplit(varNames, ",")[[1]]
# First line gets type information:
valueString <- formatRow(batch[1, , drop = FALSE], varAliases, castValues = TRUE, fts = fts)
if (end > start) {
# Other lines only get type information if BigQuery:
valueString <- paste(c(valueString, apply(batch[2:nrow(batch), , drop = FALSE],
MARGIN = 1,
FUN = formatRow,
aliases = varAliases,
castValues = attr(connection, "dbms") %in% c("bigquery", "hive"),
fts = fts)),
collapse = "\nUNION ALL\nSELECT ")
}
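
Passing aliases through formatRow means every literal in the generated SELECT carries its column name as an implicit alias, so each branch of the UNION ALL in the CTAS statement has named columns. A quick base-R illustration with made-up values:

values <- c("1", "'Male'")
aliases <- c("person_id", "gender")
paste(values, aliases, collapse = ",")
# [1] "1 person_id,'Male' gender"
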
@@ -272,7 +278,7 @@ insertTable.default <- function(connection,
tempTable <- TRUE
warning("Temp table name detected, setting tempTable parameter to TRUE")
}
if (Sys.getenv("USE_MPP_BULK_LOAD") == "TRUE") {
if (toupper(Sys.getenv("USE_MPP_BULK_LOAD")) == "TRUE") {
useMppBulkLoad <- TRUE
}
if (dropTableIfExists)
@@ -323,7 +329,7 @@ insertTable.default <- function(connection,
fdef <- paste(.sql.qescape(names(data), TRUE, connection@identifierQuote), fts, collapse = ",")
qname <- .sql.qescape(tableName, TRUE, connection@identifierQuote)
varNames <- paste(.sql.qescape(names(data), TRUE, connection@identifierQuote), collapse = ",")

if (dropTableIfExists) {
if (tempTable) {
sql <- "IF OBJECT_ID('tempdb..@tableName', 'U') IS NOT NULL DROP TABLE @tableName;"
@@ -337,26 +343,35 @@ insertTable.default <- function(connection,
executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE)
}

if (createTable && !tempTable && useMppBulkLoad) {
if (createTable && (!tempTable || attr(connection, "dbms") == "hive") && useMppBulkLoad) {
ensure_installed("uuid")
ensure_installed("R.utils")
ensure_installed("urltools")
if (!.checkMppCredentials(connection)) {
stop("MPP credentials could not be confirmed. Please review them or set 'useMppBulkLoad' to FALSE")
}
writeLines("Attempting to use MPP bulk loading...")
sql <- paste("CREATE TABLE ", qname, " (", fdef, ");", sep = "")
sql <- SqlRender::translate(sql, targetDialect = attr(connection, "dbms"))
executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE)
dbms <- attr(connection, "dbms")

if (dbms != "hive") {
sql <- paste("CREATE TABLE ", qname, " (", fdef, ");", sep = "")
sql <- SqlRender::translate(sql, targetDialect = attr(connection, "dbms"))
executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE)
}

if (attr(connection, "dbms") == "redshift") {
if (dbms == "redshift") {
ensure_installed("aws.s3")
.bulkLoadRedshift(connection, qname, data)
} else if (dbms == "pdw") {
.bulkLoadPdw(connection, qname, fts, data)
} else if (dbms == "hive") {
if (tolower(Sys.info()["sysname"]) == "windows") {
ensure_installed("ssh")
}
.bulkLoadHive(connection, qname, strsplit(varNames, ",")[[1]], data)
}
} else {
if (attr(connection, "dbms") %in% c("pdw", "redshift", "bigquery") && createTable && nrow(data) > 0) {
if (attr(connection, "dbms") %in% c("pdw", "redshift", "bigquery", "hive") && createTable && nrow(data) > 0) {
ctasHack(connection, qname, tempTable, varNames, fts, data, progressBar, oracleTempSchema)
} else {
if (createTable) {
@@ -484,11 +499,37 @@ insertTable.DatabaseConnectorDbiConnection <- function(connection,
warning("Not using Server Side Encryption for AWS S3")
}
return(envSet & bucket)
} else if (attr(connection, "dbms") == "hive") {
if (tolower(Sys.info()["sysname"]) == "windows" && !require("ssh")) {
writeLines("Please install ssh package, it's required")
return(FALSE)
}
if (Sys.getenv("HIVE_NODE_HOST") == "") {
writeLines("Please set environment variable HIVE_NODE_HOST to the Hive Node's host:port")
return(FALSE)
}
if (Sys.getenv("HIVE_SSH_USER") == "") {
warning(paste("HIVE_SSH_USER is not set, using default", .getHiveSshUser()))
}
if (Sys.getenv("HIVE_SSH_PASSWORD") == "" && Sys.getenv("HIVE_KEYFILE") == "") {
writeLines("At least one of the following environment variables: HIVE_PASSWORD and/or HIVE_KEYFILE should be set")
return(FALSE)
}
if (Sys.getenv("HIVE_KEYFILE") == "") {
warning("Using ssh password authentication, it's recommended to use keyfile instead")
}

return(TRUE)
} else {
return(FALSE)
}
}

.getHiveSshUser <- function() {
sshUser <- Sys.getenv("HIVE_SSH_USER")
return (if (sshUser == "") "root" else sshUser)
}
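
Taken together, a hypothetical end-to-end setup for the Hive bulk-load path. All host names, credentials, and table/data names below are placeholders; the defaults noted in comments come from .bulkLoadHive:

Sys.setenv(HIVE_NODE_HOST = "hadoop-master.example.com",
           HIVE_NODE_PORT = "8020",        # default when unset
           HIVE_SSH_USER = "root",         # default when unset
           HIVE_SSH_PORT = "2222",         # default when unset
           HIVE_KEYFILE = "~/.ssh/id_rsa", # or set HIVE_SSH_PASSWORD
           HADOOP_USER_NAME = "hive")      # default when unset

# Bulk loading can also be forced globally via USE_MPP_BULK_LOAD=TRUE.
insertTable(connection = connection,
            tableName = "scratch.cohorts",
            data = cohorts,
            dropTableIfExists = TRUE,
            createTable = TRUE,
            tempTable = FALSE,
            useMppBulkLoad = TRUE)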

.bulkLoadPdw <- function(connection, qname, fts, data) {
start <- Sys.time()
# Format integer fields to prevent scientific notation:
@@ -598,6 +639,65 @@ insertTable.DatabaseConnectorDbiConnection <- function(connection,
})
}

.bulkLoadHive <- function(connection, qname, varNames, data) {
start <- Sys.time()
fileName <- sprintf("hive_insert_%s.csv", uuid::UUIDgenerate(use.time = TRUE))
filePath <- file.path(tempdir(), fileName)
write.csv(x = data, na = "", file = filePath, row.names = FALSE, quote = TRUE)

hiveUser <- .getHiveSshUser()
hivePasswd <- Sys.getenv("HIVE_SSH_PASSWORD")
hiveHost <- Sys.getenv("HIVE_NODE_HOST")
sshPort <- (function(port) if (port == "") "2222" else port)(Sys.getenv("HIVE_SSH_PORT"))
nodePort <- (function(port) if (port == "") "8020" else port)(Sys.getenv("HIVE_NODE_PORT"))
hiveKeyFile <- (function(keyfile) if (keyfile == "") NULL else keyfile)(Sys.getenv("HIVE_KEYFILE"))
hadoopUser <- (function(hadoopUser) if (hadoopUser == "") "hive" else hadoopUser)(Sys.getenv("HADOOP_USER_NAME"))

tryCatch({
if (tolower(Sys.info()["sysname"]) == "windows") {
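# Windows: use the 'ssh' package to upload the CSV to the node, then stage it in HDFS under a fresh UUID-named directory.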
session <- ssh_connect(host = sprintf("%s@%s:%s", hiveUser, hiveHost, sshPort), passwd = hivePasswd, keyfile = hiveKeyFile)
remoteFile <- paste0("/tmp/", fileName)
scp_upload(session, filePath, to = remoteFile, verbose = FALSE)
hadoopDir <- sprintf("/user/%s/%s", hadoopUser, uuid::UUIDgenerate(use.time = TRUE))
hadoopFile <- paste0(hadoopDir, "/", fileName)
ssh_exec_wait(session, sprintf("HADOOP_USER_NAME=%s hadoop fs -mkdir %s", hadoopUser, hadoopDir))
command <- sprintf("HADOOP_USER_NAME=%s hadoop fs -put %s %s", hadoopUser, remoteFile, hadoopFile)
ssh_exec_wait(session, command = command)
} else {
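# Other platforms: shell out to sshpass/scp/ssh for the same upload and HDFS staging.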
remoteFile <- paste0("/tmp/", fileName)
scp_command <- sprintf("sshpass -p \'%s\' scp -P %s %s %s:%s", hivePasswd, sshPort, filePath, hiveHost, remoteFile)
system(scp_command)
hadoopDir <- sprintf("/user/%s/%s", hadoopUser, uuid::UUIDgenerate(use.time = TRUE))
hadoopFile <- paste0(hadoopDir, "/", fileName)
hdp_mk_dir_command <- sprintf("sshpass -p \'%s\' ssh %s -p %s HADOOP_USER_NAME=%s hadoop fs -mkdir %s", hivePasswd, hiveHost, sshPort, hadoopUser, hadoopDir)
system(hdp_mk_dir_command)
hdp_put_command <- sprintf("sshpass -p \'%s\' ssh %s -p %s HADOOP_USER_NAME=%s hadoop fs -put %s %s", hivePasswd, hiveHost, sshPort, hadoopUser, remoteFile, hadoopFile)
system(hdp_put_command)
}
def <- function(name) {
return(paste(name, "STRING"))
}
fdef <- paste(sapply(varNames, def), collapse = ", ")
sql <- SqlRender::render("CREATE TABLE @table(@fdef) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION 'hdfs://@hiveHost:@nodePort@filename';",
filename = hadoopDir, table = qname, fdef = fdef, hiveHost = hiveHost, nodePort = nodePort)
sql <- SqlRender::translate(sql, targetDialect = "hive", oracleTempSchema = NULL)

tryCatch({
DatabaseConnector::executeSql(connection = connection, sql = sql, reportOverallTime = FALSE)
delta <- Sys.time() - start
writeLines(paste("Bulk load to Hive took", signif(delta, 3), attr(delta, "units")))
}, error = function(e) {
writeLines(paste("Error:", e))
stop("Error in Hive bulk upload.")
}, finally = {
try(invisible(file.remove(filePath)))
})
}, finally = {
if (tolower(Sys.info()["sysname"]) == "windows")
ssh_disconnect(session)
})
}
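
For illustration, the statement rendered above looks like this with hypothetical values (the HDFS path stands in for the UUID-named directory created earlier):

sql <- SqlRender::render(
  "CREATE TABLE @table(@fdef) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION 'hdfs://@hiveHost:@nodePort@filename';",
  filename = "/user/hive/0b7f3e2a",
  table = "scratch.cohorts",
  fdef = "cohort_id STRING, subject_id STRING",
  hiveHost = "hadoop-master.example.com",
  nodePort = "8020")
# -> CREATE TABLE scratch.cohorts(cohort_id STRING, subject_id STRING)
#    ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE
#    LOCATION 'hdfs://hadoop-master.example.com:8020/user/hive/0b7f3e2a';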

# Borrowed from devtools:
# https://github.com/hadley/devtools/blob/ba7a5a4abd8258c52cb156e7b26bb4bf47a79f0b/R/utils.r#L44
is_installed <- function(pkg, version = 0) {