Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for delta sharing profile via options #573

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -1304,17 +1304,29 @@ object DeltaSharingRestClient extends Logging {
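* Parse the user-provided path into a ParsedDeltaSharingTablePath. The path has the form
* `profile_file#share.schema.table`, or just `share.schema.table` (no `#`) when share
* credentials options are supplied instead of a profile file.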
*/
def parsePath(path: String): ParsedDeltaSharingTablePath = {
def parsePath(
path: String,
shareCredentialsOptions: Map[String, String]): ParsedDeltaSharingTablePath = {
val shapeIndex = path.lastIndexOf('#')
if (shapeIndex < 0) {
throw new IllegalArgumentException(s"path $path is not valid")

val (profileFile, tablePath) = {
if (shareCredentialsOptions.nonEmpty && shapeIndex < 0) {
("", path)
} else if (shareCredentialsOptions.nonEmpty && shapeIndex >= 0) {
throw new IllegalArgumentException(
"cannot specify both share credentials options and a profile file path")
} else if (shareCredentialsOptions.isEmpty && shapeIndex < 0) {
throw new IllegalArgumentException(s"path $path is not valid")
} else {
(path.substring(0, shapeIndex), path.substring(shapeIndex + 1))
}
}
val profileFile = path.substring(0, shapeIndex)
val tableSplits = path.substring(shapeIndex + 1).split("\\.", -1)

val tableSplits = tablePath.split("\\.", -1)
if (tableSplits.length != 3) {
throw new IllegalArgumentException(s"path $path is not valid")
}
if (profileFile.isEmpty || tableSplits(0).isEmpty ||
if ((profileFile.isEmpty && shareCredentialsOptions.isEmpty) || tableSplits(0).isEmpty ||
tableSplits(1).isEmpty || tableSplits(2).isEmpty) {
throw new IllegalArgumentException(s"path $path is not valid")
}
Expand All @@ -1328,19 +1340,23 @@ object DeltaSharingRestClient extends Logging {

def apply(
profileFile: String,
shareCredentialsOptions: Map[String, String],
forStreaming: Boolean = false,
responseFormat: String = RESPONSE_FORMAT_PARQUET,
readerFeatures: String = ""
): DeltaSharingClient = {
val sqlConf = SparkSession.active.sessionState.conf

val profileProviderClass = ConfUtils.profileProviderClass(sqlConf)
val profileProvider: DeltaSharingProfileProvider =
val profileProvider: DeltaSharingProfileProvider = if (shareCredentialsOptions.nonEmpty) {
new DeltaSharingOptionsProfileProvider(shareCredentialsOptions)
} else {
val profileProviderClass = ConfUtils.profileProviderClass(sqlConf)
Class.forName(profileProviderClass)
.getConstructor(classOf[Configuration], classOf[String])
.newInstance(SparkSession.active.sessionState.newHadoopConf(),
profileFile)
.asInstanceOf[DeltaSharingProfileProvider]
}

// This is a flag to test the local https server. Should never be used in production.
val sslTrustAll = ConfUtils.sslTrustAll(sqlConf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ sealed trait DeltaSharingProfile {
private [client] def validate(): Unit = {
if (shareCredentialsVersion.isEmpty) {
throw new IllegalArgumentException(
"Cannot find the 'shareCredentialsVersion' field in the profile file")
"Cannot find the 'shareCredentialsVersion' field in the profile")
}

if (shareCredentialsVersion.get > DeltaSharingProfile.CURRENT) {
Expand Down Expand Up @@ -143,14 +143,14 @@ object DeltaSharingProfile {
/**
 * Checks that a required string field of the profile is present.
 *
 * @param fieldValue the value to check
 * @param fieldName the field name reported in the error message
 * @throws IllegalArgumentException if `fieldValue` is null or an empty string
 */
private [client] def validateNotNullAndEmpty(fieldValue: String,
fieldName: String): Unit = {
if (fieldValue == null || fieldValue.isEmpty) {
throw new IllegalArgumentException(s"Cannot find the '$fieldName' field in the profile file")
throw new IllegalArgumentException(s"Cannot find the '$fieldName' field in the profile")
}
}

/**
 * Checks that a required optional-long field of the profile is present.
 *
 * @param fieldValue the value to check
 * @param fieldName the field name reported in the error message
 * @throws IllegalArgumentException if `fieldValue` is null or `None`
 */
private [client] def validateNotNullAndEmpty(fieldValue: Option[Long],
fieldName: String): Unit = {
if (fieldValue == null || fieldValue.isEmpty) {
throw new IllegalArgumentException(s"Cannot find the '$fieldName' field in the profile file")
throw new IllegalArgumentException(s"Cannot find the '$fieldName' field in the profile")
}
}
}
Expand Down Expand Up @@ -199,3 +199,21 @@ private[sharing] class DeltaSharingFileProfileProvider(

override def getProfile: DeltaSharingProfile = profile
}

/**
 * A [[DeltaSharingProfileProvider]] whose [[DeltaSharingProfile]] is built from a map of
 * share credentials options instead of being read from a profile file.
 *
 * The options map is serialized to JSON and deserialized as a [[DeltaSharingProfile]];
 * the resulting profile is validated while the provider is being constructed.
 *
 * @param shareCredentialsOptions profile fields keyed by their profile field names
 */
private[sharing] class DeltaSharingOptionsProfileProvider(
    shareCredentialsOptions: Map[String, String]) extends DeltaSharingProfileProvider {

  val profile = {
    // Round-trip through JSON so the options are decoded the same way a JSON profile would be.
    val optionsAsJson = JsonUtils.toJson(shareCredentialsOptions)
    val parsedProfile = JsonUtils.fromJson[DeltaSharingProfile](optionsAsJson)
    parsedProfile.validate()
    parsedProfile
  }

  override def getProfile: DeltaSharingProfile = profile
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ trait DeltaSharingReadOptions extends DeltaSharingOptionParser {
str
}.getOrElse(RESPONSE_FORMAT_PARQUET)

val shareCredentialsOptions: Map[String, String] = prepareShareCredentialsOptions()

def isTimeTravel: Boolean = versionAsOf.isDefined || timestampAsOf.isDefined

// Parse the input timestamp string and TimestampType, and generate a formatted timestamp string
Expand Down Expand Up @@ -134,6 +136,21 @@ trait DeltaSharingReadOptions extends DeltaSharingOptionParser {
}
}

/**
 * Collects the share credentials entries that are present in `options`.
 *
 * Only keys listed in `validShareCredentialsOptions` are considered. The value found for
 * `PROFILE_EXPIRATION_TIME` is passed through `getFormattedTimestamp`; every other value
 * is copied unchanged.
 *
 * @return the supplied share credentials options, keyed by option name
 */
private def prepareShareCredentialsOptions(): Map[String, String] = {
  validShareCredentialsOptions.keys.flatMap { optionName =>
    options.get(optionName).map { supplied =>
      val prepared =
        if (optionName == PROFILE_EXPIRATION_TIME) getFormattedTimestamp(supplied) else supplied
      optionName -> prepared
    }
  }.toMap
}

private def validateOneStartingOption(): Unit = {
if (startingTimestamp.isDefined && startingVersion.isDefined) {
throw DeltaSharingErrors.versionAndTimestampBothSetException(
Expand Down Expand Up @@ -193,10 +210,19 @@ object DeltaSharingOptions extends Logging {
val TIME_TRAVEL_TIMESTAMP = "timestampAsOf"

val RESPONSE_FORMAT = "responseFormat"

val RESPONSE_FORMAT_PARQUET = "parquet"
val RESPONSE_FORMAT_DELTA = "delta"

// Option keys through which the fields of a sharing profile can be supplied as options.
// Each one is listed in validShareCredentialsOptions.
val PROFILE_SHARE_CREDENTIALS_VERSION = "shareCredentialsVersion"
val PROFILE_TYPE = "shareCredentialsType"
val PROFILE_ENDPOINT = "endpoint"
val PROFILE_TOKEN_ENDPOINT = "tokenEndpoint"
val PROFILE_CLIENT_ID = "clientId"
val PROFILE_CLIENT_SECRET = "clientSecret"
val PROFILE_SCOPE = "scope"
val PROFILE_BEARER_TOKEN = "bearerToken"
val PROFILE_EXPIRATION_TIME = "expirationTime"

val validCdfOptions = Map(
CDF_READ_OPTION -> "",
CDF_READ_OPTION_LEGACY -> "",
Expand All @@ -205,6 +231,18 @@ object DeltaSharingOptions extends Logging {
CDF_START_VERSION -> "",
CDF_END_VERSION -> ""
)

// Option names recognised as share credentials options. Within this file only the keys
// are read (see prepareShareCredentialsOptions); the empty-string values are placeholders.
val validShareCredentialsOptions = Map(
PROFILE_SHARE_CREDENTIALS_VERSION -> "",
PROFILE_TYPE -> "",
PROFILE_ENDPOINT -> "",
PROFILE_TOKEN_ENDPOINT -> "",
PROFILE_CLIENT_ID -> "",
PROFILE_CLIENT_SECRET -> "",
PROFILE_SCOPE -> "",
PROFILE_BEARER_TOKEN -> "",
PROFILE_EXPIRATION_TIME -> ""
)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,13 @@ private[sharing] object RemoteDeltaLog {

def apply(
path: String,
shareCredentialsOptions: Map[String, String],
forStreaming: Boolean = false,
responseFormat: String = DeltaSharingOptions.RESPONSE_FORMAT_PARQUET,
initDeltaTableMetadata: Option[DeltaTableMetadata] = None): RemoteDeltaLog = {
val parsedPath = DeltaSharingRestClient.parsePath(path)
val client = DeltaSharingRestClient(parsedPath.profileFile, forStreaming, responseFormat)
val parsedPath = DeltaSharingRestClient.parsePath(path, shareCredentialsOptions)
val client = DeltaSharingRestClient(
parsedPath.profileFile, shareCredentialsOptions, forStreaming, responseFormat)
val deltaSharingTable = DeltaSharingTable(
name = parsedPath.table,
schema = parsedPath.schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class DeltaSharingFileProfileProviderSuite extends SparkFunSuite {
)
}
assert(e.getMessage.contains(
"Cannot find the 'shareCredentialsVersion' field in the profile file"))
"Cannot find the 'shareCredentialsVersion' field in the profile"))
}

test("shareCredentialsVersion is incorrect") {
Expand Down Expand Up @@ -126,7 +126,7 @@ class DeltaSharingFileProfileProviderSuite extends SparkFunSuite {
null
)
}
assert(e.getMessage.contains("Cannot find the 'endpoint' field in the profile file"))
assert(e.getMessage.contains("Cannot find the 'endpoint' field in the profile"))
}

test("bearerToken is missing") {
Expand All @@ -140,7 +140,7 @@ class DeltaSharingFileProfileProviderSuite extends SparkFunSuite {
null
)
}
assert(e.getMessage.contains("Cannot find the 'bearerToken' field in the profile file"))
assert(e.getMessage.contains("Cannot find the 'bearerToken' field in the profile"))
}

test("unknown field should be ignored") {
Expand Down Expand Up @@ -241,7 +241,7 @@ class DeltaSharingFileProfileProviderSuite extends SparkFunSuite {
val e = intercept[IllegalArgumentException] {
testProfile(profile, null)
}
assert(e.getMessage.contains(s"Cannot find the '$missingField' field in the profile file"))
assert(e.getMessage.contains(s"Cannot find the '$missingField' field in the profile"))
}
}

Expand Down
Loading
Loading