Skip to content

Commit

Permalink
a couple small changse
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhou-db committed Oct 10, 2024
1 parent 0b97eb4 commit 2e79c9d
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -983,8 +983,8 @@ class DeltaSharingRestClient(
)
case None =>
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the" +
s" header, but server didn't respond with the header(${capabilities}), " +
s"for query($dsQueryId)."
s" header, but server didn't respond with the header(${capabilities})," +
getDsQueryIdForLogging
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
}

integrationTest("Check headers") {
val httpRequest = new HttpGet("random_url")

def checkUserAgent(request: HttpRequestBase, containsStreaming: Boolean): Unit = {
val h = request.getFirstHeader(HttpHeaders.USER_AGENT).getValue
if (containsStreaming) {
Expand Down Expand Up @@ -109,38 +107,39 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
(false, true),
(false, false)
).foreach { case (forStreaming, endStreamActionEnabled) =>
var client = new DeltaSharingRestClient(
val httpRequest = new HttpGet("random_url")
var request = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = forStreaming,
endStreamActionEnabled = endStreamActionEnabled,
readerFeatures = "willBeIgnored")
.prepareHeaders(httpRequest, setIncludeEndStreamAction = endStreamActionEnabled)
checkUserAgent(client, forStreaming)
checkDeltaSharingCapabilities(client, "parquet", "", endStreamActionEnabled)
checkUserAgent(request, forStreaming)
checkDeltaSharingCapabilities(request, "parquet", "", endStreamActionEnabled)

val readerFeatures = "deletionVectors,columnMapping,timestampNTZ"
client = new DeltaSharingRestClient(
request = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = forStreaming,
endStreamActionEnabled = endStreamActionEnabled,
responseFormat = RESPONSE_FORMAT_DELTA,
readerFeatures = readerFeatures)
.prepareHeaders(httpRequest, setIncludeEndStreamAction = endStreamActionEnabled)
checkUserAgent(client, forStreaming)
checkUserAgent(request, forStreaming)
checkDeltaSharingCapabilities(
client, "delta", s";$READER_FEATURES=$readerFeatures", endStreamActionEnabled
request, "delta", s";$READER_FEATURES=$readerFeatures", endStreamActionEnabled
)

client = new DeltaSharingRestClient(
request = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = forStreaming,
endStreamActionEnabled = endStreamActionEnabled,
responseFormat = s"$RESPONSE_FORMAT_DELTA,$RESPONSE_FORMAT_PARQUET",
readerFeatures = readerFeatures)
.prepareHeaders(httpRequest, setIncludeEndStreamAction = endStreamActionEnabled)
checkUserAgent(client, forStreaming)
checkUserAgent(request, forStreaming)
checkDeltaSharingCapabilities(
client, s"delta,parquet", s";$READER_FEATURES=$readerFeatures", endStreamActionEnabled
request, s"delta,parquet", s";$READER_FEATURES=$readerFeatures", endStreamActionEnabled
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,22 @@ class ConfUtilsSuite extends SparkFunSuite {
}.getMessage.contains(TIMEOUT_CONF)
}

test("includeEndStreamAction") {
assert(includeEndStreamAction(newConf()) == true)
assert(includeEndStreamAction(newConf(Map(INCLUDE_END_STREAM_ACTION_CONF -> "false"))) == false)
assert(includeEndStreamAction(newConf(Map(INCLUDE_END_STREAM_ACTION_CONF -> "random"))) == true)

assert(includeEndStreamAction(newSqlConf()) == true)
assert(
includeEndStreamAction(newSqlConf(Map(INCLUDE_END_STREAM_ACTION_CONF -> "false"))) == false
)
intercept[IllegalArgumentException] {
assert(
includeEndStreamAction(newSqlConf(Map(INCLUDE_END_STREAM_ACTION_CONF -> "random"))) == false
)
}.getMessage.contains(INCLUDE_END_STREAM_ACTION_CONF)
}

test("maxConnections") {
assert(maxConnections(newConf()) == MAX_CONNECTION_DEFAULT)
assert(maxConnections(newConf(Map(MAX_CONNECTION_CONF -> "100"))) == 100)
Expand Down

0 comments on commit 2e79c9d

Please sign in to comment.