Skip to content

Commit

Permalink
Support version and timestamp for DeltaSharingClient.getMetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhou-db committed Aug 19, 2023
1 parent 7a0b1d0 commit 7d7edd5
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ trait DeltaSharingClient {

def getTableVersion(table: Table, startingTimestamp: Option[String] = None): Long

def getMetadata(table: Table): DeltaTableMetadata
def getMetadata(
table: Table,
versionAsOf: Option[Long] = None,
timestampAsOf: Option[String] = None): DeltaTableMetadata

def getFiles(
table: Table,
Expand Down Expand Up @@ -209,12 +212,18 @@ class DeltaSharingRestClient(
}
}

def getMetadata(table: Table): DeltaTableMetadata = {
def getMetadata(
table: Table,
versionAsOf: Option[Long] = None,
timestampAsOf: Option[String] = None): DeltaTableMetadata = {
val encodedShareName = URLEncoder.encode(table.share, "UTF-8")
val encodedSchemaName = URLEncoder.encode(table.schema, "UTF-8")
val encodedTableName = URLEncoder.encode(table.name, "UTF-8")
val encodedParams = getEncodedMetadataParams(versionAsOf, timestampAsOf)

val target = getTargetUrl(
s"/shares/$encodedShareName/schemas/$encodedSchemaName/tables/$encodedTableName/metadata")
s"/shares/$encodedShareName/schemas/$encodedSchemaName/tables/$encodedTableName/metadata" +
s"$encodedParams")
val (version, respondedFormat, lines) = getNDJson(target)
if (responseFormat != respondedFormat) {
// This could only happen when the asked format is delta and the server doesn't support
Expand Down Expand Up @@ -545,6 +554,16 @@ class DeltaSharingRestClient(
}
}

private def getEncodedMetadataParams(
versionAsOf: Option[Long], timestampAsOf: Option[String]): String = {
val paramMap = versionAsOf.map("version" -> _.toString).toMap ++
timestampAsOf.map("timestamp" -> _).toMap
val params = paramMap.map {
case (key, value) => s"$key=${URLEncoder.encode(value)}"
}.mkString("&")
Option(params).map{x => if (x.nonEmpty) { "?" + x } else { "" }}.get
}

private def getEncodedCDFParams(
cdfOptions: Map[String, String],
includeHistoricalMetadata: Boolean): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,65 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
}
}

integrationTest("getMetadata with parameters") {
val client = new DeltaSharingRestClient(testProfileProvider, sslTrustAll = true)
try {
val metadataV0 = Metadata(
id = "1e2201ff-12ad-4c3b-a539-4d34e9e36680",
format = Format(),
schemaString = """{"type":"struct","fields":[{"name":"name","type":"string","nullable":false,"metadata":{}}]}""",
configuration = Map("enableChangeDataFeed" -> "true"),
partitionColumns = Nil)
val responseV0 =
client.getMetadata(
Table(name = "streaming_notnull_to_null", schema = "default", share = "share8"),
versionAsOf = Some(0)
)
assert(Protocol(minReaderVersion = 1) == responseV0.protocol)
assert(metadataV0 == responseV0.metadata)

val metadataV2 = metadataV0.copy(
schemaString = """{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}}]}""",
)
val responseV2 =
client.getMetadata(
Table(name = "streaming_notnull_to_null", schema = "default", share = "share8"),
versionAsOf = Some(2)
)
assert(Protocol(minReaderVersion = 1) == responseV0.protocol)
assert(metadataV2 == responseV2.metadata)

val responseTimestamp =
client.getMetadata(
Table(name = "streaming_notnull_to_null", schema = "default", share = "share8"),
timestampAsOf = Some("2022-11-13T08:10:50Z")
)
assert(Protocol(minReaderVersion = 1) == responseV0.protocol)
assert(metadataV2 == responseTimestamp.metadata)
} finally {
client.close()
}
}

integrationTest("getMetadata with parameters - exceptions") {
val client = new DeltaSharingRestClient(testProfileProvider, sslTrustAll = true)
var errorMessage = intercept[UnexpectedHttpStatus] {
client.getMetadata(
Table(name = "streaming_notnull_to_null", schema = "default", share = "share8"),
versionAsOf = Some(6)
)
}.getMessage
assert(errorMessage.contains("Cannot time travel Delta table to version 6"))

errorMessage = intercept[UnexpectedHttpStatus] {
client.getMetadata(
Table(name = "streaming_notnull_to_null", schema = "default", share = "share8"),
timestampAsOf = Some("2021-01-01T00:00:00Z")
)
}.getMessage
assert(errorMessage.contains("is before the earliest version available"))
}

integrationTest("getFiles") {
Seq(true, false).foreach { paginationEnabled =>
val client = new DeltaSharingRestClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ class TestDeltaSharingClient(

override def listAllTables(): Seq[Table] = Nil

override def getMetadata(table: Table): DeltaTableMetadata = {
override def getMetadata(
table: Table,
versionAsOf: Option[Long] = None,
timestampAsOf: Option[String] = None): DeltaTableMetadata = {
DeltaTableMetadata(0, Protocol(0), metadata)
}

Expand Down

0 comments on commit 7d7edd5

Please sign in to comment.