Skip to content

Commit

Permalink
Merge branch 'main' of github.com:delta-io/delta-sharing into ESA-pm-…
Browse files Browse the repository at this point in the history
…cleanup
  • Loading branch information
linzhou-db committed Oct 7, 2024
2 parents 5c173e3 + 88496bd commit 7c15178
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ class DeltaSharingRestClient(
var (filteredLines, endStreamAction) = maybeExtractEndStreamAction(lines)
if (endStreamAction.isEmpty) {
logWarning(
s"EndStreamAction is not returned in the response" + getDsQueryIdForLogging
s"EndStreamAction is not returned in the paginated response" + getDsQueryIdForLogging
)
}

Expand Down Expand Up @@ -614,7 +614,7 @@ class DeltaSharingRestClient(
endStreamAction = res._2
if (endStreamAction.isEmpty) {
logWarning(
s"EndStreamAction is not returned in the response" + getDsQueryIdForLogging
s"EndStreamAction is not returned in the paginated response" + getDsQueryIdForLogging
)
}
// Throw an error if the first page is expiring before we get all pages
Expand Down Expand Up @@ -710,7 +710,7 @@ class DeltaSharingRestClient(
var (filteredLines, endStreamAction) = maybeExtractEndStreamAction(response.lines)
if (endStreamAction.isEmpty) {
logWarning(
s"EndStreamAction is not returned in the response" + getDsQueryIdForLogging
s"EndStreamAction is not returned in the paginated response" + getDsQueryIdForLogging
)
}
val protocol = filteredLines(0)
Expand Down Expand Up @@ -741,7 +741,7 @@ class DeltaSharingRestClient(
endStreamAction = res._2
if (endStreamAction.isEmpty) {
logWarning(
s"EndStreamAction is not returned in the response" + getDsQueryIdForLogging
s"EndStreamAction is not returned in the paginated response" + getDsQueryIdForLogging
)
}
// Throw an error if the first page is expiring before we get all pages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,37 +84,62 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
assert(h.contains(" java/"))
}

def checkDeltaSharingCapabilities(request: HttpRequestBase, expected: String): Unit = {
def getEndStreamActionHeader(includeEndStreamAction: Boolean): String = {
if (includeEndStreamAction) {
s";$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"
} else {
""
}
}

def checkDeltaSharingCapabilities(
request: HttpRequestBase,
responseFormat: String,
readerFeatures: String,
includeEndStreamAction: Boolean): Unit = {
val expected = s"${RESPONSE_FORMAT}=$responseFormat$readerFeatures" +
getEndStreamActionHeader(includeEndStreamAction)
val h = request.getFirstHeader(DELTA_SHARING_CAPABILITIES_HEADER)
assert(h.getValue == expected)
}

var httpRequestBase = new DeltaSharingRestClient(
testProfileProvider, forStreaming = false, readerFeatures = "willBeIgnored").prepareHeaders(httpRequest)
checkUserAgent(httpRequestBase, false)
checkDeltaSharingCapabilities(httpRequestBase, s"${RESPONSE_FORMAT}=parquet;$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true")
Seq(
(true, true),
(true, false),
(false, true),
(false, false)
).foreach { case (forStreaming, includeEndStreamAction) =>
var client = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = forStreaming,
includeEndStreamAction = includeEndStreamAction,
readerFeatures = "willBeIgnored").prepareHeaders(httpRequest)
checkUserAgent(client, forStreaming)
checkDeltaSharingCapabilities(client, "parquet", "", includeEndStreamAction)

val readerFeatures = "deletionVectors,columnMapping,timestampNTZ"
httpRequestBase = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = true,
responseFormat = RESPONSE_FORMAT_DELTA,
readerFeatures = readerFeatures).prepareHeaders(httpRequest)
checkUserAgent(httpRequestBase, true)
checkDeltaSharingCapabilities(
httpRequestBase, s"$RESPONSE_FORMAT=delta;$READER_FEATURES=$readerFeatures;$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"
)
val readerFeatures = "deletionVectors,columnMapping,timestampNTZ"
client = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = forStreaming,
includeEndStreamAction = includeEndStreamAction,
responseFormat = RESPONSE_FORMAT_DELTA,
readerFeatures = readerFeatures).prepareHeaders(httpRequest)
checkUserAgent(client, forStreaming)
checkDeltaSharingCapabilities(
client, "delta", s";$READER_FEATURES=$readerFeatures", includeEndStreamAction
)

httpRequestBase = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = true,
responseFormat = s"$RESPONSE_FORMAT_DELTA,$RESPONSE_FORMAT_PARQUET",
readerFeatures = readerFeatures,
includeEndStreamAction = false).prepareHeaders(httpRequest)
checkUserAgent(httpRequestBase, true)
checkDeltaSharingCapabilities(
httpRequestBase, s"responseformat=delta,parquet;readerfeatures=$readerFeatures"
)
client = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = forStreaming,
includeEndStreamAction = includeEndStreamAction,
responseFormat = s"$RESPONSE_FORMAT_DELTA,$RESPONSE_FORMAT_PARQUET",
readerFeatures = readerFeatures).prepareHeaders(httpRequest)
checkUserAgent(client, forStreaming)
checkDeltaSharingCapabilities(
client, s"delta,parquet", s";$READER_FEATURES=$readerFeatures", includeEndStreamAction
)
}
}

integrationTest("listAllTables") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ class DeltaSharingService(serverConfig: ServerConfig) {
}

val responseFormatSet = getResponseFormatSet(capabilitiesMap)
val includeEndStreamAction = getEndStreamActionInHeader(capabilitiesMap)
val includeEndStreamAction = getRequestEndStreamAction(capabilitiesMap)
val queryResult = deltaSharedTableLoader.loadTable(tableConfig).queryCDF(
getCdfOptionsMap(
Option(startingVersion),
Expand Down Expand Up @@ -776,12 +776,6 @@ object DeltaSharingService {
endingTimestamp.map(DeltaDataSource.CDF_END_TIMESTAMP_KEY -> _)).toMap
}

private def getEndStreamActionInHeader(headerCapabilities: Map[String, String]): Boolean = {
headerCapabilities.get(
DELTA_SHARING_INCLUDE_END_STREAM_ACTION
).map(_.toBoolean).getOrElse(false)
}

private[server] def getResponseFormatSet(headerCapabilities: Map[String, String]): Set[String] = {
headerCapabilities.get(DELTA_SHARING_RESPONSE_FORMAT).getOrElse(
DeltaSharedTable.RESPONSE_FORMAT_PARQUET
Expand Down

0 comments on commit 7c15178

Please sign in to comment.