diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index e7b960b63..750d27271 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -1144,12 +1144,19 @@ object DeltaSharingRestClient extends Logging { * Parse the user provided path `profile_file#share.schema.table` to * ParsedDeltaSharingTablePath. */ - def parsePath(path: String): ParsedDeltaSharingTablePath = { + def parsePath( + path: String, + shareCredentialsOptions: Map[String, String]): ParsedDeltaSharingTablePath = { val shapeIndex = path.lastIndexOf('#') - if (shapeIndex < 0) { - throw new IllegalArgumentException(s"path $path is not valid") + val (profileFile, tablePath) = if (shapeIndex < 0) { + if (shareCredentialsOptions.nonEmpty) { + ("", path) + } else { + throw new IllegalArgumentException(s"path $path is not valid") + } + } else { + (path.substring(0, shapeIndex), path.substring(shapeIndex + 1)) } - val profileFile = path.substring(0, shapeIndex) val tableSplits = path.substring(shapeIndex + 1).split("\\.", -1) if (tableSplits.length != 3) { throw new IllegalArgumentException(s"path $path is not valid") @@ -1168,19 +1175,23 @@ object DeltaSharingRestClient extends Logging { def apply( profileFile: String, + shareCredentialsOptions: Map[String, String], forStreaming: Boolean = false, responseFormat: String = RESPONSE_FORMAT_PARQUET, readerFeatures: String = "" ): DeltaSharingClient = { val sqlConf = SparkSession.active.sessionState.conf - val profileProviderClass = ConfUtils.profileProviderClass(sqlConf) - val profileProvider: DeltaSharingProfileProvider = + val profileProvider: DeltaSharingProfileProvider = if (shareCredentialsOptions.nonEmpty) { + new DeltaSharingOptionsProfileProvider(shareCredentialsOptions) + } else { + val profileProviderClass = ConfUtils.profileProviderClass(sqlConf) Class.forName(profileProviderClass) .getConstructor(classOf[Configuration], classOf[String]) .newInstance(SparkSession.active.sessionState.newHadoopConf(), profileFile) .asInstanceOf[DeltaSharingProfileProvider] + } // This is a flag to test the local https server. Should never be used in production. val sslTrustAll = ConfUtils.sslTrustAll(sqlConf) diff --git a/client/src/main/scala/io/delta/sharing/spark/DeltaSharingOptions.scala b/client/src/main/scala/io/delta/sharing/spark/DeltaSharingOptions.scala index 95ab75a6a..92d2481ab 100644 --- a/client/src/main/scala/io/delta/sharing/spark/DeltaSharingOptions.scala +++ b/client/src/main/scala/io/delta/sharing/spark/DeltaSharingOptions.scala @@ -214,7 +214,7 @@ object DeltaSharingOptions extends Logging { val RESPONSE_FORMAT_DELTA = "delta" val PROFILE_SHARE_CREDENTIALS_VERSION = "shareCredentialsVersion" - val PROFILE_TYPE = "type" + val PROFILE_TYPE = "shareCredentialsType" val PROFILE_ENDPOINT = "endpoint" val PROFILE_TOKEN_ENDPOINT = "tokenEndpoint" val PROFILE_CLIENT_ID = "clientId" diff --git a/client/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala b/client/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala index e2977fa13..cb17e0eef 100644 --- a/client/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala +++ b/client/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala @@ -126,11 +126,13 @@ private[sharing] object RemoteDeltaLog { def apply( path: String, + shareCredentialsOptions: Map[String, String], forStreaming: Boolean = false, responseFormat: String = DeltaSharingOptions.RESPONSE_FORMAT_PARQUET, initDeltaTableMetadata: Option[DeltaTableMetadata] = None): RemoteDeltaLog = { - val parsedPath = DeltaSharingRestClient.parsePath(path) - val client = DeltaSharingRestClient(parsedPath.profileFile, forStreaming, responseFormat) + val parsedPath = DeltaSharingRestClient.parsePath(path, shareCredentialsOptions) + val client = DeltaSharingRestClient( + parsedPath.profileFile, shareCredentialsOptions, forStreaming, responseFormat) val deltaSharingTable = DeltaSharingTable( name = parsedPath.table, schema = parsedPath.schema, diff --git a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala index 441d81ff6..c9cead44e 100644 --- a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala +++ b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala @@ -40,29 +40,30 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { import DeltaSharingRestClient._ test("parsePath") { - assert(DeltaSharingRestClient.parsePath("file:///foo/bar#a.b.c") == - ParsedDeltaSharingTablePath("file:///foo/bar", "a", "b", "c")) - assert(DeltaSharingRestClient.parsePath("file:///foo/bar#bar#a.b.c") == + val emptyShareCredentialsOptions: Map[String, String] = Map.empty + assert(DeltaSharingRestClient.parsePath("file:///foo/bar#a.b.c", emptyShareCredentialsOptions) == + ParsedDeltaSharingTablePath("file:///foo/bar", "a", "b", "c"), emptyShareCredentialsOptions) + assert(DeltaSharingRestClient.parsePath("file:///foo/bar#bar#a.b.c", emptyShareCredentialsOptions) == ParsedDeltaSharingTablePath("file:///foo/bar#bar", "a", "b", "c")) - assert(DeltaSharingRestClient.parsePath("file:///foo/bar#bar#a.b.c ") == + assert(DeltaSharingRestClient.parsePath("file:///foo/bar#bar#a.b.c ", emptyShareCredentialsOptions) == ParsedDeltaSharingTablePath("file:///foo/bar#bar", "a", "b", "c ")) intercept[IllegalArgumentException] { - DeltaSharingRestClient.parsePath("file:///foo/bar") + DeltaSharingRestClient.parsePath("file:///foo/bar", emptyShareCredentialsOptions) } intercept[IllegalArgumentException] { - DeltaSharingRestClient.parsePath("file:///foo/bar#a.b") + DeltaSharingRestClient.parsePath("file:///foo/bar#a.b", emptyShareCredentialsOptions) } intercept[IllegalArgumentException] { - DeltaSharingRestClient.parsePath("file:///foo/bar#a.b.c.d") + DeltaSharingRestClient.parsePath("file:///foo/bar#a.b.c.d", emptyShareCredentialsOptions) } intercept[IllegalArgumentException] { - DeltaSharingRestClient.parsePath("#a.b.c") + DeltaSharingRestClient.parsePath("#a.b.c", emptyShareCredentialsOptions) } intercept[IllegalArgumentException] { - DeltaSharingRestClient.parsePath("foo#a.b.") + DeltaSharingRestClient.parsePath("foo#a.b.", emptyShareCredentialsOptions) } intercept[IllegalArgumentException] { - DeltaSharingRestClient.parsePath("foo#a.b.c.") + DeltaSharingRestClient.parsePath("foo#a.b.c.", emptyShareCredentialsOptions) } } diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala index 41258941d..5baead021 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala @@ -48,7 +48,8 @@ private[sharing] class DeltaSharingDataSource val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException) val deltaLog = RemoteDeltaLog( - path, forStreaming = false, responseFormat = options.responseFormat + path, options.shareCredentialsOptions, + forStreaming = false, responseFormat = options.responseFormat ) deltaLog.createRelation(options.versionAsOf, options.timestampAsOf, options.cdfOptions) } @@ -69,7 +70,8 @@ private[sharing] class DeltaSharingDataSource val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException) val deltaLog = RemoteDeltaLog( - path, forStreaming = true, responseFormat = options.responseFormat + path, options.shareCredentialsOptions, + forStreaming = true, responseFormat = options.responseFormat ) val schemaToUse = deltaLog.snapshot().schema if (schemaToUse.isEmpty) { @@ -95,7 +97,9 @@ private[sharing] class DeltaSharingDataSource } val options = new DeltaSharingOptions(parameters) val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException) - val deltaLog = RemoteDeltaLog(path, forStreaming = true, options.responseFormat) + val deltaLog = RemoteDeltaLog( + path, options.shareCredentialsOptions, forStreaming = true, options.responseFormat + ) DeltaSharingSource(SparkSession.active, deltaLog, options) } diff --git a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceCDFSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceCDFSuite.scala index e68eb512c..a77ec4c5d 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceCDFSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceCDFSuite.scala @@ -40,6 +40,7 @@ class DeltaSharingSourceCDFSuite extends QueryTest with SharedSparkSession with DeltaSharingIntegrationTest { import testImplicits._ + lazy val shareCredentialsOptions: Map[String, String] = Map.empty // VERSION 0: CREATE TABLE // VERSION 1: INSERT 3 rows, 3 add files @@ -66,7 +67,7 @@ class DeltaSharingSourceCDFSuite extends QueryTest // VERSION 4: REMOVE 4 rows, 2 remove files lazy val cdfTablePath = testProfileFile.getCanonicalPath + "#share8.default.streaming_cdf_table" - lazy val deltaLog = RemoteDeltaLog(cdfTablePath, forStreaming = true) + lazy val deltaLog = RemoteDeltaLog(cdfTablePath, shareCredentialsOptions, forStreaming = true) def getSource(parameters: Map[String, String]): DeltaSharingSource = { val options = new DeltaSharingOptions(parameters ++ Map("readChangeFeed" -> "true")) diff --git a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceParamsSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceParamsSuite.scala index dbe94ad7b..426627f30 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceParamsSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceParamsSuite.scala @@ -26,6 +26,7 @@ class DeltaSharingSourceParamsSuite extends QueryTest with SharedSparkSession with DeltaSharingIntegrationTest { import testImplicits._ + lazy val shareCredentialsOptions: Map[String, String] = Map.empty // VERSION 0: CREATE TABLE // VERSION 1: INSERT 3 rows, 3 add files @@ -33,7 +34,7 @@ class DeltaSharingSourceParamsSuite extends QueryTest // VERSION 3: UPDATE 1 row, 1 remove file and 1 add file lazy val tablePath = testProfileFile.getCanonicalPath + "#share8.default.cdf_table_cdf_enabled" - lazy val deltaLog = RemoteDeltaLog(tablePath, forStreaming = true) + lazy val deltaLog = RemoteDeltaLog(tablePath, shareCredentialsOptions, forStreaming = true) def getSource(parameters: Map[String, String]): DeltaSharingSource = { val options = new DeltaSharingOptions(parameters) diff --git a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceSuite.scala index ce852254b..c49f9c991 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceSuite.scala @@ -42,6 +42,7 @@ class DeltaSharingSourceSuite extends QueryTest // https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html import testImplicits._ + lazy val shareCredentialsOptions: Map[String, String] = Map.empty // VERSION 0: CREATE TABLE // VERSION 1: INSERT 3 rows, 3 add files @@ -67,7 +68,7 @@ class DeltaSharingSourceSuite extends QueryTest lazy val toNotNullTable = testProfileFile.getCanonicalPath + "#share8.default.streaming_null_to_notnull" - lazy val deltaLog = RemoteDeltaLog(tablePath, forStreaming = true) + lazy val deltaLog = RemoteDeltaLog(tablePath, shareCredentialsOptions, forStreaming = true) def getSource(parameters: Map[String, String]): DeltaSharingSource = { val options = new DeltaSharingOptions(parameters)