Skip to content

Commit

Permalink
Use share credentials options if any are specified
Browse files Browse the repository at this point in the history
Signed-off-by: Steven Ayers <[email protected]>
  • Loading branch information
stevenayers committed Sep 14, 2024
1 parent 961b66a commit a658e19
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1144,17 +1144,29 @@ object DeltaSharingRestClient extends Logging {
* Parse the user provided path `profile_file#share.schema.table` to
* ParsedDeltaSharingTablePath.
*/
def parsePath(path: String): ParsedDeltaSharingTablePath = {
def parsePath(
path: String,
shareCredentialsOptions: Map[String, String]): ParsedDeltaSharingTablePath = {
val shapeIndex = path.lastIndexOf('#')
if (shapeIndex < 0) {
throw new IllegalArgumentException(s"path $path is not valid")

val (profileFile, tablePath) = {
if (shareCredentialsOptions.nonEmpty && shapeIndex < 0) {
("", path)
} else if (shareCredentialsOptions.nonEmpty && shapeIndex >= 0) {
throw new IllegalArgumentException(
"cannot specify both share credentials options and a profile file path")
} else if (shareCredentialsOptions.isEmpty && shapeIndex < 0) {
throw new IllegalArgumentException(s"path $path is not valid")
} else {
(path.substring(0, shapeIndex), path.substring(shapeIndex + 1))
}
}
val profileFile = path.substring(0, shapeIndex)
val tableSplits = path.substring(shapeIndex + 1).split("\\.", -1)

val tableSplits = tablePath.split("\\.", -1)
if (tableSplits.length != 3) {
throw new IllegalArgumentException(s"path $path is not valid")
}
if (profileFile.isEmpty || tableSplits(0).isEmpty ||
if ((profileFile.isEmpty && shareCredentialsOptions.isEmpty) || tableSplits(0).isEmpty ||
tableSplits(1).isEmpty || tableSplits(2).isEmpty) {
throw new IllegalArgumentException(s"path $path is not valid")
}
Expand All @@ -1168,19 +1180,23 @@ object DeltaSharingRestClient extends Logging {

def apply(
profileFile: String,
shareCredentialsOptions: Map[String, String],
forStreaming: Boolean = false,
responseFormat: String = RESPONSE_FORMAT_PARQUET,
readerFeatures: String = ""
): DeltaSharingClient = {
val sqlConf = SparkSession.active.sessionState.conf

val profileProviderClass = ConfUtils.profileProviderClass(sqlConf)
val profileProvider: DeltaSharingProfileProvider =
val profileProvider: DeltaSharingProfileProvider = if (shareCredentialsOptions.nonEmpty) {
new DeltaSharingOptionsProfileProvider(shareCredentialsOptions)
} else {
val profileProviderClass = ConfUtils.profileProviderClass(sqlConf)
Class.forName(profileProviderClass)
.getConstructor(classOf[Configuration], classOf[String])
.newInstance(SparkSession.active.sessionState.newHadoopConf(),
profileFile)
.asInstanceOf[DeltaSharingProfileProvider]
}

// This is a flag to test the local https server. Should never be used in production.
val sslTrustAll = ConfUtils.sslTrustAll(sqlConf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ object DeltaSharingOptions extends Logging {
val RESPONSE_FORMAT_DELTA = "delta"

val PROFILE_SHARE_CREDENTIALS_VERSION = "shareCredentialsVersion"
val PROFILE_TYPE = "type"
val PROFILE_TYPE = "shareCredentialsType"
val PROFILE_ENDPOINT = "endpoint"
val PROFILE_TOKEN_ENDPOINT = "tokenEndpoint"
val PROFILE_CLIENT_ID = "clientId"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,13 @@ private[sharing] object RemoteDeltaLog {

def apply(
path: String,
shareCredentialsOptions: Map[String, String],
forStreaming: Boolean = false,
responseFormat: String = DeltaSharingOptions.RESPONSE_FORMAT_PARQUET,
initDeltaTableMetadata: Option[DeltaTableMetadata] = None): RemoteDeltaLog = {
val parsedPath = DeltaSharingRestClient.parsePath(path)
val client = DeltaSharingRestClient(parsedPath.profileFile, forStreaming, responseFormat)
val parsedPath = DeltaSharingRestClient.parsePath(path, shareCredentialsOptions)
val client = DeltaSharingRestClient(
parsedPath.profileFile, shareCredentialsOptions, forStreaming, responseFormat)
val deltaSharingTable = DeltaSharingTable(
name = parsedPath.table,
schema = parsedPath.schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,29 +40,38 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
import DeltaSharingRestClient._

test("parsePath") {
assert(DeltaSharingRestClient.parsePath("file:///foo/bar#a.b.c") ==
val emptyShareCredentialsOptions: Map[String, String] = Map.empty
val testShareCredentialsOptions: Map[String, String] = Map("key" -> "value")

assert(
DeltaSharingRestClient.parsePath("file:///foo/bar#a.b.c", emptyShareCredentialsOptions) ==
ParsedDeltaSharingTablePath("file:///foo/bar", "a", "b", "c"))
assert(DeltaSharingRestClient.parsePath("file:///foo/bar#bar#a.b.c") ==
assert(DeltaSharingRestClient.parsePath("file:///foo/bar#bar#a.b.c", emptyShareCredentialsOptions) ==
ParsedDeltaSharingTablePath("file:///foo/bar#bar", "a", "b", "c"))
assert(DeltaSharingRestClient.parsePath("file:///foo/bar#bar#a.b.c ") ==
assert(DeltaSharingRestClient.parsePath("file:///foo/bar#bar#a.b.c ", emptyShareCredentialsOptions) ==
ParsedDeltaSharingTablePath("file:///foo/bar#bar", "a", "b", "c "))
assert(DeltaSharingRestClient.parsePath("a.b.c", testShareCredentialsOptions) ==
ParsedDeltaSharingTablePath("", "a", "b", "c"))
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("file:///foo/barr#a.b.c ", testShareCredentialsOptions)
}
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("file:///foo/bar")
DeltaSharingRestClient.parsePath("file:///foo/bar", emptyShareCredentialsOptions)
}
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("file:///foo/bar#a.b")
DeltaSharingRestClient.parsePath("file:///foo/bar#a.b", emptyShareCredentialsOptions)
}
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("file:///foo/bar#a.b.c.d")
DeltaSharingRestClient.parsePath("file:///foo/bar#a.b.c.d", emptyShareCredentialsOptions)
}
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("#a.b.c")
DeltaSharingRestClient.parsePath("#a.b.c", emptyShareCredentialsOptions)
}
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("foo#a.b.")
DeltaSharingRestClient.parsePath("foo#a.b.", emptyShareCredentialsOptions)
}
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("foo#a.b.c.")
DeltaSharingRestClient.parsePath("foo#a.b.c.", emptyShareCredentialsOptions)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class DeltaSharingOptionsSuite extends SparkFunSuite {
// profile as opts
var options = new DeltaSharingOptions(Map(
"shareCredentialsVersion" -> "1",
"type" -> "bearer_token",
"shareCredentialsType" -> "bearer_token",
"endpoint" -> "foo",
"tokenEndpoint" -> "bar",
"clientId" -> "abc",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ private[sharing] class DeltaSharingDataSource
val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException)

val deltaLog = RemoteDeltaLog(
path, forStreaming = false, responseFormat = options.responseFormat
path, options.shareCredentialsOptions,
forStreaming = false, responseFormat = options.responseFormat
)
deltaLog.createRelation(options.versionAsOf, options.timestampAsOf, options.cdfOptions)
}
Expand All @@ -69,7 +70,8 @@ private[sharing] class DeltaSharingDataSource

val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException)
val deltaLog = RemoteDeltaLog(
path, forStreaming = true, responseFormat = options.responseFormat
path, options.shareCredentialsOptions,
forStreaming = true, responseFormat = options.responseFormat
)
val schemaToUse = deltaLog.snapshot().schema
if (schemaToUse.isEmpty) {
Expand All @@ -95,7 +97,9 @@ private[sharing] class DeltaSharingDataSource
}
val options = new DeltaSharingOptions(parameters)
val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException)
val deltaLog = RemoteDeltaLog(path, forStreaming = true, options.responseFormat)
val deltaLog = RemoteDeltaLog(
path, options.shareCredentialsOptions, forStreaming = true, options.responseFormat
)

DeltaSharingSource(SparkSession.active, deltaLog, options)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class DeltaSharingSourceCDFSuite extends QueryTest
with SharedSparkSession with DeltaSharingIntegrationTest {

import testImplicits._
lazy val shareCredentialsOptions: Map[String, String] = Map.empty

// VERSION 0: CREATE TABLE
// VERSION 1: INSERT 3 rows, 3 add files
Expand All @@ -66,7 +67,7 @@ class DeltaSharingSourceCDFSuite extends QueryTest
// VERSION 4: REMOVE 4 rows, 2 remove files
lazy val cdfTablePath = testProfileFile.getCanonicalPath + "#share8.default.streaming_cdf_table"

lazy val deltaLog = RemoteDeltaLog(cdfTablePath, forStreaming = true)
lazy val deltaLog = RemoteDeltaLog(cdfTablePath, shareCredentialsOptions, forStreaming = true)

def getSource(parameters: Map[String, String]): DeltaSharingSource = {
val options = new DeltaSharingOptions(parameters ++ Map("readChangeFeed" -> "true"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ class DeltaSharingSourceParamsSuite extends QueryTest
with SharedSparkSession with DeltaSharingIntegrationTest {

import testImplicits._
lazy val shareCredentialsOptions: Map[String, String] = Map.empty

// VERSION 0: CREATE TABLE
// VERSION 1: INSERT 3 rows, 3 add files
// VERSION 2: REMOVE 1 row, 1 remove file
// VERSION 3: UPDATE 1 row, 1 remove file and 1 add file
lazy val tablePath = testProfileFile.getCanonicalPath + "#share8.default.cdf_table_cdf_enabled"

lazy val deltaLog = RemoteDeltaLog(tablePath, forStreaming = true)
lazy val deltaLog = RemoteDeltaLog(tablePath, shareCredentialsOptions, forStreaming = true)

def getSource(parameters: Map[String, String]): DeltaSharingSource = {
val options = new DeltaSharingOptions(parameters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class DeltaSharingSourceSuite extends QueryTest
// https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

import testImplicits._
lazy val shareCredentialsOptions: Map[String, String] = Map.empty

// VERSION 0: CREATE TABLE
// VERSION 1: INSERT 3 rows, 3 add files
Expand All @@ -67,7 +68,7 @@ class DeltaSharingSourceSuite extends QueryTest
lazy val toNotNullTable = testProfileFile.getCanonicalPath +
"#share8.default.streaming_null_to_notnull"

lazy val deltaLog = RemoteDeltaLog(tablePath, forStreaming = true)
lazy val deltaLog = RemoteDeltaLog(tablePath, shareCredentialsOptions, forStreaming = true)

def getSource(parameters: Map[String, String]): DeltaSharingSource = {
val options = new DeltaSharingOptions(parameters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,24 @@ class DeltaSharingSuite extends QueryTest with SharedSparkSession with DeltaShar

import testImplicits._

integrationTest("table1 passing profile with read options") {
val tablePath = "share1.default.table1"
val expected = Seq(
Row(sqlTimestamp("2021-04-27 23:32:02.07"), sqlDate("2021-04-28")),
Row(sqlTimestamp("2021-04-27 23:32:22.421"), sqlDate("2021-04-28"))
)
val readOptions = Map(
"endpoint" -> "https://localhost:$TEST_PORT/delta-sharing",
"bearerToken" -> "dapi5e3574ec767ca1548ae5bbed1a2dc04d",
"shareCredentialsVersion" -> "1"
)
checkAnswer(spark.read.format("deltaSharing").options(readOptions).load(tablePath), expected)
withTable("delta_sharing_test") {
sql(s"CREATE TABLE delta_sharing_test USING deltaSharing LOCATION '$tablePath'")
checkAnswer(sql(s"SELECT * FROM delta_sharing_test"), expected)
}
}

integrationTest("table1") {
val tablePath = testProfileFile.getCanonicalPath + "#share1.default.table1"
val expected = Seq(
Expand Down

0 comments on commit a658e19

Please sign in to comment.