Skip to content

Commit

Permalink
use case class instead of tuple as function return type
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhou-db committed Sep 2, 2023
1 parent aa62bc9 commit a076f16
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ trait DeltaSharingClient {
def getProfileProvider: DeltaSharingProfileProvider = null
}

case class ParsedDeltaSharingTablePath(
profileFile: String,
share: String,
schema: String,
table: String)

private[sharing] trait PaginationResponse {
def nextPageToken: Option[String]
}
Expand Down Expand Up @@ -841,7 +847,7 @@ object DeltaSharingRestClient extends Logging {
* Parse the user provided path `profile_file#share.schema.share` to
* `(profile_file, share, schema, share)`.
*/
def parsePath(path: String): (String, String, String, String) = {
def parsePath(path: String): ParsedDeltaSharingTablePath = {
val shapeIndex = path.lastIndexOf('#')
if (shapeIndex < 0) {
throw new IllegalArgumentException(s"path $path is not valid")
Expand All @@ -855,7 +861,12 @@ object DeltaSharingRestClient extends Logging {
tableSplits(1).isEmpty || tableSplits(2).isEmpty) {
throw new IllegalArgumentException(s"path $path is not valid")
}
(profileFile, tableSplits(0), tableSplits(1), tableSplits(2))
ParsedDeltaSharingTablePath(
profileFile = profileFile,
share = tableSplits(0),
schema = tableSplits(1),
table = tableSplits(2)
)
}

def apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem {

override def getUri(): URI = URI.create(s"$SCHEME:///")

// delta-sharing:///
override def open(f: Path, bufferSize: Int): FSDataInputStream = {
val path = DeltaSharingFileSystem.decode(f)
val fetcher =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
import DeltaSharingRestClient._

test("parsePath") {
assert(DeltaSharingRestClient.parsePath("file:///foo/bar#a.b.c") == ("file:///foo/bar", "a", "b", "c"))
assert(DeltaSharingRestClient.parsePath("file:///foo/bar#a.b.c") ==
ParsedDeltaSharingTablePath("file:///foo/bar", "a", "b", "c"))
assert(DeltaSharingRestClient.parsePath("file:///foo/bar#bar#a.b.c") ==
("file:///foo/bar#bar", "a", "b", "c"))
ParsedDeltaSharingTablePath("file:///foo/bar#bar", "a", "b", "c"))
assert(DeltaSharingRestClient.parsePath("file:///foo/bar#bar#a.b.c ") ==
("file:///foo/bar#bar", "a", "b", "c "))
ParsedDeltaSharingTablePath("file:///foo/bar#bar", "a", "b", "c "))
intercept[IllegalArgumentException] {
DeltaSharingRestClient.parsePath("file:///foo/bar")
}
Expand Down
10 changes: 7 additions & 3 deletions spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,13 @@ private[sharing] object RemoteDeltaLog {
path: String,
forStreaming: Boolean = false,
responseFormat: String = DeltaSharingOptions.RESPONSE_FORMAT_PARQUET): RemoteDeltaLog = {
val (profileFile, share, schema, table) = DeltaSharingRestClient.parsePath(path)
val client = DeltaSharingRestClient(profileFile, forStreaming, responseFormat)
val deltaSharingTable = DeltaSharingTable(name = table, schema = schema, share = share)
val parsedPath = DeltaSharingRestClient.parsePath(path)
val client = DeltaSharingRestClient(parsedPath.profileFile, forStreaming, responseFormat)
val deltaSharingTable = DeltaSharingTable(
name = parsedPath.table,
schema = parsedPath.schema,
share = parsedPath.share
)
new RemoteDeltaLog(deltaSharingTable, new Path(path), client)
}
}
Expand Down

0 comments on commit a076f16

Please sign in to comment.