Skip to content

Commit

Permalink
Move parse path from RemoteDeltaLog to DeltaSharingRestClient (#374)
Browse files Browse the repository at this point in the history
* Update Python connector version to 0.8.0

* Setting version to 0.8.0

* Setting version to 0.8.1-SNAPSHOT

* Move parsePath from RemoteDeltaLog to DeltaSharingRestClient

* revert

* move parsePath from RemoteDeltaLog to DeltaSharingRestClient

* revert

* fix
  • Loading branch information
linzhou-db authored Aug 15, 2023
1 parent 58862af commit 2965b24
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,27 @@ object DeltaSharingRestClient extends Logging {
if (value == null) "<unknown>" else value.replace(' ', '_')
}

/**
* Parse the user provided path `profile_file#share.schema.share` to
* `(profile_file, share, schema, share)`.
*/
def parsePath(path: String): (String, String, String, String) = {
val shapeIndex = path.lastIndexOf('#')
if (shapeIndex < 0) {
throw new IllegalArgumentException(s"path $path is not valid")
}
val profileFile = path.substring(0, shapeIndex)
val tableSplits = path.substring(shapeIndex + 1).split("\\.", -1)
if (tableSplits.length != 3) {
throw new IllegalArgumentException(s"path $path is not valid")
}
if (profileFile.isEmpty || tableSplits(0).isEmpty ||
tableSplits(1).isEmpty || tableSplits(2).isEmpty) {
throw new IllegalArgumentException(s"path $path is not valid")
}
(profileFile, tableSplits(0), tableSplits(1), tableSplits(2))
}

def apply(
profileFile: String,
forStreaming: Boolean = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,32 @@ import io.delta.sharing.client.util.UnexpectedHttpStatus
// scalastyle:off maxLineLength
class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {

test("parsePath") {
assert(DeltaSharingRestClient.parsePath("file:///foo/bar#a.b.c") == ("file:///foo/bar", "a", "b", "c"))
assert(DeltaSharingRestClient.parsePath("file:///foo/bar#bar#a.b.c") ==
("file:///foo/bar#bar", "a", "b", "c"))
assert(DeltaSharingRestClient.parsePath("file:///foo/bar#bar#a.b.c ") ==
("file:///foo/bar#bar", "a", "b", "c "))
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("file:///foo/bar")
}
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("file:///foo/bar#a.b")
}
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("file:///foo/bar#a.b.c.d")
}
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("#a.b.c")
}
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("foo#a.b.")
}
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("foo#a.b.c.")
}
}

integrationTest("Check headers") {
val httpRequest = new HttpGet("random_url")

Expand Down
23 changes: 1 addition & 22 deletions spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -109,32 +109,11 @@ private[sharing] object RemoteDeltaLog {
_addFileEncoder.copy()
}

/**
* Parse the user provided path `profile_file#share.schema.share` to
* `(profile_file, share, schema, share)`.
*/
def parsePath(path: String): (String, String, String, String) = {
val shapeIndex = path.lastIndexOf('#')
if (shapeIndex < 0) {
throw new IllegalArgumentException(s"path $path is not valid")
}
val profileFile = path.substring(0, shapeIndex)
val tableSplits = path.substring(shapeIndex + 1).split("\\.", -1)
if (tableSplits.length != 3) {
throw new IllegalArgumentException(s"path $path is not valid")
}
if (profileFile.isEmpty || tableSplits(0).isEmpty ||
tableSplits(1).isEmpty || tableSplits(2).isEmpty) {
throw new IllegalArgumentException(s"path $path is not valid")
}
(profileFile, tableSplits(0), tableSplits(1), tableSplits(2))
}

def apply(
path: String,
forStreaming: Boolean = false,
responseFormat: String = DeltaSharingOptions.RESPONSE_FORMAT_PARQUET): RemoteDeltaLog = {
val (profileFile, share, schema, table) = parsePath(path)
val (profileFile, share, schema, table) = DeltaSharingRestClient.parsePath(path)
val client = DeltaSharingRestClient(profileFile, forStreaming, responseFormat)
val deltaSharingTable = DeltaSharingTable(name = table, schema = schema, share = share)
new RemoteDeltaLog(deltaSharingTable, new Path(path), client)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,32 +46,6 @@ import io.delta.sharing.spark.filters.{BaseOp, OpConverter}

class RemoteDeltaLogSuite extends SparkFunSuite with SharedSparkSession {

test("parsePath") {
assert(RemoteDeltaLog.parsePath("file:///foo/bar#a.b.c") == ("file:///foo/bar", "a", "b", "c"))
assert(RemoteDeltaLog.parsePath("file:///foo/bar#bar#a.b.c") ==
("file:///foo/bar#bar", "a", "b", "c"))
assert(RemoteDeltaLog.parsePath("file:///foo/bar#bar#a.b.c ") ==
("file:///foo/bar#bar", "a", "b", "c "))
intercept[IllegalArgumentException] {
RemoteDeltaLog.parsePath("file:///foo/bar")
}
intercept[IllegalArgumentException] {
RemoteDeltaLog.parsePath("file:///foo/bar#a.b")
}
intercept[IllegalArgumentException] {
RemoteDeltaLog.parsePath("file:///foo/bar#a.b.c.d")
}
intercept[IllegalArgumentException] {
RemoteDeltaLog.parsePath("#a.b.c")
}
intercept[IllegalArgumentException] {
RemoteDeltaLog.parsePath("foo#a.b.")
}
intercept[IllegalArgumentException] {
RemoteDeltaLog.parsePath("foo#a.b.c.")
}
}

test("RemoteSnapshot getFiles with limit and jsonPredicateHints") {
val spark = SparkSession.active
spark.sessionState.conf.setConfString("spark.delta.sharing.jsonPredicateHints.enabled", "true")
Expand Down

0 comments on commit 2965b24

Please sign in to comment.